diff --git a/examples/sub.rs b/examples/sub.rs index 761b106..ad87cb9 100644 --- a/examples/sub.rs +++ b/examples/sub.rs @@ -24,13 +24,15 @@ pub async fn main() -> Result<()> { // Open a connection to the mini-redis address. let client = client::connect("127.0.0.1:6379").await?; - // subscribe to channel foo let mut subscriber = client.subscribe(vec!["foo".into()]).await?; // await messages on channel foo if let Some(msg) = subscriber.next_message().await? { - println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content); + println!( + "got message from the channel: {}; message = {:?}", + msg.channel, msg.content + ); } Ok(()) diff --git a/src/bin/cli.rs b/src/bin/cli.rs index a970584..e54d8f2 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -24,7 +24,7 @@ enum Command { /// Get the value of key. Get { /// Name of key to get - key: String + key: String, }, /// Set key to hold the string value. Set { @@ -76,11 +76,19 @@ async fn main() -> mini_redis::Result<()> { println!("(nil)"); } } - Command::Set { key, value, expires: None } => { + Command::Set { + key, + value, + expires: None, + } => { client.set(&key, value).await?; println!("OK"); } - Command::Set { key, value, expires: Some(expires) } => { + Command::Set { + key, + value, + expires: Some(expires), + } => { client.set_expires(&key, value, expires).await?; println!("OK"); } diff --git a/src/client.rs b/src/client.rs index 4764676..917ba95 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,13 +5,13 @@ use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe}; use crate::{Connection, Frame}; +use async_stream::try_stream; use bytes::Bytes; use std::io::{Error, ErrorKind}; use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; use tokio::stream::Stream; use tracing::{debug, instrument}; -use async_stream::try_stream; /// Established connection with a Redis server. /// @@ -416,7 +416,8 @@ impl Subscriber { self.client.subscribe_cmd(channels).await?; // Update the set of subscribed channels. - self.subscribed_channels.extend(channels.iter().map(Clone::clone)); + self.subscribed_channels + .extend(channels.iter().map(Clone::clone)); Ok(()) } @@ -462,7 +463,7 @@ impl Subscriber { if self.subscribed_channels.len() != len - 1 { return Err(response.to_error()); } - }, + } _ => return Err(response.to_error()), }, frame => return Err(frame.to_error()), diff --git a/src/cmd/get.rs b/src/cmd/get.rs index 573d68e..3a0cc6e 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -17,7 +17,9 @@ pub struct Get { impl Get { /// Create a new `Get` command which fetches `key`. pub(crate) fn new(key: impl ToString) -> Get { - Get { key: key.to_string() } + Get { + key: key.to_string(), + } } /// Parse a `Get` instance from a received frame. diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index a99363d..ef56ee7 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -25,7 +25,7 @@ pub(crate) enum Command { Set(Set), Subscribe(Subscribe), Unsubscribe(Unsubscribe), - Unknown(Unknown) + Unknown(Unknown), } impl Command { @@ -66,7 +66,7 @@ impl Command { // the command is not recognized, there is most likely // unconsumed fields remaining in the `Parse` instance. return Ok(Command::Unknown(Unknown::new(command_name))); - }, + } }; // Check if there is any remaining unconsumed fields in the `Parse` diff --git a/src/cmd/set.rs b/src/cmd/set.rs index e417ce6..e18b57b 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,5 +1,5 @@ -use crate::{Connection, Db, Frame}; use crate::cmd::{Parse, ParseError}; +use crate::{Connection, Db, Frame}; use bytes::Bytes; use std::time::Duration; diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index d93db90..aa20170 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -27,7 +27,9 @@ pub struct Unsubscribe { impl Subscribe { /// Creates a new `Subscribe` command to listen on the specified channels. pub(crate) fn new(channels: &[String]) -> Subscribe { - Subscribe { channels: channels.to_vec() } + Subscribe { + channels: channels.to_vec(), + } } /// Parse a `Subscribe` instance from a received frame. @@ -221,7 +223,9 @@ impl Subscribe { impl Unsubscribe { /// Create a new `Unsubscribe` command with the given `channels`. pub(crate) fn new(channels: &[String]) -> Unsubscribe { - Unsubscribe { channels: channels.to_vec() } + Unsubscribe { + channels: channels.to_vec(), + } } /// Parse a `Unsubscribe` instance from a received frame. diff --git a/src/cmd/unknown.rs b/src/cmd/unknown.rs index fc7c5ea..25f869a 100644 --- a/src/cmd/unknown.rs +++ b/src/cmd/unknown.rs @@ -12,7 +12,9 @@ impl Unknown { /// Create a new `Unknown` command which responds to unknown commands /// issued by clients pub(crate) fn new(key: impl ToString) -> Unknown { - Unknown { command_name: key.to_string() } + Unknown { + command_name: key.to_string(), + } } /// Returns the command name diff --git a/src/connection.rs b/src/connection.rs index d010466..258f20c 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -54,7 +54,7 @@ impl Connection { /// # Returns /// /// On success, the received frame is returned. If the `TcpStream` - /// is closed in a way that doesn't break a frame in half, it retuns + /// is closed in a way that doesn't break a frame in half, it retuns /// `None`. Otherwise, an error is returned. pub(crate) async fn read_frame(&mut self) -> crate::Result> { use frame::Error::Incomplete; diff --git a/src/parse.rs b/src/parse.rs index 74b26b1..4306150 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -67,7 +67,11 @@ impl Parse { Frame::Bulk(data) => str::from_utf8(&data[..]) .map(|s| s.to_string()) .map_err(|_| "protocol error; invalid string".into()), - frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()), + frame => Err(format!( + "protocol error; expected simple frame or bulk frame, got {:?}", + frame + ) + .into()), } } @@ -83,7 +87,11 @@ impl Parse { // raw bytes, they are considered separate types. Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())), Frame::Bulk(data) => Ok(data), - frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()), + frame => Err(format!( + "protocol error; expected simple frame or bulk frame, got {:?}", + frame + ) + .into()), } } @@ -135,13 +143,10 @@ impl From<&str> for ParseError { impl fmt::Display for ParseError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ParseError::EndOfStream => { - "protocol error; unexpected end of stream".fmt(f) - } + ParseError::EndOfStream => "protocol error; unexpected end of stream".fmt(f), ParseError::Other(err) => err.fmt(f), } } } -impl std::error::Error for ParseError { -} +impl std::error::Error for ParseError {} diff --git a/src/server.rs b/src/server.rs index 4be37fe..e9d6d60 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Semaphore}; use tokio::time::{self, Duration}; -use tracing::{debug, error, instrument, info}; +use tracing::{debug, error, info, instrument}; /// Server listener state. Created in the `run` call. It includes a `run` method /// which performs the TCP listening and initialization of per-connection state. @@ -184,7 +184,11 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< // Extract the `shutdown_complete` receiver and transmitter // explicitly drop `shutdown_transmitter`. This is important, as the // `.await` below would otherwise never complete. - let Listener { mut shutdown_complete_rx, shutdown_complete_tx, .. } = server; + let Listener { + mut shutdown_complete_rx, + shutdown_complete_tx, + .. + } = server; drop(shutdown_complete_tx); diff --git a/tests/client.rs b/tests/client.rs index 9b1a837..fb19a5a 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,7 +1,7 @@ +use mini_redis::{client, server}; use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::task::JoinHandle; -use mini_redis::{client, server}; /// A basic "hello world" style test. A server instance is started in a /// background task. A client instance is then established and set and get @@ -42,7 +42,10 @@ async fn receive_message_multiple_subscribed_channels() { let (addr, _) = start_server().await; let client = client::connect(addr.clone()).await.unwrap(); - let mut subscriber = client.subscribe(vec!["hello".into(), "world".into()]).await.unwrap(); + let mut subscriber = client + .subscribe(vec!["hello".into(), "world".into()]) + .await + .unwrap(); tokio::spawn(async move { let mut client = client::connect(addr).await.unwrap(); @@ -58,7 +61,6 @@ async fn receive_message_multiple_subscribed_channels() { client.publish("world", "howdy?".into()).await.unwrap() }); - let message2 = subscriber.next_message().await.unwrap().unwrap(); assert_eq!("world", &message2.channel); assert_eq!(b"howdy?", &message2.content[..]) @@ -71,7 +73,10 @@ async fn unsubscribes_from_channels() { let (addr, _) = start_server().await; let client = client::connect(addr.clone()).await.unwrap(); - let mut subscriber = client.subscribe(vec!["hello".into(), "world".into()]).await.unwrap(); + let mut subscriber = client + .subscribe(vec!["hello".into(), "world".into()]) + .await + .unwrap(); subscriber.unsubscribe(&[]).await.unwrap(); assert_eq!(subscriber.get_subscribed().len(), 0); @@ -81,10 +86,7 @@ async fn start_server() -> (SocketAddr, JoinHandle>) { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - let handle = tokio::spawn(async move { - server::run(listener, tokio::signal::ctrl_c()).await - }); + let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await }); (addr, handle) } - diff --git a/tests/server.rs b/tests/server.rs index 0536ed6..4ac410c 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,6 +1,6 @@ use mini_redis::server; -use std::net::{SocketAddr, Shutdown}; +use std::net::{Shutdown, SocketAddr}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::time::{self, Duration}; @@ -17,7 +17,10 @@ async fn key_value_get_set() { let mut stream = TcpStream::connect(addr).await.unwrap(); // Get a key, data is missing - stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Read nil response let mut response = [0; 5]; @@ -25,7 +28,10 @@ async fn key_value_get_set() { assert_eq!(b"$-1\r\n", &response); // Set a key - stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + stream + .write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await + .unwrap(); // Read OK let mut response = [0; 5]; @@ -33,7 +39,10 @@ async fn key_value_get_set() { assert_eq!(b"+OK\r\n", &response); // Get the key, data is present - stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Shutdown the write half stream.shutdown(Shutdown::Write).unwrap(); @@ -65,8 +74,13 @@ async fn key_value_timeout() { let mut stream = TcpStream::connect(addr).await.unwrap(); // Set a key - stream.write_all(b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\ - +EX\r\n:1\r\n").await.unwrap(); + stream + .write_all( + b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\ + +EX\r\n:1\r\n", + ) + .await + .unwrap(); let mut response = [0; 5]; @@ -76,7 +90,10 @@ async fn key_value_timeout() { assert_eq!(b"+OK\r\n", &response); // Get the key, data is present - stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Read "world" response let mut response = [0; 11]; @@ -89,7 +106,10 @@ async fn key_value_timeout() { time::advance(Duration::from_secs(1)).await; // Get a key, data is missing - stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Read nil response let mut response = [0; 5]; @@ -107,7 +127,10 @@ async fn pub_sub() { // Publish a message, there are no subscribers yet so the server will // return `0`. - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); @@ -116,15 +139,23 @@ async fn pub_sub() { // Create a subscriber. This subscriber will only subscribe to the `hello` // channel. let mut sub1 = TcpStream::connect(addr).await.unwrap(); - sub1.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap(); + sub1.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Read the subscribe response let mut response = [0; 34]; sub1.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], + &response[..] + ); // Publish a message, there now is a subscriber - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); @@ -133,31 +164,48 @@ async fn pub_sub() { // The first subscriber received the message let mut response = [0; 39]; sub1.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\nworld\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\nworld\r\n"[..], + &response[..] + ); // Create a second subscriber // // This subscriber will be subscribed to both `hello` and `foo` let mut sub2 = TcpStream::connect(addr).await.unwrap(); - sub2.write_all(b"*3\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n$3\r\nfoo\r\n").await.unwrap(); + sub2.write_all(b"*3\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n$3\r\nfoo\r\n") + .await + .unwrap(); // Read the subscribe response let mut response = [0; 34]; sub2.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], + &response[..] + ); let mut response = [0; 32]; sub2.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], + &response[..] + ); // Publish another message on `hello`, there are two subscribers - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\njazzy\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\njazzy\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); assert_eq!(b":2\r\n", &response); // Publish a message on `foo`, there is only one subscriber - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); @@ -166,21 +214,32 @@ async fn pub_sub() { // The first subscriber received the message let mut response = [0; 39]; sub1.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], + &response[..] + ); // The second subscriber received the message let mut response = [0; 39]; sub2.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], + &response[..] + ); // The first subscriber **did not** receive the second message let mut response = [0; 1]; - time::timeout(Duration::from_millis(100), sub1.read(&mut response)).await.unwrap_err(); + time::timeout(Duration::from_millis(100), sub1.read(&mut response)) + .await + .unwrap_err(); // The second subscriber **did** receive the message let mut response = [0; 35]; sub2.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], + &response[..] + ); } #[tokio::test] @@ -191,34 +250,55 @@ async fn manage_subscription() { // Create a subscriber let mut sub = TcpStream::connect(addr).await.unwrap(); - sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap(); + sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n") + .await + .unwrap(); // Read the subscribe response let mut response = [0; 34]; sub.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], + &response[..] + ); // Update subscription to add `foo` - sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$3\r\nfoo\r\n").await.unwrap(); + sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$3\r\nfoo\r\n") + .await + .unwrap(); let mut response = [0; 32]; sub.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], + &response[..] + ); // Update subscription to remove `hello` - sub.write_all(b"*2\r\n$11\r\nUNSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap(); + sub.write_all(b"*2\r\n$11\r\nUNSUBSCRIBE\r\n$5\r\nhello\r\n") + .await + .unwrap(); let mut response = [0; 37]; sub.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], + &response[..] + ); // Publish a message to `hello` and then a message to `foo` - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); assert_eq!(b":0\r\n", &response); - publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n").await.unwrap(); + publisher + .write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n") + .await + .unwrap(); let mut response = [0; 4]; publisher.read_exact(&mut response).await.unwrap(); assert_eq!(b":1\r\n", &response); @@ -227,18 +307,28 @@ async fn manage_subscription() { // The second subscriber **did** receive the message let mut response = [0; 35]; sub.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], + &response[..] + ); // No more messages let mut response = [0; 1]; - time::timeout(Duration::from_millis(100), sub.read(&mut response)).await.unwrap_err(); + time::timeout(Duration::from_millis(100), sub.read(&mut response)) + .await + .unwrap_err(); // Unsubscribe from all channels - sub.write_all(b"*1\r\n$11\r\nunsubscribe\r\n").await.unwrap(); + sub.write_all(b"*1\r\n$11\r\nunsubscribe\r\n") + .await + .unwrap(); let mut response = [0; 35]; sub.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"[..], + &response[..] + ); } // In this case we test that server Responds with an Error message if a client @@ -251,7 +341,10 @@ async fn send_error_unknown_command() { let mut stream = TcpStream::connect(addr).await.unwrap(); // Get a key, data is missing - stream.write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n") + .await + .unwrap(); let mut response = [0; 28]; @@ -269,22 +362,34 @@ async fn send_error_get_set_after_subscribe() { let mut stream = TcpStream::connect(addr).await.unwrap(); // send SUBSCRIBE command - stream.write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n") + .await + .unwrap(); let mut response = [0; 34]; stream.read_exact(&mut response).await.unwrap(); - assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]); + assert_eq!( + &b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], + &response[..] + ); - stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap(); + stream + .write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n") + .await + .unwrap(); let mut response = [0; 28]; stream.read_exact(&mut response).await.unwrap(); assert_eq!(b"-ERR unknown command \'set\'\r\n", &response); - stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap(); + stream + .write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n") + .await + .unwrap(); let mut response = [0; 28]; @@ -296,9 +401,7 @@ async fn start_server() -> SocketAddr { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - tokio::spawn(async move { - server::run(listener, tokio::signal::ctrl_c()).await - }); + tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await }); addr }