add pub sub client implementation with examples (#22)

* add pub sub client implementation with examples

* replace subscribed_channels list Vec with HashSet to avoid duplicates

* update Subscriber to use async-stream instead of manual Stream impl

* revert update to error handling server.rs, as #21 handles it

* remove uneeded recursion limit extension
This commit is contained in:
João Oliveira
2020-04-05 18:33:21 +01:00
committed by GitHub
parent f187085156
commit 3fbd9ddc42
7 changed files with 363 additions and 22 deletions

View File

@@ -1,11 +1,15 @@
use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe};
use crate::{Connection, Frame};
use crate::cmd::{Get, Set};
use bytes::Bytes;
use std::io::{Error, ErrorKind};
use std::iter::FromIterator;
use std::collections::HashSet;
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::stream::Stream;
use tracing::{debug, instrument};
use async_stream::stream;
/// Mini asynchronous Redis client
pub struct Client {
@@ -47,7 +51,33 @@ impl Client {
key: key.to_string(),
value: value,
expire: None,
}).await
})
.await
}
/// publish `message` on the `channel`
#[instrument(skip(self))]
pub async fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.publish_cmd(Publish {
channel: channel.to_string(),
message: message,
})
.await
}
/// subscribe to the list of channels
/// when client sends a `SUBSCRIBE` command, server's handle for client enters a mode where only
/// `SUBSCRIBE` and `UNSUBSCRIBE` commands are allowed, so we consume client and return Subscribe type
/// which only allows `SUBSCRIBE` and `UNSUBSCRIBE` commands
#[instrument(skip(self))]
pub async fn subscribe(mut self, channels: Vec<String>) -> crate::Result<Subscriber> {
let channels = self.subscribe_cmd(Subscribe { channels: channels }).await?;
let subscribed_channels = HashSet::from_iter(channels);
Ok(Subscriber {
conn: self.conn,
subscribed_channels,
})
}
/// Set the value of a key to `value`. The value expires after `expiration`.
@@ -62,7 +92,8 @@ impl Client {
key: key.to_string(),
value: value.into(),
expire: Some(expiration),
}).await
})
.await
}
async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> {
@@ -81,6 +112,52 @@ impl Client {
}
}
async fn publish_cmd(&mut self, cmd: Publish) -> crate::Result<u64> {
// Convert the `Publish` command into a frame
let frame = cmd.into_frame();
debug!(request = ?frame);
// Write the frame to the socket
self.conn.write_frame(&frame).await?;
// Read the response
match self.read_response().await? {
Frame::Integer(response) => Ok(response),
frame => Err(frame.to_error()),
}
}
async fn subscribe_cmd(&mut self, cmd: Subscribe) -> crate::Result<Vec<String>> {
// Convert the `Subscribe` command into a frame
let channels = cmd.channels.clone();
let frame = cmd.into_frame();
debug!(request = ?frame);
// Write the frame to the socket
self.conn.write_frame(&frame).await?;
// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[subscribe, schannel]
if subscribe.to_string() == "subscribe"
&& &schannel.to_string() == channel =>
{
()
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}
Ok(channels)
}
/// Reads a response frame from the socket. If an `Error` frame is read, it
/// is converted to `Err`.
async fn read_response(&mut self) -> crate::Result<Frame> {
@@ -89,20 +166,176 @@ impl Client {
debug!(?response);
match response {
Some(Frame::Error(msg)) => {
Err(msg.into())
}
Some(Frame::Error(msg)) => Err(msg.into()),
Some(frame) => Ok(frame),
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(
ErrorKind::ConnectionReset,
"connection reset by server");
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");
Err(err.into())
}
}
}
}
pub struct Subscriber {
conn: Connection,
subscribed_channels: HashSet<String>,
}
impl Subscriber {
/// await for next message published on the subscribed channels
pub async fn next_message(&mut self) -> crate::Result<Message> {
match self.receive_message().await {
Some(message) => message,
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");
Err(err.into())
}
}
}
/// Convert the subscriber into a Stream
/// yielding new messages published on subscribed channels
pub fn into_stream(mut self) -> impl Stream<Item = crate::Result<Message>> {
stream! {
while let Some(message) = self.receive_message().await {
yield message;
}
}
}
/// Subscribe to a list of new channels
#[instrument(skip(self))]
pub async fn subscribe(&mut self, channels: Vec<String>) -> crate::Result<()> {
let cmd = Subscribe { channels: channels };
let channels = cmd.channels.clone();
let frame = cmd.into_frame();
debug!(request = ?frame);
// Write the frame to the socket
self.conn.write_frame(&frame).await?;
// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[subscribe, schannel]
if &subscribe.to_string() == "subscribe"
&& &schannel.to_string() == channel =>
{
()
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}
self.subscribed_channels.extend(channels);
Ok(())
}
/// Unsubscribe to a list of new channels
#[instrument(skip(self))]
pub async fn unsubscribe(&mut self, channels: Vec<String>) -> crate::Result<()> {
let cmd = Unsubscribe { channels: channels };
let mut channels = cmd.channels.clone();
let frame = cmd.into_frame();
debug!(request = ?frame);
// Write the frame to the socket
self.conn.write_frame(&frame).await?;
// if the input channel list is empty, server acknowledges as unsubscribing
// from all subscribed channels, so we assert that the unsubscribe list received
// matches the client subscribed one
if channels.is_empty() {
channels = Vec::from_iter(self.subscribed_channels.clone());
}
// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[unsubscribe, uchannel]
if &unsubscribe.to_string() == "unsubscribe"
&& &uchannel.to_string() == channel =>
{
self.subscribed_channels.remove(&uchannel.to_string());
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}
Ok(())
}
/// Receives a frame published from server on socket and convert it to a `Message`
/// if frame is not `Frame::Array` with proper message structure return Err
async fn receive_message(&mut self) -> Option<crate::Result<Message>> {
match self.conn.read_frame().await {
Ok(None) => None,
Err(err) => Some(Err(err.into())),
Ok(Some(mframe)) => {
debug!(?mframe);
match mframe {
Frame::Array(ref frame) => match frame.as_slice() {
[message, channel, content] if &message.to_string() == "message" => {
Some(Ok(Message {
channel: channel.to_string(),
content: Bytes::from(content.to_string()),
}))
}
_ => Some(Err(mframe.to_error())),
},
frame => Some(Err(frame.to_error())),
}
}
}
}
/// Reads a response frame to a command from the socket. If an `Error` frame is read, it
/// is converted to `Err`.
async fn read_response(&mut self) -> crate::Result<Frame> {
let response = self.conn.read_frame().await?;
debug!(?response);
match response {
Some(Frame::Error(msg)) => Err(msg.into()),
Some(frame) => Ok(frame),
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");
Err(err.into())
}
}
}
}
/// A message received on a subscribed channel
#[derive(Debug, Clone)]
pub struct Message {
pub channel: String,
pub content: Bytes,
}

