From 4ac69aeb762a3e5506199d18d99cf6c22e16c1dc Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 3 Apr 2020 12:21:39 -0700 Subject: [PATCH] write server test (#19) --- Cargo.toml | 4 ++ src/bin/server.rs | 8 +++- src/frame.rs | 2 +- src/server.rs | 25 +++++++++-- tests/server.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 tests/server.rs diff --git a/Cargo.toml b/Cargo.toml index 41c8e11..a8d8009 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,7 @@ tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] } tracing = "0.1.13" tracing-futures = { version = "0.2.3", features = ["tokio"] } tracing-subscriber = "0.2.2" + +[dev-dependencies] +# Enable test-utilities in dev mode only. This is mostly for tests. +tokio = { git = "https://github.com/tokio-rs/tokio", features = ["test-util"] } diff --git a/src/bin/server.rs b/src/bin/server.rs index ce3ea2e..fde506c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,6 +1,8 @@ use mini_redis::{server, DEFAULT_PORT}; use clap::Clap; +use tokio::net::TcpListener; +use tokio::signal; #[tokio::main] pub async fn main() -> mini_redis::Result<()> { @@ -10,7 +12,11 @@ pub async fn main() -> mini_redis::Result<()> { let cli = Cli::parse(); let port = cli.port.unwrap_or(DEFAULT_PORT.to_string()); - server::run(&port).await + + // Bind a TCP listener + let listener = TcpListener::bind(&format!("127.0.0.1:{}", port)).await?; + + server::run(listener, signal::ctrl_c()).await } #[derive(Clap, Debug)] diff --git a/src/frame.rs b/src/frame.rs index 4776c6c..10929f0 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -80,7 +80,7 @@ impl Frame { Ok(()) } - _ => unimplemented!(), + _ => Err(Error::Invalid), } } diff --git a/src/server.rs b/src/server.rs index 2f25364..4710563 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use crate::{Command, Connection, Db, Shutdown}; +use std::future::Future; use tokio::net::TcpListener; -use tokio::signal; use tokio::sync::broadcast; use tracing::{debug, error, instrument, info}; @@ -31,15 +31,29 @@ struct Handler { } /// Run the mini-redis server. -pub async fn run(port: &str) -> crate::Result<()> { +/// +/// Accepts connections from the supplied listener. For each inbound connection, +/// a task is spawned to handle that connection. The server runs until the +/// `shutdown` future completes, at which point the server shuts down +/// gracefully. +/// +/// `tokio::signal::ctrl_c()` can be used as the `shutdown` argument. This will +/// listen for a SIGINT signal. +pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { + // A broadcast channel is used to signal shutdown to each of the active + // connections. When the provided `shutdown` future completes let (notify_shutdown, _) = broadcast::channel(1); let mut server = Server { - listener: TcpListener::bind(&format!("127.0.0.1:{}", port)).await?, + listener, db: Db::new(), notify_shutdown, }; + // Concurrently run the server and listen for the `shutdown` signal. The + // server task runs until an error is encountered, so under normal + // circumstances, this `select!` statement runs until the `shutdown` signal + // is received. tokio::select! { res = server.run() => { if let Err(err) = res { @@ -47,7 +61,7 @@ pub async fn run(port: &str) -> crate::Result<()> { error!(cause = %err, "failed to accept"); } } - _ = signal::ctrl_c() => { + _ = shutdown => { info!("shutting down"); } } @@ -57,6 +71,9 @@ pub async fn run(port: &str) -> crate::Result<()> { impl Server { /// Run the server + /// + /// Listen for inbound connections. For each inbound connection, spawn a + /// task to process that connection. async fn run(&mut self) -> crate::Result<()> { info!("accepting inbound connections"); diff --git a/tests/server.rs b/tests/server.rs new file mode 100644 index 0000000..bb94f31 --- /dev/null +++ b/tests/server.rs @@ -0,0 +1,110 @@ +use mini_redis::server; + +use std::net::SocketAddr; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::task::JoinHandle; +use tokio::time::{self, Duration}; + +/// A basic "hello world" style test. A server instance is started in a +/// background task. A client TCP connection is then established and raw redis +/// commands are sent to the server. The response is evaluated at the byte +/// level. +#[tokio::test] +async fn key_value_get_set() { + let (addr, _handle) = start_server().await; + + // Establish a connection to the server + let mut stream = TcpStream::connect(addr).await.unwrap(); + + // Get a key, data is missing + stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + + // Read nil response + let mut response = [0; 5]; + + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"$-1\r\n", &response); + + // Set a key + stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + + // Read OK + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"+OK\r\n", &response); + + // Get the key, data is present + stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + + // Read "world" response + let mut response = [0; 11]; + + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"$5\r\nworld\r\n", &response); +} + +/// Similar to the basic key-value test, however, this time timeouts will be +/// tested. This test demonstrates how to test time related behavior. +/// +/// When writing tests, it is useful to remove sources of non-determinism. Time +/// is a source of non-determinism. Here, we "pause" time using the +/// `time::pause()` function. This function is available with the `test-util` +/// feature flag. This allows us to deterministically control how time appears +/// to advance to the application. +#[tokio::test] +async fn key_value_timeout() { + tokio::time::pause(); + + let (addr, _handle) = start_server().await; + + // Establish a connection to the server + let mut stream = TcpStream::connect(addr).await.unwrap(); + + // Set a key + stream.write_all(b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\ + +EX\r\n:1\r\n").await.unwrap(); + + let mut response = [0; 5]; + + // Read OK + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"+OK\r\n", &response); + + // Get the key, data is present + stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + + // Read "world" response + let mut response = [0; 11]; + + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"$5\r\nworld\r\n", &response); + + // Wait for the key to expire + time::advance(Duration::from_secs(1)).await; + + // Get a key, data is missing + stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + + // Read nil response + let mut response = [0; 5]; + + stream.read_exact(&mut response).await.unwrap(); + + assert_eq!(b"$-1\r\n", &response); +} + +async fn start_server() -> (SocketAddr, JoinHandle>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn(async move { + server::run(listener, tokio::signal::ctrl_c()).await + }); + + (addr, handle) +}