From 79422c071497bfce9e98c8229fbe55fccf923b4f Mon Sep 17 00:00:00 2001 From: Ting Sun Date: Fri, 14 Apr 2023 18:47:10 +0800 Subject: [PATCH] collect all clients to one single folder (#119) --- examples/hello_world.rs | 4 +- examples/pub.rs | 4 +- examples/sub.rs | 4 +- src/bin/cli.rs | 4 +- src/{ => clients}/blocking_client.rs | 82 ++++++++--------- src/{buffer.rs => clients/buffered_client.rs} | 66 +++++++------- src/{ => clients}/client.rs | 90 +++++++++---------- src/clients/mod.rs | 8 ++ src/lib.rs | 9 +- tests/{buffer.rs => buffered_client.rs} | 9 +- tests/client.rs | 20 ++--- 11 files changed, 154 insertions(+), 146 deletions(-) rename src/{ => clients}/blocking_client.rs (81%) rename src/{buffer.rs => clients/buffered_client.rs} (65%) rename src/{ => clients}/client.rs (89%) create mode 100644 src/clients/mod.rs rename tests/{buffer.rs => buffered_client.rs} (83%) diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 34d2ae8..16e85c6 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -13,12 +13,12 @@ #![warn(rust_2018_idioms)] -use mini_redis::{client, Result}; +use mini_redis::{clients::Client, Result}; #[tokio::main] pub async fn main() -> Result<()> { // Open a connection to the mini-redis address. - let mut client = client::connect("127.0.0.1:6379").await?; + let mut client = Client::connect("127.0.0.1:6379").await?; // Set the key "hello" with value "world" client.set("hello", "world".into()).await?; diff --git a/examples/pub.rs b/examples/pub.rs index bdae6dd..016ad0b 100644 --- a/examples/pub.rs +++ b/examples/pub.rs @@ -17,12 +17,12 @@ #![warn(rust_2018_idioms)] -use mini_redis::{client, Result}; +use mini_redis::{clients::Client, Result}; #[tokio::main] async fn main() -> Result<()> { // Open a connection to the mini-redis address. - let mut client = client::connect("127.0.0.1:6379").await?; + let mut client = Client::connect("127.0.0.1:6379").await?; // publish message `bar` on channel foo client.publish("foo", "bar".into()).await?; diff --git a/examples/sub.rs b/examples/sub.rs index 2d0a2cc..3ed4851 100644 --- a/examples/sub.rs +++ b/examples/sub.rs @@ -17,12 +17,12 @@ #![warn(rust_2018_idioms)] -use mini_redis::{client, Result}; +use mini_redis::{clients::Client, Result}; #[tokio::main] pub async fn main() -> Result<()> { // Open a connection to the mini-redis address. - let client = client::connect("127.0.0.1:6379").await?; + let client = Client::connect("127.0.0.1:6379").await?; // subscribe to channel foo let mut subscriber = client.subscribe(vec!["foo".into()]).await?; diff --git a/src/bin/cli.rs b/src/bin/cli.rs index 8b80c33..61ed8d2 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,4 +1,4 @@ -use mini_redis::{client, DEFAULT_PORT}; +use mini_redis::{clients::Client, DEFAULT_PORT}; use bytes::Bytes; use clap::{Parser, Subcommand}; @@ -86,7 +86,7 @@ async fn main() -> mini_redis::Result<()> { let addr = format!("{}:{}", cli.host, cli.port); // Establish a connection - let mut client = client::connect(&addr).await?; + let mut client = Client::connect(&addr).await?; // Process the requested command match cli.command { diff --git a/src/blocking_client.rs b/src/clients/blocking_client.rs similarity index 81% rename from src/blocking_client.rs rename to src/clients/blocking_client.rs index 962a1e9..0519400 100644 --- a/src/blocking_client.rs +++ b/src/clients/blocking_client.rs @@ -7,7 +7,7 @@ use std::time::Duration; use tokio::net::ToSocketAddrs; use tokio::runtime::Runtime; -pub use crate::client::Message; +pub use crate::clients::Message; /// Established connection with a Redis server. /// @@ -18,7 +18,7 @@ pub use crate::client::Message; /// Requests are issued using the various methods of `Client`. pub struct BlockingClient { /// The asynchronous `Client`. - inner: crate::client::Client, + inner: crate::clients::Client, /// A `current_thread` runtime for executing operations on the asynchronous /// client in a blocking manner. @@ -33,7 +33,7 @@ pub struct BlockingClient { /// called. pub struct BlockingSubscriber { /// The asynchronous `Subscriber`. - inner: crate::client::Subscriber, + inner: crate::clients::Subscriber, /// A `current_thread` runtime for executing operations on the asynchronous /// `Subscriber` in a blocking manner. @@ -43,43 +43,43 @@ pub struct BlockingSubscriber { /// The iterator returned by `Subscriber::into_iter`. struct SubscriberIterator { /// The asynchronous `Subscriber`. - inner: crate::client::Subscriber, + inner: crate::clients::Subscriber, /// A `current_thread` runtime for executing operations on the asynchronous /// `Subscriber` in a blocking manner. rt: Runtime, } -/// Establish a connection with the Redis server located at `addr`. -/// -/// `addr` may be any type that can be asynchronously converted to a -/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs` -/// trait is the Tokio version and not the `std` version. -/// -/// # Examples -/// -/// ```no_run -/// use mini_redis::blocking_client; -/// -/// fn main() { -/// let client = match blocking_client::connect("localhost:6379") { -/// Ok(client) => client, -/// Err(_) => panic!("failed to establish connection"), -/// }; -/// # drop(client); -/// } -/// ``` -pub fn connect(addr: T) -> crate::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let inner = rt.block_on(crate::client::connect(addr))?; - - Ok(BlockingClient { inner, rt }) -} - impl BlockingClient { + /// Establish a connection with the Redis server located at `addr`. + /// + /// `addr` may be any type that can be asynchronously converted to a + /// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs` + /// trait is the Tokio version and not the `std` version. + /// + /// # Examples + /// + /// ```no_run + /// use mini_redis::clients::BlockingClient; + /// + /// fn main() { + /// let client = match BlockingClient::connect("localhost:6379") { + /// Ok(client) => client, + /// Err(_) => panic!("failed to establish connection"), + /// }; + /// # drop(client); + /// } + /// ``` + pub fn connect(addr: T) -> crate::Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let inner = rt.block_on(crate::clients::Client::connect(addr))?; + + Ok(BlockingClient { inner, rt }) + } + /// Get the value of key. /// /// If the key does not exist the special value `None` is returned. @@ -89,10 +89,10 @@ impl BlockingClient { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::blocking_client; + /// use mini_redis::clients::BlockingClient; /// /// fn main() { - /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// let mut client = BlockingClient::connect("localhost:6379").unwrap(); /// /// let val = client.get("foo").unwrap(); /// println!("Got = {:?}", val); @@ -115,10 +115,10 @@ impl BlockingClient { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::blocking_client; + /// use mini_redis::clients::BlockingClient; /// /// fn main() { - /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// let mut client = BlockingClient::connect("localhost:6379").unwrap(); /// /// client.set("foo", "bar".into()).unwrap(); /// @@ -149,13 +149,13 @@ impl BlockingClient { /// favorable. /// /// ```no_run - /// use mini_redis::blocking_client; + /// use mini_redis::clients::BlockingClient; /// use std::thread; /// use std::time::Duration; /// /// fn main() { /// let ttl = Duration::from_millis(500); - /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// let mut client = BlockingClient::connect("localhost:6379").unwrap(); /// /// client.set_expires("foo", "bar".into(), ttl).unwrap(); /// @@ -191,10 +191,10 @@ impl BlockingClient { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::blocking_client; + /// use mini_redis::clients::BlockingClient; /// /// fn main() { - /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// let mut client = BlockingClient::connect("localhost:6379").unwrap(); /// /// let val = client.publish("foo", "bar".into()).unwrap(); /// println!("Got = {:?}", val); diff --git a/src/buffer.rs b/src/clients/buffered_client.rs similarity index 65% rename from src/buffer.rs rename to src/clients/buffered_client.rs index be7b0ee..b58f7de 100644 --- a/src/buffer.rs +++ b/src/clients/buffered_client.rs @@ -1,39 +1,11 @@ -use crate::client::Client; +use crate::clients::Client; use crate::Result; use bytes::Bytes; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot; -/// Create a new client request buffer -/// -/// The `Client` performs Redis commands directly on the TCP connection. Only a -/// single request may be in-flight at a given time and operations require -/// mutable access to the `Client` handle. This prevents using a single Redis -/// connection from multiple Tokio tasks. -/// -/// The strategy for dealing with this class of problem is to spawn a dedicated -/// Tokio task to manage the Redis connection and using "message passing" to -/// operate on the connection. Commands are pushed into a channel. The -/// connection task pops commands off of the channel and applies them to the -/// Redis connection. When the response is received, it is forwarded to the -/// original requester. -/// -/// The returned `Buffer` handle may be cloned before passing the new handle to -/// separate tasks. -pub fn buffer(client: Client) -> Buffer { - // Setting the message limit to a hard coded value of 32. in a real-app, the - // buffer size should be configurable, but we don't need to do that here. - let (tx, rx) = channel(32); - - // Spawn a task to process requests for the connection. - tokio::spawn(async move { run(client, rx).await }); - - // Return the `Buffer` handle. - Buffer { tx } -} - -// Enum used to message pass the requested command from the `Buffer` handle +// Enum used to message pass the requested command from the `BufferedClient` handle #[derive(Debug)] enum Command { Get(String), @@ -53,7 +25,7 @@ type Message = (Command, oneshot::Sender>>); /// response is returned back to the caller via a `oneshot`. async fn run(mut client: Client, mut rx: Receiver) { // Repeatedly pop messages from the channel. A return value of `None` - // indicates that all `Buffer` handles have dropped and there will never be + // indicates that all `BufferedClient` handles have dropped and there will never be // another message sent on the channel. while let Some((cmd, tx)) = rx.recv().await { // The command is forwarded to the connection @@ -71,11 +43,39 @@ async fn run(mut client: Client, mut rx: Receiver) { } #[derive(Clone)] -pub struct Buffer { +pub struct BufferedClient { tx: Sender, } -impl Buffer { +impl BufferedClient { + /// Create a new client request buffer + /// + /// The `Client` performs Redis commands directly on the TCP connection. Only a + /// single request may be in-flight at a given time and operations require + /// mutable access to the `Client` handle. This prevents using a single Redis + /// connection from multiple Tokio tasks. + /// + /// The strategy for dealing with this class of problem is to spawn a dedicated + /// Tokio task to manage the Redis connection and using "message passing" to + /// operate on the connection. Commands are pushed into a channel. The + /// connection task pops commands off of the channel and applies them to the + /// Redis connection. When the response is received, it is forwarded to the + /// original requester. + /// + /// The returned `BufferedClient` handle may be cloned before passing the new handle to + /// separate tasks. + pub fn buffer(client: Client) -> BufferedClient { + // Setting the message limit to a hard coded value of 32. in a real-app, the + // buffer size should be configurable, but we don't need to do that here. + let (tx, rx) = channel(32); + + // Spawn a task to process requests for the connection. + tokio::spawn(async move { run(client, rx).await }); + + // Return the `BufferedClient` handle. + BufferedClient { tx } + } + /// Get the value of a key. /// /// Same as `Client::get` but requests are **buffered** until the associated diff --git a/src/client.rs b/src/clients/client.rs similarity index 89% rename from src/client.rs rename to src/clients/client.rs index 3bdcdb5..607c77e 100644 --- a/src/client.rs +++ b/src/clients/client.rs @@ -51,42 +51,42 @@ pub struct Message { pub content: Bytes, } -/// Establish a connection with the Redis server located at `addr`. -/// -/// `addr` may be any type that can be asynchronously converted to a -/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs` -/// trait is the Tokio version and not the `std` version. -/// -/// # Examples -/// -/// ```no_run -/// use mini_redis::client; -/// -/// #[tokio::main] -/// async fn main() { -/// let client = match client::connect("localhost:6379").await { -/// Ok(client) => client, -/// Err(_) => panic!("failed to establish connection"), -/// }; -/// # drop(client); -/// } -/// ``` -/// -pub async fn connect(addr: T) -> crate::Result { - // The `addr` argument is passed directly to `TcpStream::connect`. This - // performs any asynchronous DNS lookup and attempts to establish the TCP - // connection. An error at either step returns an error, which is then - // bubbled up to the caller of `mini_redis` connect. - let socket = TcpStream::connect(addr).await?; - - // Initialize the connection state. This allocates read/write buffers to - // perform redis protocol frame parsing. - let connection = Connection::new(socket); - - Ok(Client { connection }) -} - impl Client { + /// Establish a connection with the Redis server located at `addr`. + /// + /// `addr` may be any type that can be asynchronously converted to a + /// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs` + /// trait is the Tokio version and not the `std` version. + /// + /// # Examples + /// + /// ```no_run + /// use mini_redis::clients::Client; + /// + /// #[tokio::main] + /// async fn main() { + /// let client = match Client::connect("localhost:6379").await { + /// Ok(client) => client, + /// Err(_) => panic!("failed to establish connection"), + /// }; + /// # drop(client); + /// } + /// ``` + /// + pub async fn connect(addr: T) -> crate::Result { + // The `addr` argument is passed directly to `TcpStream::connect`. This + // performs any asynchronous DNS lookup and attempts to establish the TCP + // connection. An error at either step returns an error, which is then + // bubbled up to the caller of `mini_redis` connect. + let socket = TcpStream::connect(addr).await?; + + // Initialize the connection state. This allocates read/write buffers to + // perform redis protocol frame parsing. + let connection = Connection::new(socket); + + Ok(Client { connection }) + } + /// Ping to the server. /// /// Returns PONG if no argument is provided, otherwise @@ -99,11 +99,11 @@ impl Client { /// /// Demonstrates basic usage. /// ```no_run - /// use mini_redis::client; + /// use mini_redis::clients::Client; /// /// #[tokio::main] /// async fn main() { - /// let mut client = client::connect("localhost:6379").await.unwrap(); + /// let mut client = Client::connect("localhost:6379").await.unwrap(); /// /// let pong = client.ping(None).await.unwrap(); /// assert_eq!(b"PONG", &pong[..]); @@ -132,11 +132,11 @@ impl Client { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::client; + /// use mini_redis::clients::Client; /// /// #[tokio::main] /// async fn main() { - /// let mut client = client::connect("localhost:6379").await.unwrap(); + /// let mut client = Client::connect("localhost:6379").await.unwrap(); /// /// let val = client.get("foo").await.unwrap(); /// println!("Got = {:?}", val); @@ -178,11 +178,11 @@ impl Client { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::client; + /// use mini_redis::clients::Client; /// /// #[tokio::main] /// async fn main() { - /// let mut client = client::connect("localhost:6379").await.unwrap(); + /// let mut client = Client::connect("localhost:6379").await.unwrap(); /// /// client.set("foo", "bar".into()).await.unwrap(); /// @@ -217,14 +217,14 @@ impl Client { /// favorable. /// /// ```no_run - /// use mini_redis::client; + /// use mini_redis::clients::Client; /// use tokio::time; /// use std::time::Duration; /// /// #[tokio::main] /// async fn main() { /// let ttl = Duration::from_millis(500); - /// let mut client = client::connect("localhost:6379").await.unwrap(); + /// let mut client = Client::connect("localhost:6379").await.unwrap(); /// /// client.set_expires("foo", "bar".into(), ttl).await.unwrap(); /// @@ -282,11 +282,11 @@ impl Client { /// Demonstrates basic usage. /// /// ```no_run - /// use mini_redis::client; + /// use mini_redis::clients::Client; /// /// #[tokio::main] /// async fn main() { - /// let mut client = client::connect("localhost:6379").await.unwrap(); + /// let mut client = Client::connect("localhost:6379").await.unwrap(); /// /// let val = client.publish("foo", "bar".into()).await.unwrap(); /// println!("Got = {:?}", val); diff --git a/src/clients/mod.rs b/src/clients/mod.rs new file mode 100644 index 0000000..36ae6cc --- /dev/null +++ b/src/clients/mod.rs @@ -0,0 +1,8 @@ +mod client; +pub use client::{Client, Message, Subscriber}; + +mod blocking_client; +pub use blocking_client::BlockingClient; + +mod buffered_client; +pub use buffered_client::BufferedClient; diff --git a/src/lib.rs b/src/lib.rs index 264a1fb..de67773 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ //! * `server`: Redis server implementation. Includes a single `run` function //! that takes a `TcpListener` and starts accepting redis client connections. //! -//! * `client`: an asynchronous Redis client implementation. Demonstrates how to +//! * `clients/client`: an asynchronous Redis client implementation. Demonstrates how to //! build clients with Tokio. //! //! * `cmd`: implementations of the supported Redis commands. @@ -25,8 +25,8 @@ //! intermediate representation between a "command" and the byte //! representation. -pub mod blocking_client; -pub mod client; +pub mod clients; +pub use clients::{BlockingClient, BufferedClient, Client}; pub mod cmd; pub use cmd::Command; @@ -46,9 +46,6 @@ use parse::{Parse, ParseError}; pub mod server; -mod buffer; -pub use buffer::{buffer, Buffer}; - mod shutdown; use shutdown::Shutdown; diff --git a/tests/buffer.rs b/tests/buffered_client.rs similarity index 83% rename from tests/buffer.rs rename to tests/buffered_client.rs index 823b720..bb51f59 100644 --- a/tests/buffer.rs +++ b/tests/buffered_client.rs @@ -1,4 +1,7 @@ -use mini_redis::{buffer, client, server}; +use mini_redis::{ + clients::{BufferedClient, Client}, + server, +}; use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -11,8 +14,8 @@ use tokio::task::JoinHandle; async fn pool_key_value_get_set() { let (addr, _) = start_server().await; - let client = client::connect(addr).await.unwrap(); - let mut client = buffer(client); + let client = Client::connect(addr).await.unwrap(); + let mut client = BufferedClient::buffer(client); client.set("hello", "world".into()).await.unwrap(); diff --git a/tests/client.rs b/tests/client.rs index 72cd28a..57c13b6 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,4 +1,4 @@ -use mini_redis::{client, server}; +use mini_redis::{clients::Client, server}; use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::task::JoinHandle; @@ -8,7 +8,7 @@ use tokio::task::JoinHandle; #[tokio::test] async fn ping_pong_without_message() { let (addr, _) = start_server().await; - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); let pong = client.ping(None).await.unwrap(); assert_eq!(b"PONG", &pong[..]); @@ -19,7 +19,7 @@ async fn ping_pong_without_message() { #[tokio::test] async fn ping_pong_with_message() { let (addr, _) = start_server().await; - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); let pong = client.ping(Some("你好世界".into())).await.unwrap(); assert_eq!("你好世界".as_bytes(), &pong[..]); @@ -32,7 +32,7 @@ async fn ping_pong_with_message() { async fn key_value_get_set() { let (addr, _) = start_server().await; - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); client.set("hello", "world".into()).await.unwrap(); let value = client.get("hello").await.unwrap().unwrap(); @@ -45,11 +45,11 @@ async fn key_value_get_set() { async fn receive_message_subscribed_channel() { let (addr, _) = start_server().await; - let client = client::connect(addr.clone()).await.unwrap(); + let client = Client::connect(addr.clone()).await.unwrap(); let mut subscriber = client.subscribe(vec!["hello".into()]).await.unwrap(); tokio::spawn(async move { - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); client.publish("hello", "world".into()).await.unwrap() }); @@ -63,14 +63,14 @@ async fn receive_message_subscribed_channel() { async fn receive_message_multiple_subscribed_channels() { let (addr, _) = start_server().await; - let client = client::connect(addr.clone()).await.unwrap(); + let client = Client::connect(addr.clone()).await.unwrap(); let mut subscriber = client .subscribe(vec!["hello".into(), "world".into()]) .await .unwrap(); tokio::spawn(async move { - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); client.publish("hello", "world".into()).await.unwrap() }); @@ -79,7 +79,7 @@ async fn receive_message_multiple_subscribed_channels() { assert_eq!(b"world", &message1.content[..]); tokio::spawn(async move { - let mut client = client::connect(addr).await.unwrap(); + let mut client = Client::connect(addr).await.unwrap(); client.publish("world", "howdy?".into()).await.unwrap() }); @@ -94,7 +94,7 @@ async fn receive_message_multiple_subscribed_channels() { async fn unsubscribes_from_channels() { let (addr, _) = start_server().await; - let client = client::connect(addr.clone()).await.unwrap(); + let client = Client::connect(addr.clone()).await.unwrap(); let mut subscriber = client .subscribe(vec!["hello".into(), "world".into()]) .await