View File

@@ -4,8 +4,8 @@ use bytes::Bytes;
#[derive(Debug)]
pub struct Publish {
channel: String,
message: Bytes,
pub(crate) channel: String,
pub(crate) message: Bytes,
}
impl Publish {
@@ -24,4 +24,13 @@ impl Publish {
dst.write_frame(&response).await?;
Ok(())
}
pub(crate) fn into_frame(self) -> Frame {
let mut frame = Frame::array();
frame.push_bulk(Bytes::from("publish".as_bytes()));
frame.push_bulk(Bytes::from(self.channel.into_bytes()));
frame.push_bulk(self.message);
frame
}
}

View File

@@ -1,5 +1,5 @@
use crate::{Command, Connection, Db, Frame, Shutdown};
use crate::cmd::{Parse, ParseError};
use crate::{Command, Connection, Db, Frame, Shutdown};
use bytes::Bytes;
use tokio::select;
@@ -7,12 +7,12 @@ use tokio::stream::{StreamExt, StreamMap};
#[derive(Debug)]
pub struct Subscribe {
channels: Vec<String>,
pub(crate) channels: Vec<String>,
}
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Unsubscribe {
channels: Vec<String>,
pub(crate) channels: Vec<String>,
}
impl Subscribe {
@@ -98,8 +98,8 @@ impl Subscribe {
res = dst.read_frame() => {
let frame = match res? {
Some(frame) => frame,
// How to handle remote client closing write half
None => unimplemented!(),
// How to handle remote client closing write half?
None => return Ok(())
};
// A command has been received from the client.
@@ -147,6 +147,15 @@ impl Subscribe {
};
}
}
pub(crate) fn into_frame(self) -> Frame {
let mut frame = Frame::array();
frame.push_bulk(Bytes::from("subscribe".as_bytes()));
for channel in self.channels {
frame.push_bulk(Bytes::from(channel.into_bytes()));
}
frame
}
}
impl Unsubscribe {
@@ -166,4 +175,13 @@ impl Unsubscribe {
Ok(Unsubscribe { channels })
}
pub(crate) fn into_frame(self) -> Frame {
let mut frame = Frame::array();
frame.push_bulk(Bytes::from("unsubscribe".as_bytes()));
for channel in self.channels {
frame.push_bulk(Bytes::from(channel.into_bytes()));
}
frame
}
}