diff --git a/src/blocking_client.rs b/src/blocking_client.rs new file mode 100644 index 0000000..962a1e9 --- /dev/null +++ b/src/blocking_client.rs @@ -0,0 +1,264 @@ +//! Minimal blocking Redis client implementation +//! +//! Provides a blocking connect and methods for issuing the supported commands. + +use bytes::Bytes; +use std::time::Duration; +use tokio::net::ToSocketAddrs; +use tokio::runtime::Runtime; + +pub use crate::client::Message; + +/// Established connection with a Redis server. +/// +/// Backed by a single `TcpStream`, `BlockingClient` provides basic network +/// client functionality (no pooling, retrying, ...). Connections are +/// established using the [`connect`](fn@connect) function. +/// +/// Requests are issued using the various methods of `Client`. +pub struct BlockingClient { + /// The asynchronous `Client`. + inner: crate::client::Client, + + /// A `current_thread` runtime for executing operations on the asynchronous + /// client in a blocking manner. + rt: Runtime, +} + +/// A client that has entered pub/sub mode. +/// +/// Once clients subscribe to a channel, they may only perform pub/sub related +/// commands. The `BlockingClient` type is transitioned to a +/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being +/// called. +pub struct BlockingSubscriber { + /// The asynchronous `Subscriber`. + inner: crate::client::Subscriber, + + /// A `current_thread` runtime for executing operations on the asynchronous + /// `Subscriber` in a blocking manner. + rt: Runtime, +} + +/// The iterator returned by `Subscriber::into_iter`. +struct SubscriberIterator { + /// The asynchronous `Subscriber`. + inner: crate::client::Subscriber, + + /// A `current_thread` runtime for executing operations on the asynchronous + /// `Subscriber` in a blocking manner. + rt: Runtime, +} + +/// Establish a connection with the Redis server located at `addr`. +/// +/// `addr` may be any type that can be asynchronously converted to a +/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs` +/// trait is the Tokio version and not the `std` version. +/// +/// # Examples +/// +/// ```no_run +/// use mini_redis::blocking_client; +/// +/// fn main() { +/// let client = match blocking_client::connect("localhost:6379") { +/// Ok(client) => client, +/// Err(_) => panic!("failed to establish connection"), +/// }; +/// # drop(client); +/// } +/// ``` +pub fn connect(addr: T) -> crate::Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let inner = rt.block_on(crate::client::connect(addr))?; + + Ok(BlockingClient { inner, rt }) +} + +impl BlockingClient { + /// Get the value of key. + /// + /// If the key does not exist the special value `None` is returned. + /// + /// # Examples + /// + /// Demonstrates basic usage. + /// + /// ```no_run + /// use mini_redis::blocking_client; + /// + /// fn main() { + /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// + /// let val = client.get("foo").unwrap(); + /// println!("Got = {:?}", val); + /// } + /// ``` + pub fn get(&mut self, key: &str) -> crate::Result> { + self.rt.block_on(self.inner.get(key)) + } + + /// Set `key` to hold the given `value`. + /// + /// The `value` is associated with `key` until it is overwritten by the next + /// call to `set` or it is removed. + /// + /// If key already holds a value, it is overwritten. Any previous time to + /// live associated with the key is discarded on successful SET operation. + /// + /// # Examples + /// + /// Demonstrates basic usage. + /// + /// ```no_run + /// use mini_redis::blocking_client; + /// + /// fn main() { + /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// + /// client.set("foo", "bar".into()).unwrap(); + /// + /// // Getting the value immediately works + /// let val = client.get("foo").unwrap().unwrap(); + /// assert_eq!(val, "bar"); + /// } + /// ``` + pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { + self.rt.block_on(self.inner.set(key, value)) + } + + /// Set `key` to hold the given `value`. The value expires after `expiration` + /// + /// The `value` is associated with `key` until one of the following: + /// - it expires. + /// - it is overwritten by the next call to `set`. + /// - it is removed. + /// + /// If key already holds a value, it is overwritten. Any previous time to + /// live associated with the key is discarded on a successful SET operation. + /// + /// # Examples + /// + /// Demonstrates basic usage. This example is not **guaranteed** to always + /// work as it relies on time based logic and assumes the client and server + /// stay relatively synchronized in time. The real world tends to not be so + /// favorable. + /// + /// ```no_run + /// use mini_redis::blocking_client; + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let ttl = Duration::from_millis(500); + /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// + /// client.set_expires("foo", "bar".into(), ttl).unwrap(); + /// + /// // Getting the value immediately works + /// let val = client.get("foo").unwrap().unwrap(); + /// assert_eq!(val, "bar"); + /// + /// // Wait for the TTL to expire + /// thread::sleep(ttl); + /// + /// let val = client.get("foo").unwrap(); + /// assert!(val.is_some()); + /// } + /// ``` + pub fn set_expires( + &mut self, + key: &str, + value: Bytes, + expiration: Duration, + ) -> crate::Result<()> { + self.rt + .block_on(self.inner.set_expires(key, value, expiration)) + } + + /// Posts `message` to the given `channel`. + /// + /// Returns the number of subscribers currently listening on the channel. + /// There is no guarantee that these subscribers receive the message as they + /// may disconnect at any time. + /// + /// # Examples + /// + /// Demonstrates basic usage. + /// + /// ```no_run + /// use mini_redis::blocking_client; + /// + /// fn main() { + /// let mut client = blocking_client::connect("localhost:6379").unwrap(); + /// + /// let val = client.publish("foo", "bar".into()).unwrap(); + /// println!("Got = {:?}", val); + /// } + /// ``` + pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result { + self.rt.block_on(self.inner.publish(channel, message)) + } + + /// Subscribes the client to the specified channels. + /// + /// Once a client issues a subscribe command, it may no longer issue any + /// non-pub/sub commands. The function consumes `self` and returns a + /// `BlockingSubscriber`. + /// + /// The `BlockingSubscriber` value is used to receive messages as well as + /// manage the list of channels the client is subscribed to. + pub fn subscribe(self, channels: Vec) -> crate::Result { + let subscriber = self.rt.block_on(self.inner.subscribe(channels))?; + Ok(BlockingSubscriber { + inner: subscriber, + rt: self.rt, + }) + } +} + +impl BlockingSubscriber { + /// Returns the set of channels currently subscribed to. + pub fn get_subscribed(&self) -> &[String] { + self.inner.get_subscribed() + } + + /// Receive the next message published on a subscribed channel, waiting if + /// necessary. + /// + /// `None` indicates the subscription has been terminated. + pub fn next_message(&mut self) -> crate::Result> { + self.rt.block_on(self.inner.next_message()) + } + + /// Convert the subscriber into an `Iterator` yielding new messages published + /// on subscribed channels. + pub fn into_iter(self) -> impl Iterator> { + SubscriberIterator { + inner: self.inner, + rt: self.rt, + } + } + + /// Subscribe to a list of new channels + pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> { + self.rt.block_on(self.inner.subscribe(channels)) + } + + /// Unsubscribe to a list of new channels + pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> { + self.rt.block_on(self.inner.unsubscribe(channels)) + } +} + +impl Iterator for SubscriberIterator { + type Item = crate::Result; + + fn next(&mut self) -> Option> { + self.rt.block_on(self.inner.next_message()).transpose() + } +} diff --git a/src/lib.rs b/src/lib.rs index 4af024d..e12588e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ //! intermediate representation between a "command" and the byte //! representation. +pub mod blocking_client; pub mod client; pub mod cmd;