add initial connection pool (#53)
This commit is contained in:
@@ -22,6 +22,11 @@ impl Get {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the key
|
||||||
|
pub(crate) fn key(&self) -> &str {
|
||||||
|
&self.key
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse a `Get` instance from a received frame.
|
/// Parse a `Get` instance from a received frame.
|
||||||
///
|
///
|
||||||
/// The `Parse` argument provides a cursor-like API to read fields from the
|
/// The `Parse` argument provides a cursor-like API to read fields from the
|
||||||
|
|||||||
@@ -42,6 +42,21 @@ impl Set {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the key
|
||||||
|
pub(crate) fn key(&self) -> &str {
|
||||||
|
&self.key
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the value
|
||||||
|
pub(crate) fn value(&self) -> Bytes {
|
||||||
|
self.value.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the expires
|
||||||
|
pub(crate) fn expire(&self) -> Option<Duration> {
|
||||||
|
self.expire
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse a `Set` instance from a received frame.
|
/// Parse a `Set` instance from a received frame.
|
||||||
///
|
///
|
||||||
/// The `Parse` argument provides a cursor-like API to read fields from the
|
/// The `Parse` argument provides a cursor-like API to read fields from the
|
||||||
|
|||||||
@@ -44,6 +44,8 @@ use parse::{Parse, ParseError};
|
|||||||
|
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
|
||||||
|
pub mod pool;
|
||||||
|
|
||||||
mod shutdown;
|
mod shutdown;
|
||||||
use shutdown::Shutdown;
|
use shutdown::Shutdown;
|
||||||
|
|
||||||
|
|||||||
106
src/pool.rs
Normal file
106
src/pool.rs
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
use crate::client::Client;
|
||||||
|
use crate::cmd::{Command, Get, Set};
|
||||||
|
use crate::Result;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
/// create a new connection Pool from a Client
|
||||||
|
pub fn create(client: Client) -> Pool {
|
||||||
|
// Setting the message limit to a hard coded value of 32.
|
||||||
|
// in a real-app, the buffer size should be configurable, but we don't need to do that here.
|
||||||
|
let (tx, rx) = channel(32);
|
||||||
|
tokio::spawn(async move { run(client, rx).await });
|
||||||
|
|
||||||
|
Pool { tx }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// await for commands send through the channel and forward them to client, then send the result back to the oneshot Receiver
|
||||||
|
async fn run(
|
||||||
|
mut client: Client,
|
||||||
|
mut rx: Receiver<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||||
|
) {
|
||||||
|
while let Some((cmd, tx)) = rx.recv().await {
|
||||||
|
match cmd {
|
||||||
|
Command::Get(get) => {
|
||||||
|
let key = get.key();
|
||||||
|
let result = client.get(&key).await;
|
||||||
|
if let Err(_) = tx.send(result) {
|
||||||
|
error!("failed to send Client result, receiver has already been dropped");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::Set(set) => {
|
||||||
|
let key = set.key();
|
||||||
|
let value = set.value();
|
||||||
|
let expires = set.expire();
|
||||||
|
let result = match expires {
|
||||||
|
None => client.set(&key, value).await,
|
||||||
|
Some(exp) => client.set_expires(&key, value, exp).await,
|
||||||
|
};
|
||||||
|
if let Err(_) = tx.send(result.map(|_| None)) {
|
||||||
|
error!("failed to send Client result, receiver has already been dropped");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Pool {
|
||||||
|
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Pool {
|
||||||
|
/// get a Connection like object to the mini-redis server instance
|
||||||
|
pub fn get_connection(&self) -> Connection {
|
||||||
|
Connection {
|
||||||
|
tx: self.tx.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// a Connection like object that proxies commands to the real connection
|
||||||
|
/// Commands are send trough mspc Channel, along with the requested Command a oneshot Sender is sent
|
||||||
|
/// the Result from the actual Client requested command is then sent through the oneshot Sender and Received on the Connection Receiver
|
||||||
|
pub struct Connection {
|
||||||
|
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Connection {
|
||||||
|
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
|
||||||
|
let get = Get::new(key);
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.tx.send((Command::Get(get), tx)).await?;
|
||||||
|
match rx.await {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
|
||||||
|
let get = Set::new(key, value, None);
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.tx.send((Command::Set(get), tx)).await?;
|
||||||
|
match rx.await {
|
||||||
|
Ok(res) => res.map(|_| ()),
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn set_expires(
|
||||||
|
&mut self,
|
||||||
|
key: &str,
|
||||||
|
value: Bytes,
|
||||||
|
expiration: Duration,
|
||||||
|
) -> crate::Result<()> {
|
||||||
|
let get = Set::new(key, value, Some(expiration));
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.tx.send((Command::Set(get), tx)).await?;
|
||||||
|
match rx.await {
|
||||||
|
Ok(res) => res.map(|_| ()),
|
||||||
|
Err(err) => Err(err.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
30
tests/pool.rs
Normal file
30
tests/pool.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use mini_redis::{client, pool, server};
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
/// A basic "hello world" style test. A server instance is started in a
|
||||||
|
/// background task. A client instance is then established and inserted into the pool, set and get
|
||||||
|
/// commands are then sent to the server. The response is then evaluated
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pool_key_value_get_set() {
|
||||||
|
let (addr, _) = start_server().await;
|
||||||
|
|
||||||
|
let client = client::connect(addr).await.unwrap();
|
||||||
|
let pool = pool::create(client);
|
||||||
|
let mut client = pool.get_connection();
|
||||||
|
|
||||||
|
client.set("hello", "world".into()).await.unwrap();
|
||||||
|
|
||||||
|
let value = client.get("hello").await.unwrap().unwrap();
|
||||||
|
assert_eq!(b"world", &value[..])
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });
|
||||||
|
|
||||||
|
(addr, handle)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user