write server test (#19)
This commit is contained in:
@@ -12,3 +12,7 @@ tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] }
|
||||
tracing = "0.1.13"
|
||||
tracing-futures = { version = "0.2.3", features = ["tokio"] }
|
||||
tracing-subscriber = "0.2.2"
|
||||
|
||||
[dev-dependencies]
|
||||
# Enable test-utilities in dev mode only. This is mostly for tests.
|
||||
tokio = { git = "https://github.com/tokio-rs/tokio", features = ["test-util"] }
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use mini_redis::{server, DEFAULT_PORT};
|
||||
|
||||
use clap::Clap;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::signal;
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn main() -> mini_redis::Result<()> {
|
||||
@@ -10,7 +12,11 @@ pub async fn main() -> mini_redis::Result<()> {
|
||||
|
||||
let cli = Cli::parse();
|
||||
let port = cli.port.unwrap_or(DEFAULT_PORT.to_string());
|
||||
server::run(&port).await
|
||||
|
||||
// Bind a TCP listener
|
||||
let listener = TcpListener::bind(&format!("127.0.0.1:{}", port)).await?;
|
||||
|
||||
server::run(listener, signal::ctrl_c()).await
|
||||
}
|
||||
|
||||
#[derive(Clap, Debug)]
|
||||
|
||||
@@ -80,7 +80,7 @@ impl Frame {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
_ => Err(Error::Invalid),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{Command, Connection, Db, Shutdown};
|
||||
|
||||
use std::future::Future;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::signal;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::{debug, error, instrument, info};
|
||||
|
||||
@@ -31,15 +31,29 @@ struct Handler {
|
||||
}
|
||||
|
||||
/// Run the mini-redis server.
|
||||
pub async fn run(port: &str) -> crate::Result<()> {
|
||||
///
|
||||
/// Accepts connections from the supplied listener. For each inbound connection,
|
||||
/// a task is spawned to handle that connection. The server runs until the
|
||||
/// `shutdown` future completes, at which point the server shuts down
|
||||
/// gracefully.
|
||||
///
|
||||
/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will
|
||||
/// listen for a SIGINT signal.
|
||||
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
|
||||
// A broadcast channel is used to signal shutdown to each of the active
|
||||
// connections. When the provided `shutdown` future completes
|
||||
let (notify_shutdown, _) = broadcast::channel(1);
|
||||
|
||||
let mut server = Server {
|
||||
listener: TcpListener::bind(&format!("127.0.0.1:{}", port)).await?,
|
||||
listener,
|
||||
db: Db::new(),
|
||||
notify_shutdown,
|
||||
};
|
||||
|
||||
// Concurrently run the server and listen for the `shutdown` signal. The
|
||||
// server task runs until an error is encountered, so under normal
|
||||
// circumstances, this `select!` statement runs until the `shutdown` signal
|
||||
// is received.
|
||||
tokio::select! {
|
||||
res = server.run() => {
|
||||
if let Err(err) = res {
|
||||
@@ -47,7 +61,7 @@ pub async fn run(port: &str) -> crate::Result<()> {
|
||||
error!(cause = %err, "failed to accept");
|
||||
}
|
||||
}
|
||||
_ = signal::ctrl_c() => {
|
||||
_ = shutdown => {
|
||||
info!("shutting down");
|
||||
}
|
||||
}
|
||||
@@ -57,6 +71,9 @@ pub async fn run(port: &str) -> crate::Result<()> {
|
||||
|
||||
impl Server {
|
||||
/// Run the server
|
||||
///
|
||||
/// Listen for inbound connections. For each inbound connection, spawn a
|
||||
/// task to process that connection.
|
||||
async fn run(&mut self) -> crate::Result<()> {
|
||||
info!("accepting inbound connections");
|
||||
|
||||
|
||||
110
tests/server.rs
Normal file
110
tests/server.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
use mini_redis::server;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{self, Duration};
|
||||
|
||||
/// A basic "hello world" style test. A server instance is started in a
|
||||
/// background task. A client TCP connection is then established and raw redis
|
||||
/// commands are sent to the server. The response is evaluated at the byte
|
||||
/// level.
|
||||
#[tokio::test]
|
||||
async fn key_value_get_set() {
|
||||
let (addr, _handle) = start_server().await;
|
||||
|
||||
// Establish a connection to the server
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// Get a key, data is missing
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
|
||||
// Read nil response
|
||||
let mut response = [0; 5];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"$-1\r\n", &response);
|
||||
|
||||
// Set a key
|
||||
stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
|
||||
// Read OK
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"+OK\r\n", &response);
|
||||
|
||||
// Get the key, data is present
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
|
||||
// Read "world" response
|
||||
let mut response = [0; 11];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"$5\r\nworld\r\n", &response);
|
||||
}
|
||||
|
||||
/// Similar to the basic key-value test, however, this time timeouts will be
|
||||
/// tested. This test demonstrates how to test time related behavior.
|
||||
///
|
||||
/// When writing tests, it is useful to remove sources of non-determinism. Time
|
||||
/// is a source of non-determinism. Here, we "pause" time using the
|
||||
/// `time::pause()` function. This function is available with the `test-util`
|
||||
/// feature flag. This allows us to deterministically control how time appears
|
||||
/// to advance to the application.
|
||||
#[tokio::test]
|
||||
async fn key_value_timeout() {
|
||||
tokio::time::pause();
|
||||
|
||||
let (addr, _handle) = start_server().await;
|
||||
|
||||
// Establish a connection to the server
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// Set a key
|
||||
stream.write_all(b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\
|
||||
+EX\r\n:1\r\n").await.unwrap();
|
||||
|
||||
let mut response = [0; 5];
|
||||
|
||||
// Read OK
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"+OK\r\n", &response);
|
||||
|
||||
// Get the key, data is present
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
|
||||
// Read "world" response
|
||||
let mut response = [0; 11];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"$5\r\nworld\r\n", &response);
|
||||
|
||||
// Wait for the key to expire
|
||||
time::advance(Duration::from_secs(1)).await;
|
||||
|
||||
// Get a key, data is missing
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
|
||||
// Read nil response
|
||||
let mut response = [0; 5];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(b"$-1\r\n", &response);
|
||||
}
|
||||
|
||||
async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
server::run(listener, tokio::signal::ctrl_c()).await
|
||||
});
|
||||
|
||||
(addr, handle)
|
||||
}
|
||||
Reference in New Issue
Block a user