fmt (#34)
This commit is contained in:
@@ -24,13 +24,15 @@ pub async fn main() -> Result<()> {
|
||||
// Open a connection to the mini-redis address.
|
||||
let client = client::connect("127.0.0.1:6379").await?;
|
||||
|
||||
|
||||
// subscribe to channel foo
|
||||
let mut subscriber = client.subscribe(vec!["foo".into()]).await?;
|
||||
|
||||
// await messages on channel foo
|
||||
if let Some(msg) = subscriber.next_message().await? {
|
||||
println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content);
|
||||
println!(
|
||||
"got message from the channel: {}; message = {:?}",
|
||||
msg.channel, msg.content
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -24,7 +24,7 @@ enum Command {
|
||||
/// Get the value of key.
|
||||
Get {
|
||||
/// Name of key to get
|
||||
key: String
|
||||
key: String,
|
||||
},
|
||||
/// Set key to hold the string value.
|
||||
Set {
|
||||
@@ -76,11 +76,19 @@ async fn main() -> mini_redis::Result<()> {
|
||||
println!("(nil)");
|
||||
}
|
||||
}
|
||||
Command::Set { key, value, expires: None } => {
|
||||
Command::Set {
|
||||
key,
|
||||
value,
|
||||
expires: None,
|
||||
} => {
|
||||
client.set(&key, value).await?;
|
||||
println!("OK");
|
||||
}
|
||||
Command::Set { key, value, expires: Some(expires) } => {
|
||||
Command::Set {
|
||||
key,
|
||||
value,
|
||||
expires: Some(expires),
|
||||
} => {
|
||||
client.set_expires(&key, value, expires).await?;
|
||||
println!("OK");
|
||||
}
|
||||
|
||||
@@ -5,13 +5,13 @@
|
||||
use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe};
|
||||
use crate::{Connection, Frame};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::time::Duration;
|
||||
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||
use tokio::stream::Stream;
|
||||
use tracing::{debug, instrument};
|
||||
use async_stream::try_stream;
|
||||
|
||||
/// Established connection with a Redis server.
|
||||
///
|
||||
@@ -416,7 +416,8 @@ impl Subscriber {
|
||||
self.client.subscribe_cmd(channels).await?;
|
||||
|
||||
// Update the set of subscribed channels.
|
||||
self.subscribed_channels.extend(channels.iter().map(Clone::clone));
|
||||
self.subscribed_channels
|
||||
.extend(channels.iter().map(Clone::clone));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -462,7 +463,7 @@ impl Subscriber {
|
||||
if self.subscribed_channels.len() != len - 1 {
|
||||
return Err(response.to_error());
|
||||
}
|
||||
},
|
||||
}
|
||||
_ => return Err(response.to_error()),
|
||||
},
|
||||
frame => return Err(frame.to_error()),
|
||||
|
||||
@@ -17,7 +17,9 @@ pub struct Get {
|
||||
impl Get {
|
||||
/// Create a new `Get` command which fetches `key`.
|
||||
pub(crate) fn new(key: impl ToString) -> Get {
|
||||
Get { key: key.to_string() }
|
||||
Get {
|
||||
key: key.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a `Get` instance from a received frame.
|
||||
|
||||
@@ -25,7 +25,7 @@ pub(crate) enum Command {
|
||||
Set(Set),
|
||||
Subscribe(Subscribe),
|
||||
Unsubscribe(Unsubscribe),
|
||||
Unknown(Unknown)
|
||||
Unknown(Unknown),
|
||||
}
|
||||
|
||||
impl Command {
|
||||
@@ -66,7 +66,7 @@ impl Command {
|
||||
// the command is not recognized, there is most likely
|
||||
// unconsumed fields remaining in the `Parse` instance.
|
||||
return Ok(Command::Unknown(Unknown::new(command_name)));
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
// Check if there is any remaining unconsumed fields in the `Parse`
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{Connection, Db, Frame};
|
||||
use crate::cmd::{Parse, ParseError};
|
||||
use crate::{Connection, Db, Frame};
|
||||
|
||||
use bytes::Bytes;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -27,7 +27,9 @@ pub struct Unsubscribe {
|
||||
impl Subscribe {
|
||||
/// Creates a new `Subscribe` command to listen on the specified channels.
|
||||
pub(crate) fn new(channels: &[String]) -> Subscribe {
|
||||
Subscribe { channels: channels.to_vec() }
|
||||
Subscribe {
|
||||
channels: channels.to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a `Subscribe` instance from a received frame.
|
||||
@@ -221,7 +223,9 @@ impl Subscribe {
|
||||
impl Unsubscribe {
|
||||
/// Create a new `Unsubscribe` command with the given `channels`.
|
||||
pub(crate) fn new(channels: &[String]) -> Unsubscribe {
|
||||
Unsubscribe { channels: channels.to_vec() }
|
||||
Unsubscribe {
|
||||
channels: channels.to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a `Unsubscribe` instance from a received frame.
|
||||
|
||||
@@ -12,7 +12,9 @@ impl Unknown {
|
||||
/// Create a new `Unknown` command which responds to unknown commands
|
||||
/// issued by clients
|
||||
pub(crate) fn new(key: impl ToString) -> Unknown {
|
||||
Unknown { command_name: key.to_string() }
|
||||
Unknown {
|
||||
command_name: key.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the command name
|
||||
|
||||
@@ -54,7 +54,7 @@ impl Connection {
|
||||
/// # Returns
|
||||
///
|
||||
/// On success, the received frame is returned. If the `TcpStream`
|
||||
/// is closed in a way that doesn't break a frame in half, it retuns
|
||||
/// is closed in a way that doesn't break a frame in half, it retuns
|
||||
/// `None`. Otherwise, an error is returned.
|
||||
pub(crate) async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
|
||||
use frame::Error::Incomplete;
|
||||
|
||||
19
src/parse.rs
19
src/parse.rs
@@ -67,7 +67,11 @@ impl Parse {
|
||||
Frame::Bulk(data) => str::from_utf8(&data[..])
|
||||
.map(|s| s.to_string())
|
||||
.map_err(|_| "protocol error; invalid string".into()),
|
||||
frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()),
|
||||
frame => Err(format!(
|
||||
"protocol error; expected simple frame or bulk frame, got {:?}",
|
||||
frame
|
||||
)
|
||||
.into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,7 +87,11 @@ impl Parse {
|
||||
// raw bytes, they are considered separate types.
|
||||
Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())),
|
||||
Frame::Bulk(data) => Ok(data),
|
||||
frame => Err(format!("protocol error; expected simple frame or bulk frame, got {:?}", frame).into()),
|
||||
frame => Err(format!(
|
||||
"protocol error; expected simple frame or bulk frame, got {:?}",
|
||||
frame
|
||||
)
|
||||
.into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,13 +143,10 @@ impl From<&str> for ParseError {
|
||||
impl fmt::Display for ParseError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
ParseError::EndOfStream => {
|
||||
"protocol error; unexpected end of stream".fmt(f)
|
||||
}
|
||||
ParseError::EndOfStream => "protocol error; unexpected end of stream".fmt(f),
|
||||
ParseError::Other(err) => err.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ParseError {
|
||||
}
|
||||
impl std::error::Error for ParseError {}
|
||||
|
||||
@@ -10,7 +10,7 @@ use std::sync::Arc;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::{broadcast, mpsc, Semaphore};
|
||||
use tokio::time::{self, Duration};
|
||||
use tracing::{debug, error, instrument, info};
|
||||
use tracing::{debug, error, info, instrument};
|
||||
|
||||
/// Server listener state. Created in the `run` call. It includes a `run` method
|
||||
/// which performs the TCP listening and initialization of per-connection state.
|
||||
@@ -184,7 +184,11 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
|
||||
// Extract the `shutdown_complete` receiver and transmitter
|
||||
// explicitly drop `shutdown_transmitter`. This is important, as the
|
||||
// `.await` below would otherwise never complete.
|
||||
let Listener { mut shutdown_complete_rx, shutdown_complete_tx, .. } = server;
|
||||
let Listener {
|
||||
mut shutdown_complete_rx,
|
||||
shutdown_complete_tx,
|
||||
..
|
||||
} = server;
|
||||
|
||||
drop(shutdown_complete_tx);
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use mini_redis::{client, server};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
use mini_redis::{client, server};
|
||||
|
||||
/// A basic "hello world" style test. A server instance is started in a
|
||||
/// background task. A client instance is then established and set and get
|
||||
@@ -42,7 +42,10 @@ async fn receive_message_multiple_subscribed_channels() {
|
||||
let (addr, _) = start_server().await;
|
||||
|
||||
let client = client::connect(addr.clone()).await.unwrap();
|
||||
let mut subscriber = client.subscribe(vec!["hello".into(), "world".into()]).await.unwrap();
|
||||
let mut subscriber = client
|
||||
.subscribe(vec!["hello".into(), "world".into()])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut client = client::connect(addr).await.unwrap();
|
||||
@@ -58,7 +61,6 @@ async fn receive_message_multiple_subscribed_channels() {
|
||||
client.publish("world", "howdy?".into()).await.unwrap()
|
||||
});
|
||||
|
||||
|
||||
let message2 = subscriber.next_message().await.unwrap().unwrap();
|
||||
assert_eq!("world", &message2.channel);
|
||||
assert_eq!(b"howdy?", &message2.content[..])
|
||||
@@ -71,7 +73,10 @@ async fn unsubscribes_from_channels() {
|
||||
let (addr, _) = start_server().await;
|
||||
|
||||
let client = client::connect(addr.clone()).await.unwrap();
|
||||
let mut subscriber = client.subscribe(vec!["hello".into(), "world".into()]).await.unwrap();
|
||||
let mut subscriber = client
|
||||
.subscribe(vec!["hello".into(), "world".into()])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
subscriber.unsubscribe(&[]).await.unwrap();
|
||||
assert_eq!(subscriber.get_subscribed().len(), 0);
|
||||
@@ -81,10 +86,7 @@ async fn start_server() -> (SocketAddr, JoinHandle<mini_redis::Result<()>>) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
server::run(listener, tokio::signal::ctrl_c()).await
|
||||
});
|
||||
let handle = tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });
|
||||
|
||||
(addr, handle)
|
||||
}
|
||||
|
||||
|
||||
187
tests/server.rs
187
tests/server.rs
@@ -1,6 +1,6 @@
|
||||
use mini_redis::server;
|
||||
|
||||
use std::net::{SocketAddr, Shutdown};
|
||||
use std::net::{Shutdown, SocketAddr};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::time::{self, Duration};
|
||||
@@ -17,7 +17,10 @@ async fn key_value_get_set() {
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// Get a key, data is missing
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read nil response
|
||||
let mut response = [0; 5];
|
||||
@@ -25,7 +28,10 @@ async fn key_value_get_set() {
|
||||
assert_eq!(b"$-1\r\n", &response);
|
||||
|
||||
// Set a key
|
||||
stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read OK
|
||||
let mut response = [0; 5];
|
||||
@@ -33,7 +39,10 @@ async fn key_value_get_set() {
|
||||
assert_eq!(b"+OK\r\n", &response);
|
||||
|
||||
// Get the key, data is present
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Shutdown the write half
|
||||
stream.shutdown(Shutdown::Write).unwrap();
|
||||
@@ -65,8 +74,13 @@ async fn key_value_timeout() {
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// Set a key
|
||||
stream.write_all(b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\
|
||||
+EX\r\n:1\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(
|
||||
b"*5\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n\
|
||||
+EX\r\n:1\r\n",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 5];
|
||||
|
||||
@@ -76,7 +90,10 @@ async fn key_value_timeout() {
|
||||
assert_eq!(b"+OK\r\n", &response);
|
||||
|
||||
// Get the key, data is present
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read "world" response
|
||||
let mut response = [0; 11];
|
||||
@@ -89,7 +106,10 @@ async fn key_value_timeout() {
|
||||
time::advance(Duration::from_secs(1)).await;
|
||||
|
||||
// Get a key, data is missing
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read nil response
|
||||
let mut response = [0; 5];
|
||||
@@ -107,7 +127,10 @@ async fn pub_sub() {
|
||||
|
||||
// Publish a message, there are no subscribers yet so the server will
|
||||
// return `0`.
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
@@ -116,15 +139,23 @@ async fn pub_sub() {
|
||||
// Create a subscriber. This subscriber will only subscribe to the `hello`
|
||||
// channel.
|
||||
let mut sub1 = TcpStream::connect(addr).await.unwrap();
|
||||
sub1.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
sub1.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read the subscribe response
|
||||
let mut response = [0; 34];
|
||||
sub1.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Publish a message, there now is a subscriber
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
@@ -133,31 +164,48 @@ async fn pub_sub() {
|
||||
// The first subscriber received the message
|
||||
let mut response = [0; 39];
|
||||
sub1.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\nworld\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\nworld\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Create a second subscriber
|
||||
//
|
||||
// This subscriber will be subscribed to both `hello` and `foo`
|
||||
let mut sub2 = TcpStream::connect(addr).await.unwrap();
|
||||
sub2.write_all(b"*3\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n$3\r\nfoo\r\n").await.unwrap();
|
||||
sub2.write_all(b"*3\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n$3\r\nfoo\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read the subscribe response
|
||||
let mut response = [0; 34];
|
||||
sub2.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
let mut response = [0; 32];
|
||||
sub2.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Publish another message on `hello`, there are two subscribers
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\njazzy\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\njazzy\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(b":2\r\n", &response);
|
||||
|
||||
// Publish a message on `foo`, there is only one subscriber
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
@@ -166,21 +214,32 @@ async fn pub_sub() {
|
||||
// The first subscriber received the message
|
||||
let mut response = [0; 39];
|
||||
sub1.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// The second subscriber received the message
|
||||
let mut response = [0; 39];
|
||||
sub2.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$7\r\nmessage\r\n$5\r\nhello\r\n$5\r\njazzy\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// The first subscriber **did not** receive the second message
|
||||
let mut response = [0; 1];
|
||||
time::timeout(Duration::from_millis(100), sub1.read(&mut response)).await.unwrap_err();
|
||||
time::timeout(Duration::from_millis(100), sub1.read(&mut response))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// The second subscriber **did** receive the message
|
||||
let mut response = [0; 35];
|
||||
sub2.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -191,34 +250,55 @@ async fn manage_subscription() {
|
||||
|
||||
// Create a subscriber
|
||||
let mut sub = TcpStream::connect(addr).await.unwrap();
|
||||
sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Read the subscribe response
|
||||
let mut response = [0; 34];
|
||||
sub.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Update subscription to add `foo`
|
||||
sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$3\r\nfoo\r\n").await.unwrap();
|
||||
sub.write_all(b"*2\r\n$9\r\nSUBSCRIBE\r\n$3\r\nfoo\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 32];
|
||||
sub.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:2\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Update subscription to remove `hello`
|
||||
sub.write_all(b"*2\r\n$11\r\nUNSUBSCRIBE\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
sub.write_all(b"*2\r\n$11\r\nUNSUBSCRIBE\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 37];
|
||||
sub.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$11\r\nunsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// Publish a message to `hello` and then a message to `foo`
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(b":0\r\n", &response);
|
||||
|
||||
publisher.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n").await.unwrap();
|
||||
publisher
|
||||
.write_all(b"*3\r\n$7\r\nPUBLISH\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
let mut response = [0; 4];
|
||||
publisher.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(b":1\r\n", &response);
|
||||
@@ -227,18 +307,28 @@ async fn manage_subscription() {
|
||||
// The second subscriber **did** receive the message
|
||||
let mut response = [0; 35];
|
||||
sub.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
// No more messages
|
||||
let mut response = [0; 1];
|
||||
time::timeout(Duration::from_millis(100), sub.read(&mut response)).await.unwrap_err();
|
||||
time::timeout(Duration::from_millis(100), sub.read(&mut response))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Unsubscribe from all channels
|
||||
sub.write_all(b"*1\r\n$11\r\nunsubscribe\r\n").await.unwrap();
|
||||
sub.write_all(b"*1\r\n$11\r\nunsubscribe\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 35];
|
||||
sub.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(&b"*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
}
|
||||
|
||||
// In this case we test that server Responds with an Error message if a client
|
||||
@@ -251,7 +341,10 @@ async fn send_error_unknown_command() {
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// Get a key, data is missing
|
||||
stream.write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nFOO\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 28];
|
||||
|
||||
@@ -269,22 +362,34 @@ async fn send_error_get_set_after_subscribe() {
|
||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
// send SUBSCRIBE command
|
||||
stream.write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 34];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
|
||||
assert_eq!(&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..], &response[..]);
|
||||
assert_eq!(
|
||||
&b"*3\r\n$9\r\nsubscribe\r\n$5\r\nhello\r\n:1\r\n"[..],
|
||||
&response[..]
|
||||
);
|
||||
|
||||
stream.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 28];
|
||||
|
||||
stream.read_exact(&mut response).await.unwrap();
|
||||
assert_eq!(b"-ERR unknown command \'set\'\r\n", &response);
|
||||
|
||||
stream.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n").await.unwrap();
|
||||
stream
|
||||
.write_all(b"*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut response = [0; 28];
|
||||
|
||||
@@ -296,9 +401,7 @@ async fn start_server() -> SocketAddr {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
server::run(listener, tokio::signal::ctrl_c()).await
|
||||
});
|
||||
tokio::spawn(async move { server::run(listener, tokio::signal::ctrl_c()).await });
|
||||
|
||||
addr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user