From 1cb8ec9b0f5aa2e132e3685d2fc5e4e9b8659800 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 12 Jun 2020 19:38:27 -0700 Subject: [PATCH] rename `pool` to `buffer` (#56) --- src/buffer.rs | 123 +++++++++++++++++++++++++++++++++++ src/cmd/set.rs | 4 +- src/lib.rs | 3 +- src/pool.rs | 106 ------------------------------ tests/{pool.rs => buffer.rs} | 10 +-- 5 files changed, 132 insertions(+), 114 deletions(-) create mode 100644 src/buffer.rs delete mode 100644 src/pool.rs rename tests/{pool.rs => buffer.rs} (72%) diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 0000000..4b771ec --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,123 @@ +use crate::client::Client; +use crate::cmd::{Command, Get, Set}; +use crate::Result; + +use bytes::Bytes; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::oneshot; + +/// Create a new client request buffer +/// +/// The `Client` performs Redis commands directly on the TCP connection. Only a +/// single request may be in-flight at a given time and operations require +/// mutable access to the `Client` handle. This prevents using a single Redis +/// connection from multiple Tokio tasks. +/// +/// The strategy for dealing with this class of problem is to spawn a dedicated +/// Tokio task to manage the Redis connection and using "message passing" to +/// operate on the connection. Commands are pushed into a channel. The +/// connection task pops commands off of the channel and applies them to the +/// Redis connection. When the response is received, it is forwarded to the +/// original requester. +/// +/// The returned `Buffer` handle may be cloned before passing the new handle to +/// separate tasks. +pub fn buffer(client: Client) -> Buffer { + // Setting the message limit to a hard coded value of 32. in a real-app, the + // buffer size should be configurable, but we don't need to do that here. + let (tx, rx) = channel(32); + + // Spawn a task to process requests for the connection. + tokio::spawn(async move { run(client, rx).await }); + + // Return the `Buffer` handle. + Buffer { tx } +} + +// Message type sent over the channel to the connection task. +// +// `Command` is the command to forward to the connection. +// +// `oneshot::Sender` is a channel type that sends a **single** value. It is used +// here to send the response received from the connection back to the original +// requester. +type Message = (Command, oneshot::Sender>>); + +/// Receive commands sent through the channel and forward them to client. The +/// response is returned back to the caller via a `oneshot`. +async fn run(mut client: Client, mut rx: Receiver) { + // Repeatedly pop messages from the channel. A return value of `None` + // indicates that all `Buffer` handles have dropped and there will never be + // another message sent on the channel. + while let Some((cmd, tx)) = rx.recv().await { + // The command is forwarded to the connection + let response = match cmd { + Command::Get(get) => { + let key = get.key(); + client.get(&key).await + } + Command::Set(set) => { + let key = set.key(); + let value = set.value().clone(); + + client.set(&key, value).await.map(|_| None) + } + _ => unreachable!(), + }; + + // Send the response back to the caller. + // + // Failing to send the message indicates the `rx` half dropped + // before receiving the message. This is a normal runtime event. + let _ = tx.send(response); + } +} + +#[derive(Clone)] +pub struct Buffer { + tx: Sender, +} + +impl Buffer { + /// Get the value of a key. + /// + /// Same as `Client::get` but requests are **buffered** until the associated + /// connection has the ability to send the request. + pub async fn get(&mut self, key: &str) -> Result> { + // Initialize a new `Get` command to send via the channel. + let get = Get::new(key); + + // Initialize a new oneshot to be used to receive the response back from the connection. + let (tx, rx) = oneshot::channel(); + + // Send the request + self.tx.send((Command::Get(get), tx)).await?; + + // Await the response + match rx.await { + Ok(res) => res, + Err(err) => Err(err.into()), + } + } + + /// Set `key` to hold the given `value`. + /// + /// Same as `Client::set` but requests are **buffered** until the associated + /// connection has the ability to send the request + pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> { + // Initialize a new `Set` command to send via the channel. + let get = Set::new(key, value, None); + + // Initialize a new oneshot to be used to receive the response back from the connection. + let (tx, rx) = oneshot::channel(); + + // Send the request + self.tx.send((Command::Set(get), tx)).await?; + + // Await the response + match rx.await { + Ok(res) => res.map(|_| ()), + Err(err) => Err(err.into()), + } + } +} diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 25884dc..b528ddc 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -52,8 +52,8 @@ impl Set { &self.value } - /// Get the expires - pub(crate) fn expire(&self) -> Option { + /// Get the expire + pub fn expire(&self) -> Option { self.expire } diff --git a/src/lib.rs b/src/lib.rs index 9421079..2085f08 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,8 @@ use parse::{Parse, ParseError}; pub mod server; -pub mod pool; +mod buffer; +pub use buffer::{buffer, Buffer}; mod shutdown; use shutdown::Shutdown; diff --git a/src/pool.rs b/src/pool.rs deleted file mode 100644 index c8ea31a..0000000 --- a/src/pool.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::client::Client; -use crate::cmd::{Command, Get, Set}; -use crate::Result; -use bytes::Bytes; -use std::time::Duration; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::oneshot; -use tracing::error; - -/// create a new connection Pool from a Client -pub fn create(client: Client) -> Pool { - // Setting the message limit to a hard coded value of 32. - // in a real-app, the buffer size should be configurable, but we don't need to do that here. - let (tx, rx) = channel(32); - tokio::spawn(async move { run(client, rx).await }); - - Pool { tx } -} - -/// await for commands send through the channel and forward them to client, then send the result back to the oneshot Receiver -async fn run( - mut client: Client, - mut rx: Receiver<(Command, oneshot::Sender>>)>, -) { - while let Some((cmd, tx)) = rx.recv().await { - match cmd { - Command::Get(get) => { - let key = get.key(); - let result = client.get(&key).await; - if let Err(_) = tx.send(result) { - error!("failed to send Client result, receiver has already been dropped"); - } - } - Command::Set(set) => { - let key = set.key(); - let value = set.value().clone(); - let expires = set.expire(); - let result = match expires { - None => client.set(&key, value).await, - Some(exp) => client.set_expires(&key, value, exp).await, - }; - if let Err(_) = tx.send(result.map(|_| None)) { - error!("failed to send Client result, receiver has already been dropped"); - } - } - _ => unreachable!(), - } - } -} - -pub struct Pool { - tx: Sender<(Command, oneshot::Sender>>)>, -} - -impl Pool { - /// get a Connection like object to the mini-redis server instance - pub fn get_connection(&self) -> Connection { - Connection { - tx: self.tx.clone(), - } - } -} - -/// a Connection like object that proxies commands to the real connection -/// Commands are send trough mspc Channel, along with the requested Command a oneshot Sender is sent -/// the Result from the actual Client requested command is then sent through the oneshot Sender and Received on the Connection Receiver -pub struct Connection { - tx: Sender<(Command, oneshot::Sender>>)>, -} - -impl Connection { - pub async fn get(&mut self, key: &str) -> Result> { - let get = Get::new(key); - let (tx, rx) = oneshot::channel(); - self.tx.send((Command::Get(get), tx)).await?; - match rx.await { - Ok(res) => res, - Err(err) => Err(err.into()), - } - } - - pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> { - let get = Set::new(key, value, None); - let (tx, rx) = oneshot::channel(); - self.tx.send((Command::Set(get), tx)).await?; - match rx.await { - Ok(res) => res.map(|_| ()), - Err(err) => Err(err.into()), - } - } - - pub async fn set_expires( - &mut self, - key: &str, - value: Bytes, - expiration: Duration, - ) -> crate::Result<()> { - let get = Set::new(key, value, Some(expiration)); - let (tx, rx) = oneshot::channel(); - self.tx.send((Command::Set(get), tx)).await?; - match rx.await { - Ok(res) => res.map(|_| ()), - Err(err) => Err(err.into()), - } - } -} diff --git a/tests/pool.rs b/tests/buffer.rs similarity index 72% rename from tests/pool.rs rename to tests/buffer.rs index 0efa19a..7b0d852 100644 --- a/tests/pool.rs +++ b/tests/buffer.rs @@ -1,18 +1,18 @@ -use mini_redis::{client, pool, server}; +use mini_redis::{buffer, client, server}; use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::task::JoinHandle; /// A basic "hello world" style test. A server instance is started in a -/// background task. A client instance is then established and inserted into the pool, set and get -/// commands are then sent to the server. The response is then evaluated +/// background task. A client instance is then established and used to intialize +/// the buffer. Set and get commands are sent to the server. The response is +/// then evaluated. #[tokio::test] async fn pool_key_value_get_set() { let (addr, _) = start_server().await; let client = client::connect(addr).await.unwrap(); - let pool = pool::create(client); - let mut client = pool.get_connection(); + let mut client = buffer(client); client.set("hello", "world".into()).await.unwrap();