diff --git a/src/lib.rs b/src/lib.rs
index a8a28da..83de51c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,29 @@
-pub const DEFAULT_PORT: &str = "6379";
+//! A minimal (i.e. very incomplete) implementation of a Redis server and
+//! client.
+//!
+//! The purpose of this project is to provide a larger example of an
+//! asynchronous Rust project built with Tokio. Do not attempt to run this in
+//! production... seriously.
+//!
+//! # Layout
+//!
+//! The library is structured such that it can be used with guides. There are
+//! modules that are public that probably would not be public in a "real" redis
+//! client library.
+//!
+//! The major components are:
+//!
+//! * `server`: Redis server implementation. Includes a single `run` function
+//!   that takes a `TcpListener` and starts accepting redis client connections.
+//!
+//! * `client`: an asynchronous Redis client implementation. Demonstrates how to
+//!   build clients with Tokio.
+//!
+//! * `cmd`: implementations of the supported Redis commands.
+//!
+//! * `frame`: represents a single Redis protocol frame. A frame is used as an
+//!   intermediate representation between a "command" and the byte
+//!   representation.
 
 pub mod client;
 
@@ -22,8 +47,25 @@ pub mod server;
 mod shutdown;
 use shutdown::Shutdown;
 
+/// Default port that a redis server listens on.
+///
+/// Used if no port is specified.
+pub const DEFAULT_PORT: &str = "6379";
+
 /// Error returned by most functions.
+///
+/// When writing a real application, one might want to consider a specialized
+/// error handling crate or defining an error type as an `enum` of causes.
+/// However, for our example, using a boxed `std::error::Error` is sufficient.
+///
+/// For performance reasons, boxing is avoided in any hot path. For example, in
+/// `parse`, a custom error `enum` is defined. This is because the error is hit
+/// and handled during normal execution when a partial frame is received on a
+/// socket. `std::error::Error` is implemented for `parse::Error`, which allows
+/// it to be converted to `Box<dyn std::error::Error + Send + Sync>`.
 pub type Error = Box<dyn std::error::Error + Send + Sync>;
 
 /// A specialized `Result` type for mini-redis operations.
+///
+/// This is defined as a convenience.
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/server.rs b/src/server.rs
index 4710563..e06a95a 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,35 +1,124 @@
+//! Minimal Redis server implementation.
+//!
+//! Provides an async `run` function that listens for inbound connections,
+//! spawning a task per connection.
+
 use crate::{Command, Connection, Db, Shutdown};
 
 use std::future::Future;
-use tokio::net::TcpListener;
-use tokio::sync::broadcast;
+use std::sync::Arc;
+use tokio::net::{TcpListener, TcpStream};
+use tokio::sync::{broadcast, mpsc, Semaphore};
+use tokio::time::{self, Duration};
 use tracing::{debug, error, instrument, info};
 
+/// Server listener state. Created in the `run` call. It includes a `run` method
+/// which performs the TCP listening and initialization of per-connection state.
 #[derive(Debug)]
-struct Server {
-    /// Database state
+struct Listener {
+    /// Shared database handle.
+    ///
+    /// Contains the key / value store as well as the broadcast channels for
+    /// pub/sub.
+    ///
+    /// This is a wrapper around an `Arc`. This enables `db` to be cloned and
+    /// passed into the per-connection state (`Handler`).
     db: Db,
 
-    /// TCP listener
+    /// TCP listener supplied by the `run` caller.
     listener: TcpListener,
 
-    /// Listen for shutdown
+    /// Limit the max number of connections.
+    ///
+    /// A `Semaphore` is used to limit the max number of connections. Before
+    /// attempting to accept a new connection, a permit is acquired from the
+    /// semaphore. If none are available, the listener waits for one.
+    ///
+    /// When handlers complete processing a connection, the permit is returned
+    /// to the semaphore.
+    limit_connections: Arc<Semaphore>,
+
+    /// Broadcasts a shutdown signal to all active connections.
+    ///
+    /// The initial `shutdown` trigger is provided by the `run` caller. The
+    /// server is responsible for gracefully shutting down active connections.
+    /// When a connection task is spawned, it is passed a broadcast receiver
+    /// handle. When a graceful shutdown is initiated, a `()` value is sent via
+    /// the `broadcast::Sender`. Each active connection receives it, reaches a
+    /// safe terminal state, and completes the task.
     notify_shutdown: broadcast::Sender<()>,
+
+    /// Used as part of the graceful shutdown process to wait for client
+    /// connections to complete processing.
+    ///
+    /// Tokio channels are closed once all `Sender` handles go out of scope.
+    /// When a channel is closed, the receiver receives `None`. This is
+    /// leveraged to detect all connection handlers completing. When a
+    /// connection handler is initialized, it is assigned a clone of
+    /// `shutdown_complete_tx`. When the listener shuts down, it drops the
+    /// sender held by this `shutdown_complete_tx` field. Once all handler tasks
+    /// complete, all clones of the `Sender` are also dropped. This results in
+    /// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
+    /// is safe to exit the server process.
+    shutdown_complete_rx: mpsc::Receiver<()>,
+    shutdown_complete_tx: mpsc::Sender<()>,
 }
 
-/// Handles a connections
+/// Per-connection handler. Reads requests from `connection` and applies the
+/// commands to `db`.
 #[derive(Debug)]
 struct Handler {
-    /// Database state
+    /// Shared database handle.
+    ///
+    /// When a command is received from `connection`, it is applied with `db`.
+    /// The implementation of the command is in the `cmd` module. Each command
+    /// will need to interact with `db` in order to complete the work.
     db: Db,
 
     /// The TCP connection decorated with the redis protocol encoder / decoder
+    /// implemented using a buffered `TcpStream`.
+    ///
+    /// When `Listener` receives an inbound connection, the `TcpStream` is
+    /// passed to `Connection::new`, which initializes the associated buffers.
+    /// `Connection` allows the handler to operate at the "frame" level and keep
+    /// the byte level protocol parsing details encapsulated in `Connection`.
     connection: Connection,
 
-    /// Listen for shutdown notifications
+    /// Max connection semaphore.
+    ///
+    /// When the handler is dropped, a permit is returned to this semaphore. If
+    /// the listener is waiting for connections to close, it will be notified of
+    /// the newly available permit and resume accepting connections.
+    limit_connections: Arc<Semaphore>,
+
+    /// Listen for shutdown notifications.
+    ///
+    /// A wrapper around the `broadcast::Receiver` paired with the sender in
+    /// `Listener`. The connection handler processes requests from the
+    /// connection until the peer disconnects **or** a shutdown notification is
+    /// received from `shutdown`. In the latter case, any in-flight work being
+    /// processed for the peer is continued until it reaches a safe state, at
+    /// which point the connection is terminated.
     shutdown: Shutdown,
+
+    /// Not used directly; held for its `Drop` side effect. When a `Handler` is
+    /// dropped, this clone of the `Sender` is dropped with it. Once every
+    /// handler has dropped its clone, the channel closes and
+    /// `shutdown_complete_rx` yields `None`, letting the listener know that
+    /// all connections have finished.
+    _shutdown_complete: mpsc::Sender<()>,
 }
 
+/// Maximum number of concurrent connections the redis server will accept.
+///
+/// When this limit is reached, the server will stop accepting connections until
+/// an active connection terminates.
+///
+/// A real application will want to make this value configurable, but for this
+/// example, it is hard coded.
+///
+/// This is also set to a pretty low value to discourage using this in
+/// production (you'd think that all the disclaimers would make it obvious that
+/// this is not a serious project... but I thought that about mini-http as
+/// well).
+const MAX_CONNECTIONS: usize = 250;
+
 /// Run the mini-redis server.
 ///
 /// Accepts connections from the supplied listener. For each inbound connection,
@@ -43,78 +132,233 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
     // A broadcast channel is used to signal shutdown to each of the active
     // connections. When the provided `shutdown` future completes
     let (notify_shutdown, _) = broadcast::channel(1);
+    let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
 
-    let mut server = Server {
+    // Initialize the listener state
+    let mut server = Listener {
         listener,
         db: Db::new(),
+        limit_connections: Arc::new(Semaphore::new(MAX_CONNECTIONS)),
         notify_shutdown,
+        shutdown_complete_tx,
+        shutdown_complete_rx,
     };
 
     // Concurrently run the server and listen for the `shutdown` signal. The
     // server task runs until an error is encountered, so under normal
     // circumstances, this `select!` statement runs until the `shutdown` signal
     // is received.
+    //
+    // `select!` statements are written in the form of:
+    //
+    // ```
+    // <result of async op> = <async op> => <step to perform with result>
+    // ```
+    //
+    // All `<async op>` statements are executed concurrently. Once the **first**
+    // op completes, its associated `<step to perform with result>` is
+    // performed.
+    //
+    // The `select!` macro is a foundational building block for writing
+    // asynchronous Rust. See the API docs for more details:
+    //
+    // https://docs.rs/tokio/*/tokio/macro.select.html
    tokio::select! {
         res = server.run() => {
+            // If an error is received here, accepting connections from the TCP
+            // listener failed multiple times and the server is giving up and
+            // shutting down.
+            //
+            // Errors encountered when handling individual connections do not
+            // bubble up to this point.
            if let Err(err) = res {
-                // TODO: gracefully handle this error
                error!(cause = %err, "failed to accept");
            }
        }
        _ = shutdown => {
+            // The shutdown signal has been received.
            info!("shutting down");
        }
    }
 
+    // Extract the `shutdown_complete` receiver. Destructuring drops the fields
+    // that are not mentioned, including the `shutdown_complete_tx` sender held
+    // by the listener. This is important, as the `.await` below would otherwise
+    // never complete.
+    let Listener { mut shutdown_complete_rx, .. } = server;
+
+    // Wait for all active connections to finish processing. As the `Sender`
+    // handle held by the listener has been dropped above, the only remaining
+    // `Sender` instances are held by connection handler tasks. When those drop,
+    // the `mpsc` channel will close and `recv()` will return `None`.
+    let _ = shutdown_complete_rx.recv().await;
+
     Ok(())
 }
 
-impl Server {
+impl Listener {
     /// Run the server
     ///
     /// Listen for inbound connections. For each inbound connection, spawn a
     /// task to process that connection.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Err` if accepting returns an error. This can happen for a
+    /// number of reasons that resolve over time. For example, if the underlying
+    /// operating system has reached an internal limit for the max number of
+    /// sockets, accept will fail.
+    ///
+    /// The process is not able to detect when a transient error resolves
+    /// itself. One strategy for handling this is to implement a back off
+    /// strategy, which is what we do here.
     async fn run(&mut self) -> crate::Result<()> {
         info!("accepting inbound connections");
 
         loop {
-            let (socket, _) = self.listener.accept().await?;
+            // Wait for a permit to become available.
+            //
+            // `acquire` returns a permit that is bound via a lifetime to the
+            // semaphore. When the permit value is dropped, it is automatically
+            // returned to the semaphore. This is convenient in many cases.
+            // However, in this case, the permit must be returned in a different
+            // task than it is acquired in (the handler task). To do this, we
+            // "forget" the permit, which drops the permit value **without**
+            // incrementing the semaphore's permits. Then, in the handler task
+            // we manually add a new permit when processing completes.
+            self.limit_connections.acquire().await.forget();
 
+            // Accept a new socket. The `accept` method internally attempts to
+            // recover from errors, so an error at this point is
+            // non-recoverable.
+            let socket = self.accept().await?;
+
+            // Create the necessary per-connection handler state.
             let mut handler = Handler {
+                // Get a handle to the shared database. Internally, this is an
+                // `Arc`, so a clone only increments the ref count.
                 db: self.db.clone(),
+
+                // Initialize the connection state. This allocates read/write
+                // buffers to perform redis protocol frame parsing.
                 connection: Connection::new(socket),
+
+                // The connection state needs a handle to the max connections
+                // semaphore. When the handler is done processing the
+                // connection, a permit is added back to the semaphore.
+                limit_connections: self.limit_connections.clone(),
+
+                // Receive shutdown notifications.
                 shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
+
+                // Notifies the receiver half once all clones are dropped.
+                _shutdown_complete: self.shutdown_complete_tx.clone(),
             };
 
+            // Spawn a new task to process the connections. Tokio tasks are like
+            // asynchronous green threads and are executed concurrently.
             tokio::spawn(async move {
+                // Process the connection. If an error is encountered, log it.
                 if let Err(err) = handler.run().await {
                     error!(cause = ?err, "connection error");
                 }
             });
         }
     }
+
+    /// Accept an inbound connection.
+    ///
+    /// Errors are handled by backing off and retrying. An exponential backoff
+    /// strategy is used. After the first failure, the task waits for 1 second.
+    /// After the second failure, the task waits for 2 seconds. Each subsequent
+    /// failure doubles the wait time. If accepting still fails after the wait
+    /// time has reached 64 seconds, then this function returns with an error.
+    async fn accept(&mut self) -> crate::Result<TcpStream> {
+        let mut backoff = 1;
+
+        // Try to accept a few times
+        loop {
+            // Perform the accept operation. If a socket is successfully
+            // accepted, return it. Otherwise, save the error.
+            match self.listener.accept().await {
+                Ok((socket, _)) => return Ok(socket),
+                Err(err) => {
+                    if backoff > 64 {
+                        // Accept has failed too many times. Return the error.
+                        return Err(err.into());
+                    }
+                }
+            }
+
+            // Pause execution until the back off period elapses.
+            time::delay_for(Duration::from_secs(backoff)).await;
+
+            // Double the back off
+            backoff *= 2;
+        }
+    }
 }
 
 impl Handler {
+    /// Process a single connection.
+    ///
+    /// Request frames are read from the socket and processed. Responses are
+    /// written back to the socket.
+    ///
+    /// Currently, pipelining is not implemented. Pipelining is the ability to
+    /// process more than one request concurrently per connection without
+    /// interleaving frames. For more details, see:
+    /// https://redis.io/topics/pipelining
+    ///
+    /// When the shutdown signal is received, the connection is processed until
+    /// it reaches a safe state, at which point it is terminated.
     #[instrument(skip(self))]
     async fn run(&mut self) -> crate::Result<()> {
+        // As long as the shutdown signal has not been received, try to read a
+        // new request frame.
         while !self.shutdown.is_shutdown() {
+            // While reading a request frame, also listen for the shutdown
+            // signal.
             let maybe_frame = tokio::select! {
                 res = self.connection.read_frame() => res?,
                 _ = self.shutdown.recv() => {
-                    break;
+                    // If a shutdown signal is received, return from `run`.
+                    // This will result in the task terminating.
+                    return Ok(());
                 }
             };
 
+            // If `None` is returned from `read_frame()` then the peer closed
+            // the socket. There is no further work to do and the task can be
+            // terminated.
             let frame = match maybe_frame {
                 Some(frame) => frame,
                 None => return Ok(()),
             };
 
+            // Convert the redis frame into a command struct. This returns an
+            // error if the frame is not a valid redis command or it is an
+            // unsupported command.
             let cmd = Command::from_frame(frame)?;
 
+            // Logs the `cmd` object. The syntax here is a shorthand provided by
+            // the `tracing` crate. It can be thought of as similar to:
+            //
+            // ```
+            // debug!(cmd = format!("{:?}", cmd));
+            // ```
+            //
+            // `tracing` provides structured logging, so information is "logged"
+            // as key-value pairs.
             debug!(?cmd);
 
+            // Perform the work needed to apply the command. This may mutate the
+            // database state as a result.
+            //
+            // The connection is passed into the apply function which allows the
+            // command to write response frames directly to the connection. In
+            // the case of pub/sub, multiple frames may be sent back to the
+            // peer.
             cmd.apply(&self.db, &mut self.connection, &mut self.shutdown)
                 .await?;
         }
@@ -122,3 +366,19 @@ impl Handler {
         Ok(())
     }
 }
+
+impl Drop for Handler {
+    fn drop(&mut self) {
+        // Add a permit back to the semaphore.
+        //
+        // Doing so unblocks the listener if the max number of connections has
+        // been reached.
+        //
+        // This is done in a `Drop` implementation in order to guarantee that
+        // the permit is added even if the task handling the connection panics.
+        // If `add_permits` was called at the end of the `run` function and some
+        // bug caused a panic, the permit would never be returned to the
+        // semaphore.
+        self.limit_connections.add_permits(1);
+    }
+}
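The patch leans on the `Shutdown` type from the crate's `shutdown` module, which is not part of this diff. As a reading aid, here is a minimal sketch of what a wrapper with the `new` / `is_shutdown` / `recv` interface used by `Handler` above could look like; the field names and exact implementation are illustrative, not the module's actual contents.

```rust
use tokio::sync::broadcast;

/// Listens for the shutdown signal broadcast by the server listener.
///
/// Only a single `()` value is ever sent. Once it has been observed, the
/// connection should finish its in-flight work and terminate.
#[derive(Debug)]
pub(crate) struct Shutdown {
    /// `true` once the shutdown signal has been received.
    shutdown: bool,

    /// The receive half of the channel paired with `notify_shutdown`.
    notify: broadcast::Receiver<()>,
}

impl Shutdown {
    /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
    pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
        Shutdown {
            shutdown: false,
            notify,
        }
    }

    /// Returns `true` if the shutdown signal has been received.
    pub(crate) fn is_shutdown(&self) -> bool {
        self.shutdown
    }

    /// Wait for the shutdown notice, if one has not already been received.
    pub(crate) async fn recv(&mut self) {
        // If the signal was already received, return immediately.
        if self.shutdown {
            return;
        }

        // A receive error means the sender was dropped, which is treated the
        // same as an explicit shutdown signal, so the result is ignored.
        let _ = self.notify.recv().await;

        // Remember that the signal has been received.
        self.shutdown = true;
    }
}
```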
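Since `run` accepts any future as its shutdown trigger, a binary could wire the server up roughly as follows. This is a usage sketch only: the `mini_redis` crate name and the bare-bones `main` are assumptions for illustration, not part of this patch.

```rust
use mini_redis::{server, DEFAULT_PORT};
use tokio::net::TcpListener;
use tokio::signal;

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    // Bind a TCP listener on the default redis port.
    let listener = TcpListener::bind(&format!("127.0.0.1:{}", DEFAULT_PORT)).await?;

    // `signal::ctrl_c()` is a future that completes when the process receives
    // SIGINT. Passing it as the `shutdown` argument means the `select!` in
    // `run` resolves its shutdown branch on Ctrl-C, which kicks off the
    // graceful shutdown sequence described in the comments above.
    server::run(listener, signal::ctrl_c()).await
}
```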