collect all clients to one single folder (#119)

This commit is contained in:
Ting Sun
2023-04-14 18:47:10 +08:00
committed by GitHub
parent b1e365b62f
commit 79422c0714
11 changed files with 154 additions and 146 deletions

View File

@@ -13,12 +13,12 @@
#![warn(rust_2018_idioms)]
use mini_redis::{client, Result};
use mini_redis::{clients::Client, Result};
#[tokio::main]
pub async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;
let mut client = Client::connect("127.0.0.1:6379").await?;
// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;

View File

@@ -17,12 +17,12 @@
#![warn(rust_2018_idioms)]
use mini_redis::{client, Result};
use mini_redis::{clients::Client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;
let mut client = Client::connect("127.0.0.1:6379").await?;
// publish message `bar` on channel foo
client.publish("foo", "bar".into()).await?;

View File

@@ -17,12 +17,12 @@
#![warn(rust_2018_idioms)]
use mini_redis::{client, Result};
use mini_redis::{clients::Client, Result};
#[tokio::main]
pub async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let client = client::connect("127.0.0.1:6379").await?;
let client = Client::connect("127.0.0.1:6379").await?;
// subscribe to channel foo
let mut subscriber = client.subscribe(vec!["foo".into()]).await?;

View File

@@ -1,4 +1,4 @@
use mini_redis::{client, DEFAULT_PORT};
use mini_redis::{clients::Client, DEFAULT_PORT};
use bytes::Bytes;
use clap::{Parser, Subcommand};
@@ -86,7 +86,7 @@ async fn main() -> mini_redis::Result<()> {
let addr = format!("{}:{}", cli.host, cli.port);
// Establish a connection
let mut client = client::connect(&addr).await?;
let mut client = Client::connect(&addr).await?;
// Process the requested command
match cli.command {

View File

@@ -7,7 +7,7 @@ use std::time::Duration;
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::client::Message;
pub use crate::clients::Message;
/// Established connection with a Redis server.
///
@@ -18,7 +18,7 @@ pub use crate::client::Message;
/// Requests are issued using the various methods of `Client`.
pub struct BlockingClient {
/// The asynchronous `Client`.
inner: crate::client::Client,
inner: crate::clients::Client,
/// A `current_thread` runtime for executing operations on the asynchronous
/// client in a blocking manner.
@@ -33,7 +33,7 @@ pub struct BlockingClient {
/// called.
pub struct BlockingSubscriber {
/// The asynchronous `Subscriber`.
inner: crate::client::Subscriber,
inner: crate::clients::Subscriber,
/// A `current_thread` runtime for executing operations on the asynchronous
/// `Subscriber` in a blocking manner.
@@ -43,43 +43,43 @@ pub struct BlockingSubscriber {
/// The iterator returned by `Subscriber::into_iter`.
struct SubscriberIterator {
/// The asynchronous `Subscriber`.
inner: crate::client::Subscriber,
inner: crate::clients::Subscriber,
/// A `current_thread` runtime for executing operations on the asynchronous
/// `Subscriber` in a blocking manner.
rt: Runtime,
}
/// Establish a connection with the Redis server located at `addr`.
///
/// `addr` may be any type that can be asynchronously converted to a
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
/// trait is the Tokio version and not the `std` version.
///
/// # Examples
///
/// ```no_run
/// use mini_redis::blocking_client;
///
/// fn main() {
/// let client = match blocking_client::connect("localhost:6379") {
/// Ok(client) => client,
/// Err(_) => panic!("failed to establish connection"),
/// };
/// # drop(client);
/// }
/// ```
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let inner = rt.block_on(crate::client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
impl BlockingClient {
/// Establish a connection with the Redis server located at `addr`.
///
/// `addr` may be any type that can be asynchronously converted to a
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
/// trait is the Tokio version and not the `std` version.
///
/// # Examples
///
/// ```no_run
/// use mini_redis::clients::BlockingClient;
///
/// fn main() {
/// let client = match BlockingClient::connect("localhost:6379") {
/// Ok(client) => client,
/// Err(_) => panic!("failed to establish connection"),
/// };
/// # drop(client);
/// }
/// ```
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let inner = rt.block_on(crate::clients::Client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
/// Get the value of key.
///
/// If the key does not exist the special value `None` is returned.
@@ -89,10 +89,10 @@ impl BlockingClient {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::blocking_client;
/// use mini_redis::clients::BlockingClient;
///
/// fn main() {
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
///
/// let val = client.get("foo").unwrap();
/// println!("Got = {:?}", val);
@@ -115,10 +115,10 @@ impl BlockingClient {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::blocking_client;
/// use mini_redis::clients::BlockingClient;
///
/// fn main() {
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
///
/// client.set("foo", "bar".into()).unwrap();
///
@@ -149,13 +149,13 @@ impl BlockingClient {
/// favorable.
///
/// ```no_run
/// use mini_redis::blocking_client;
/// use mini_redis::clients::BlockingClient;
/// use std::thread;
/// use std::time::Duration;
///
/// fn main() {
/// let ttl = Duration::from_millis(500);
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
///
/// client.set_expires("foo", "bar".into(), ttl).unwrap();
///
@@ -191,10 +191,10 @@ impl BlockingClient {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::blocking_client;
/// use mini_redis::clients::BlockingClient;
///
/// fn main() {
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
/// let mut client = BlockingClient::connect("localhost:6379").unwrap();
///
/// let val = client.publish("foo", "bar".into()).unwrap();
/// println!("Got = {:?}", val);

View File

@@ -1,39 +1,11 @@
use crate::client::Client;
use crate::clients::Client;
use crate::Result;
use bytes::Bytes;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
/// Create a new client request buffer
///
/// The `Client` performs Redis commands directly on the TCP connection. Only a
/// single request may be in-flight at a given time and operations require
/// mutable access to the `Client` handle. This prevents using a single Redis
/// connection from multiple Tokio tasks.
///
/// The strategy for dealing with this class of problem is to spawn a dedicated
/// Tokio task to manage the Redis connection and using "message passing" to
/// operate on the connection. Commands are pushed into a channel. The
/// connection task pops commands off of the channel and applies them to the
/// Redis connection. When the response is received, it is forwarded to the
/// original requester.
///
/// The returned `Buffer` handle may be cloned before passing the new handle to
/// separate tasks.
pub fn buffer(client: Client) -> Buffer {
// Setting the message limit to a hard coded value of 32. in a real-app, the
// buffer size should be configurable, but we don't need to do that here.
let (tx, rx) = channel(32);
// Spawn a task to process requests for the connection.
tokio::spawn(async move { run(client, rx).await });
// Return the `Buffer` handle.
Buffer { tx }
}
// Enum used to message pass the requested command from the `Buffer` handle
// Enum used to message pass the requested command from the `BufferedClient` handle
#[derive(Debug)]
enum Command {
Get(String),
@@ -53,7 +25,7 @@ type Message = (Command, oneshot::Sender<Result<Option<Bytes>>>);
/// response is returned back to the caller via a `oneshot`.
async fn run(mut client: Client, mut rx: Receiver<Message>) {
// Repeatedly pop messages from the channel. A return value of `None`
// indicates that all `Buffer` handles have dropped and there will never be
// indicates that all `BufferedClient` handles have dropped and there will never be
// another message sent on the channel.
while let Some((cmd, tx)) = rx.recv().await {
// The command is forwarded to the connection
@@ -71,11 +43,39 @@ async fn run(mut client: Client, mut rx: Receiver<Message>) {
}
#[derive(Clone)]
pub struct Buffer {
pub struct BufferedClient {
tx: Sender<Message>,
}
impl Buffer {
impl BufferedClient {
/// Create a new client request buffer
///
/// The `Client` performs Redis commands directly on the TCP connection. Only a
/// single request may be in-flight at a given time and operations require
/// mutable access to the `Client` handle. This prevents using a single Redis
/// connection from multiple Tokio tasks.
///
/// The strategy for dealing with this class of problem is to spawn a dedicated
/// Tokio task to manage the Redis connection and using "message passing" to
/// operate on the connection. Commands are pushed into a channel. The
/// connection task pops commands off of the channel and applies them to the
/// Redis connection. When the response is received, it is forwarded to the
/// original requester.
///
/// The returned `BufferedClient` handle may be cloned before passing the new handle to
/// separate tasks.
pub fn buffer(client: Client) -> BufferedClient {
// Setting the message limit to a hard coded value of 32. in a real-app, the
// buffer size should be configurable, but we don't need to do that here.
let (tx, rx) = channel(32);
// Spawn a task to process requests for the connection.
tokio::spawn(async move { run(client, rx).await });
// Return the `BufferedClient` handle.
BufferedClient { tx }
}
/// Get the value of a key.
///
/// Same as `Client::get` but requests are **buffered** until the associated

View File

@@ -51,42 +51,42 @@ pub struct Message {
pub content: Bytes,
}
/// Establish a connection with the Redis server located at `addr`.
///
/// `addr` may be any type that can be asynchronously converted to a
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
/// trait is the Tokio version and not the `std` version.
///
/// # Examples
///
/// ```no_run
/// use mini_redis::client;
///
/// #[tokio::main]
/// async fn main() {
/// let client = match client::connect("localhost:6379").await {
/// Ok(client) => client,
/// Err(_) => panic!("failed to establish connection"),
/// };
/// # drop(client);
/// }
/// ```
///
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
// The `addr` argument is passed directly to `TcpStream::connect`. This
// performs any asynchronous DNS lookup and attempts to establish the TCP
// connection. An error at either step returns an error, which is then
// bubbled up to the caller of `mini_redis` connect.
let socket = TcpStream::connect(addr).await?;
// Initialize the connection state. This allocates read/write buffers to
// perform redis protocol frame parsing.
let connection = Connection::new(socket);
Ok(Client { connection })
}
impl Client {
/// Establish a connection with the Redis server located at `addr`.
///
/// `addr` may be any type that can be asynchronously converted to a
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
/// trait is the Tokio version and not the `std` version.
///
/// # Examples
///
/// ```no_run
/// use mini_redis::clients::Client;
///
/// #[tokio::main]
/// async fn main() {
/// let client = match Client::connect("localhost:6379").await {
/// Ok(client) => client,
/// Err(_) => panic!("failed to establish connection"),
/// };
/// # drop(client);
/// }
/// ```
///
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
// The `addr` argument is passed directly to `TcpStream::connect`. This
// performs any asynchronous DNS lookup and attempts to establish the TCP
// connection. An error at either step returns an error, which is then
// bubbled up to the caller of `mini_redis` connect.
let socket = TcpStream::connect(addr).await?;
// Initialize the connection state. This allocates read/write buffers to
// perform redis protocol frame parsing.
let connection = Connection::new(socket);
Ok(Client { connection })
}
/// Ping to the server.
///
/// Returns PONG if no argument is provided, otherwise
@@ -99,11 +99,11 @@ impl Client {
///
/// Demonstrates basic usage.
/// ```no_run
/// use mini_redis::client;
/// use mini_redis::clients::Client;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = client::connect("localhost:6379").await.unwrap();
/// let mut client = Client::connect("localhost:6379").await.unwrap();
///
/// let pong = client.ping(None).await.unwrap();
/// assert_eq!(b"PONG", &pong[..]);
@@ -132,11 +132,11 @@ impl Client {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::client;
/// use mini_redis::clients::Client;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = client::connect("localhost:6379").await.unwrap();
/// let mut client = Client::connect("localhost:6379").await.unwrap();
///
/// let val = client.get("foo").await.unwrap();
/// println!("Got = {:?}", val);
@@ -178,11 +178,11 @@ impl Client {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::client;
/// use mini_redis::clients::Client;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = client::connect("localhost:6379").await.unwrap();
/// let mut client = Client::connect("localhost:6379").await.unwrap();
///
/// client.set("foo", "bar".into()).await.unwrap();
///
@@ -217,14 +217,14 @@ impl Client {
/// favorable.
///
/// ```no_run
/// use mini_redis::client;
/// use mini_redis::clients::Client;
/// use tokio::time;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let ttl = Duration::from_millis(500);
/// let mut client = client::connect("localhost:6379").await.unwrap();
/// let mut client = Client::connect("localhost:6379").await.unwrap();
///
/// client.set_expires("foo", "bar".into(), ttl).await.unwrap();
///
@@ -282,11 +282,11 @@ impl Client {
/// Demonstrates basic usage.
///
/// ```no_run
/// use mini_redis::client;
/// use mini_redis::clients::Client;
///
/// #[tokio::main]
/// async fn main() {
/// let mut client = client::connect("localhost:6379").await.unwrap();
/// let mut client = Client::connect("localhost:6379").await.unwrap();
///
/// let val = client.publish("foo", "bar".into()).await.unwrap();
/// println!("Got = {:?}", val);

8
src/clients/mod.rs Normal file
View File

@@ -0,0 +1,8 @@
mod client;
pub use client::{Client, Message, Subscriber};
mod blocking_client;
pub use blocking_client::BlockingClient;
mod buffered_client;
pub use buffered_client::BufferedClient;

View File

@@ -16,7 +16,7 @@
//! * `server`: Redis server implementation. Includes a single `run` function
//! that takes a `TcpListener` and starts accepting redis client connections.
//!
//! * `client`: an asynchronous Redis client implementation. Demonstrates how to
//! * `clients/client`: an asynchronous Redis client implementation. Demonstrates how to
//! build clients with Tokio.
//!
//! * `cmd`: implementations of the supported Redis commands.
@@ -25,8 +25,8 @@
//! intermediate representation between a "command" and the byte
//! representation.
pub mod blocking_client;
pub mod client;
pub mod clients;
pub use clients::{BlockingClient, BufferedClient, Client};
pub mod cmd;
pub use cmd::Command;
@@ -46,9 +46,6 @@ use parse::{Parse, ParseError};
pub mod server;
mod buffer;
pub use buffer::{buffer, Buffer};
mod shutdown;
use shutdown::Shutdown;

View File

@@ -1,4 +1,7 @@
use mini_redis::{buffer, client, server};
use mini_redis::{
clients::{BufferedClient, Client},
server,
};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
@@ -11,8 +14,8 @@ use tokio::task::JoinHandle;
async fn pool_key_value_get_set() {
let (addr, _) = start_server().await;
let client = client::connect(addr).await.unwrap();
let mut client = buffer(client);
let client = Client::connect(addr).await.unwrap();
let mut client = BufferedClient::buffer(client);
client.set("hello", "world".into()).await.unwrap();

View File

@@ -1,4 +1,4 @@
use mini_redis::{client, server};
use mini_redis::{clients::Client, server};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
@@ -8,7 +8,7 @@ use tokio::task::JoinHandle;
#[tokio::test]
async fn ping_pong_without_message() {
let (addr, _) = start_server().await;
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
let pong = client.ping(None).await.unwrap();
assert_eq!(b"PONG", &pong[..]);
@@ -19,7 +19,7 @@ async fn ping_pong_without_message() {
#[tokio::test]
async fn ping_pong_with_message() {
let (addr, _) = start_server().await;
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
let pong = client.ping(Some("你好世界".into())).await.unwrap();
assert_eq!("你好世界".as_bytes(), &pong[..]);
@@ -32,7 +32,7 @@ async fn ping_pong_with_message() {
async fn key_value_get_set() {
let (addr, _) = start_server().await;
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
client.set("hello", "world".into()).await.unwrap();
let value = client.get("hello").await.unwrap().unwrap();
@@ -45,11 +45,11 @@ async fn key_value_get_set() {
async fn receive_message_subscribed_channel() {
let (addr, _) = start_server().await;
let client = client::connect(addr.clone()).await.unwrap();
let client = Client::connect(addr.clone()).await.unwrap();
let mut subscriber = client.subscribe(vec!["hello".into()]).await.unwrap();
tokio::spawn(async move {
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
client.publish("hello", "world".into()).await.unwrap()
});
@@ -63,14 +63,14 @@ async fn receive_message_subscribed_channel() {
async fn receive_message_multiple_subscribed_channels() {
let (addr, _) = start_server().await;
let client = client::connect(addr.clone()).await.unwrap();
let client = Client::connect(addr.clone()).await.unwrap();
let mut subscriber = client
.subscribe(vec!["hello".into(), "world".into()])
.await
.unwrap();
tokio::spawn(async move {
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
client.publish("hello", "world".into()).await.unwrap()
});
@@ -79,7 +79,7 @@ async fn receive_message_multiple_subscribed_channels() {
assert_eq!(b"world", &message1.content[..]);
tokio::spawn(async move {
let mut client = client::connect(addr).await.unwrap();
let mut client = Client::connect(addr).await.unwrap();
client.publish("world", "howdy?".into()).await.unwrap()
});
@@ -94,7 +94,7 @@ async fn receive_message_multiple_subscribed_channels() {
async fn unsubscribes_from_channels() {
let (addr, _) = start_server().await;
let client = client::connect(addr.clone()).await.unwrap();
let client = Client::connect(addr.clone()).await.unwrap();
let mut subscriber = client
.subscribe(vec!["hello".into(), "world".into()])
.await