diff --git a/src/cmd/get.rs b/src/cmd/get.rs index 3a0cc6e..4ee50a0 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -22,6 +22,11 @@ impl Get { } } + /// Get the key + pub(crate) fn key(&self) -> &str { + &self.key + } + /// Parse a `Get` instance from a received frame. /// /// The `Parse` argument provides a cursor-like API to read fields from the diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 72f6376..1d10b33 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -42,6 +42,21 @@ impl Set { } } + /// Get the key + pub(crate) fn key(&self) -> &str { + &self.key + } + + /// Get the value + pub(crate) fn value(&self) -> Bytes { + self.value.clone() + } + + /// Get the expires + pub(crate) fn expire(&self) -> Option { + self.expire + } + /// Parse a `Set` instance from a received frame. /// /// The `Parse` argument provides a cursor-like API to read fields from the diff --git a/src/lib.rs b/src/lib.rs index 64342e7..1e2ae71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,6 +44,8 @@ use parse::{Parse, ParseError}; pub mod server; +pub mod pool; + mod shutdown; use shutdown::Shutdown; diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..673368c --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,106 @@ +use crate::client::Client; +use crate::cmd::{Command, Get, Set}; +use crate::Result; +use bytes::Bytes; +use std::time::Duration; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::oneshot; +use tracing::error; + +/// create a new connection Pool from a Client +pub fn create(client: Client) -> Pool { + // Setting the message limit to a hard coded value of 32. + // in a real-app, the buffer size should be configurable, but we don't need to do that here. + let (tx, rx) = channel(32); + tokio::spawn(async move { run(client, rx).await }); + + Pool { tx } +} + +/// await for commands send through the channel and forward them to client, then send the result back to the oneshot Receiver +async fn run( + mut client: Client, + mut rx: Receiver<(Command, oneshot::Sender>>)>, +) { + while let Some((cmd, tx)) = rx.recv().await { + match cmd { + Command::Get(get) => { + let key = get.key(); + let result = client.get(&key).await; + if let Err(_) = tx.send(result) { + error!("failed to send Client result, receiver has already been dropped"); + } + } + Command::Set(set) => { + let key = set.key(); + let value = set.value(); + let expires = set.expire(); + let result = match expires { + None => client.set(&key, value).await, + Some(exp) => client.set_expires(&key, value, exp).await, + }; + if let Err(_) = tx.send(result.map(|_| None)) { + error!("failed to send Client result, receiver has already been dropped"); + } + } + _ => unreachable!(), + } + } +} + +pub struct Pool { + tx: Sender<(Command, oneshot::Sender>>)>, +} + +impl Pool { + /// get a Connection like object to the mini-redis server instance + pub fn get_connection(&self) -> Connection { + Connection { + tx: self.tx.clone(), + } + } +} + +/// a Connection like object that proxies commands to the real connection +/// Commands are send trough mspc Channel, along with the requested Command a oneshot Sender is sent +/// the Result from the actual Client requested command is then sent through the oneshot Sender and Received on the Connection Receiver +pub struct Connection { + tx: Sender<(Command, oneshot::Sender>>)>, +} + +impl Connection { + pub async fn get(&mut self, key: &str) -> Result> { + let get = Get::new(key); + let (tx, rx) = oneshot::channel(); + self.tx.send((Command::Get(get), tx)).await?; + match rx.await { + Ok(res) => res, + Err(err) => Err(err.into()), + } + } + + pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> { + let get = Set::new(key, value, None); + let (tx, rx) = oneshot::channel(); + self.tx.send((Command::Set(get), tx)).await?; + match rx.await { + Ok(res) => res.map(|_| ()), + Err(err) => Err(err.into()), + } + } + + pub async fn set_expires( + &mut self, + key: &str, + value: Bytes, + expiration: Duration, + ) -> crate::Result<()> { + let get = Set::new(key, value, Some(expiration)); + let (tx, rx) = oneshot::channel(); + self.tx.send((Command::Set(get), tx)).await?; + match rx.await { + Ok(res) => res.map(|_| ()), + Err(err) => Err(err.into()), + } + } +} diff --git a/tests/pool.rs b/tests/pool.rs new file mode 100644 index 0000000..0efa19a --- /dev/null +++ b/tests/pool.rs @@ -0,0 +1,30 @@ +use mini_redis::{client, pool, server}; +use std::net::SocketAddr; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; + +/// A basic "hello world" style test. A server instance is started in a +/// background task. A client instance is then established and inserted into the pool, set and get +/// commands are then sent to the server. The response is then evaluated +#[tokio::test] +async fn pool_key_value_get_set() { + let (addr, _) = start_server().await; + + let client = client::connect(addr).await.unwrap(); + let pool = pool::create(client); + let mut client = pool.get_connection(); + + client.set("hello", "world".into()).await.unwrap(); + + let value = client.get("hello").await.unwrap().unwrap(); + assert_eq!(b"world", &value[..]) +} + +async fn start_server() -> (SocketAddr, JoinHandle>) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await }); + + (addr, handle) +}