rename pool to buffer (#56)
This commit is contained in:
123
src/buffer.rs
Normal file
123
src/buffer.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use crate::client::Client;
|
||||
use crate::cmd::{Command, Get, Set};
|
||||
use crate::Result;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
/// Create a new client request buffer
|
||||
///
|
||||
/// The `Client` performs Redis commands directly on the TCP connection. Only a
|
||||
/// single request may be in-flight at a given time and operations require
|
||||
/// mutable access to the `Client` handle. This prevents using a single Redis
|
||||
/// connection from multiple Tokio tasks.
|
||||
///
|
||||
/// The strategy for dealing with this class of problem is to spawn a dedicated
|
||||
/// Tokio task to manage the Redis connection and using "message passing" to
|
||||
/// operate on the connection. Commands are pushed into a channel. The
|
||||
/// connection task pops commands off of the channel and applies them to the
|
||||
/// Redis connection. When the response is received, it is forwarded to the
|
||||
/// original requester.
|
||||
///
|
||||
/// The returned `Buffer` handle may be cloned before passing the new handle to
|
||||
/// separate tasks.
|
||||
pub fn buffer(client: Client) -> Buffer {
|
||||
// Setting the message limit to a hard coded value of 32. in a real-app, the
|
||||
// buffer size should be configurable, but we don't need to do that here.
|
||||
let (tx, rx) = channel(32);
|
||||
|
||||
// Spawn a task to process requests for the connection.
|
||||
tokio::spawn(async move { run(client, rx).await });
|
||||
|
||||
// Return the `Buffer` handle.
|
||||
Buffer { tx }
|
||||
}
|
||||
|
||||
// Message type sent over the channel to the connection task.
|
||||
//
|
||||
// `Command` is the command to forward to the connection.
|
||||
//
|
||||
// `oneshot::Sender` is a channel type that sends a **single** value. It is used
|
||||
// here to send the response received from the connection back to the original
|
||||
// requester.
|
||||
type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
|
||||
|
||||
/// Receive commands sent through the channel and forward them to client. The
|
||||
/// response is returned back to the caller via a `oneshot`.
|
||||
async fn run(mut client: Client, mut rx: Receiver<Message>) {
|
||||
// Repeatedly pop messages from the channel. A return value of `None`
|
||||
// indicates that all `Buffer` handles have dropped and there will never be
|
||||
// another message sent on the channel.
|
||||
while let Some((cmd, tx)) = rx.recv().await {
|
||||
// The command is forwarded to the connection
|
||||
let response = match cmd {
|
||||
Command::Get(get) => {
|
||||
let key = get.key();
|
||||
client.get(&key).await
|
||||
}
|
||||
Command::Set(set) => {
|
||||
let key = set.key();
|
||||
let value = set.value().clone();
|
||||
|
||||
client.set(&key, value).await.map(|_| None)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
// Send the response back to the caller.
|
||||
//
|
||||
// Failing to send the message indicates the `rx` half dropped
|
||||
// before receiving the message. This is a normal runtime event.
|
||||
let _ = tx.send(response);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Buffer {
|
||||
tx: Sender<Message>,
|
||||
}
|
||||
|
||||
impl Buffer {
|
||||
/// Get the value of a key.
|
||||
///
|
||||
/// Same as `Client::get` but requests are **buffered** until the associated
|
||||
/// connection has the ability to send the request.
|
||||
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
|
||||
// Initialize a new `Get` command to send via the channel.
|
||||
let get = Get::new(key);
|
||||
|
||||
// Initialize a new oneshot to be used to receive the response back from the connection.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Send the request
|
||||
self.tx.send((Command::Get(get), tx)).await?;
|
||||
|
||||
// Await the response
|
||||
match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set `key` to hold the given `value`.
|
||||
///
|
||||
/// Same as `Client::set` but requests are **buffered** until the associated
|
||||
/// connection has the ability to send the request
|
||||
pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
|
||||
// Initialize a new `Set` command to send via the channel.
|
||||
let get = Set::new(key, value, None);
|
||||
|
||||
// Initialize a new oneshot to be used to receive the response back from the connection.
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
// Send the request
|
||||
self.tx.send((Command::Set(get), tx)).await?;
|
||||
|
||||
// Await the response
|
||||
match rx.await {
|
||||
Ok(res) => res.map(|_| ()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -52,8 +52,8 @@ impl Set {
|
||||
&self.value
|
||||
}
|
||||
|
||||
/// Get the expires
|
||||
pub(crate) fn expire(&self) -> Option<Duration> {
|
||||
/// Get the expire
|
||||
pub fn expire(&self) -> Option<Duration> {
|
||||
self.expire
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,8 @@ use parse::{Parse, ParseError};
|
||||
|
||||
pub mod server;
|
||||
|
||||
pub mod pool;
|
||||
mod buffer;
|
||||
pub use buffer::{buffer, Buffer};
|
||||
|
||||
mod shutdown;
|
||||
use shutdown::Shutdown;
|
||||
|
||||
106
src/pool.rs
106
src/pool.rs
@@ -1,106 +0,0 @@
|
||||
use crate::client::Client;
|
||||
use crate::cmd::{Command, Get, Set};
|
||||
use crate::Result;
|
||||
use bytes::Bytes;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
|
||||
/// create a new connection Pool from a Client
|
||||
pub fn create(client: Client) -> Pool {
|
||||
// Setting the message limit to a hard coded value of 32.
|
||||
// in a real-app, the buffer size should be configurable, but we don't need to do that here.
|
||||
let (tx, rx) = channel(32);
|
||||
tokio::spawn(async move { run(client, rx).await });
|
||||
|
||||
Pool { tx }
|
||||
}
|
||||
|
||||
/// await for commands send through the channel and forward them to client, then send the result back to the oneshot Receiver
|
||||
async fn run(
|
||||
mut client: Client,
|
||||
mut rx: Receiver<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||
) {
|
||||
while let Some((cmd, tx)) = rx.recv().await {
|
||||
match cmd {
|
||||
Command::Get(get) => {
|
||||
let key = get.key();
|
||||
let result = client.get(&key).await;
|
||||
if let Err(_) = tx.send(result) {
|
||||
error!("failed to send Client result, receiver has already been dropped");
|
||||
}
|
||||
}
|
||||
Command::Set(set) => {
|
||||
let key = set.key();
|
||||
let value = set.value().clone();
|
||||
let expires = set.expire();
|
||||
let result = match expires {
|
||||
None => client.set(&key, value).await,
|
||||
Some(exp) => client.set_expires(&key, value, exp).await,
|
||||
};
|
||||
if let Err(_) = tx.send(result.map(|_| None)) {
|
||||
error!("failed to send Client result, receiver has already been dropped");
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Pool {
|
||||
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
/// get a Connection like object to the mini-redis server instance
|
||||
pub fn get_connection(&self) -> Connection {
|
||||
Connection {
|
||||
tx: self.tx.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// a Connection like object that proxies commands to the real connection
|
||||
/// Commands are send trough mspc Channel, along with the requested Command a oneshot Sender is sent
|
||||
/// the Result from the actual Client requested command is then sent through the oneshot Sender and Received on the Connection Receiver
|
||||
pub struct Connection {
|
||||
tx: Sender<(Command, oneshot::Sender<Result<Option<Bytes>>>)>,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>> {
|
||||
let get = Get::new(key);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send((Command::Get(get), tx)).await?;
|
||||
match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set(&mut self, key: &str, value: Bytes) -> Result<()> {
|
||||
let get = Set::new(key, value, None);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send((Command::Set(get), tx)).await?;
|
||||
match rx.await {
|
||||
Ok(res) => res.map(|_| ()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_expires(
|
||||
&mut self,
|
||||
key: &str,
|
||||
value: Bytes,
|
||||
expiration: Duration,
|
||||
) -> crate::Result<()> {
|
||||
let get = Set::new(key, value, Some(expiration));
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx.send((Command::Set(get), tx)).await?;
|
||||
match rx.await {
|
||||
Ok(res) => res.map(|_| ()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,18 @@
|
||||
use mini_redis::{client, pool, server};
|
||||
use mini_redis::{buffer, client, server};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// A basic "hello world" style test. A server instance is started in a
|
||||
/// background task. A client instance is then established and inserted into the pool, set and get
|
||||
/// commands are then sent to the server. The response is then evaluated
|
||||
/// background task. A client instance is then established and used to intialize
|
||||
/// the buffer. Set and get commands are sent to the server. The response is
|
||||
/// then evaluated.
|
||||
#[tokio::test]
|
||||
async fn pool_key_value_get_set() {
|
||||
let (addr, _) = start_server().await;
|
||||
|
||||
let client = client::connect(addr).await.unwrap();
|
||||
let pool = pool::create(client);
|
||||
let mut client = pool.get_connection();
|
||||
let mut client = buffer(client);
|
||||
|
||||
client.set("hello", "world".into()).await.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user