From 6513e5226c2cfe1108cb89de009c29fe756d436a Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 23 Jan 2022 22:27:30 +0800 Subject: [PATCH] support PING command for easy testing (#90) Signed-off-by: tison --- src/cmd/mod.rs | 7 ++++++ src/cmd/ping.rs | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/cmd/ping.rs diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 2da5ad0..2aae08e 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -10,6 +10,9 @@ pub use set::Set; mod subscribe; pub use subscribe::{Subscribe, Unsubscribe}; +mod ping; +pub use ping::Ping; + mod unknown; pub use unknown::Unknown; @@ -25,6 +28,7 @@ pub enum Command { Set(Set), Subscribe(Subscribe), Unsubscribe(Unsubscribe), + Ping(Ping), Unknown(Unknown), } @@ -58,6 +62,7 @@ impl Command { "set" => Command::Set(Set::parse_frames(&mut parse)?), "subscribe" => Command::Subscribe(Subscribe::parse_frames(&mut parse)?), "unsubscribe" => Command::Unsubscribe(Unsubscribe::parse_frames(&mut parse)?), + "ping" => Command::Ping(Ping::parse_frames(&mut parse)?), _ => { // The command is not recognized and an Unknown command is // returned. @@ -95,6 +100,7 @@ impl Command { Publish(cmd) => cmd.apply(db, dst).await, Set(cmd) => cmd.apply(db, dst).await, Subscribe(cmd) => cmd.apply(db, dst, shutdown).await, + Ping(cmd) => cmd.apply(dst).await, Unknown(cmd) => cmd.apply(dst).await, // `Unsubscribe` cannot be applied. It may only be received from the // context of a `Subscribe` command. @@ -110,6 +116,7 @@ impl Command { Command::Set(_) => "set", Command::Subscribe(_) => "subscribe", Command::Unsubscribe(_) => "unsubscribe", + Command::Ping(_) => "ping", Command::Unknown(cmd) => cmd.get_name(), } } diff --git a/src/cmd/ping.rs b/src/cmd/ping.rs new file mode 100644 index 0000000..ea8bf7c --- /dev/null +++ b/src/cmd/ping.rs @@ -0,0 +1,66 @@ +use crate::{Connection, Frame, Parse, ParseError}; +use bytes::Bytes; +use tracing::{debug, instrument}; + +/// Returns PONG if no argument is provided, otherwise +/// return a copy of the argument as a bulk. +/// +/// This command is often used to test if a connection +/// is still alive, or to measure latency. +#[derive(Debug, Default)] +pub struct Ping { + /// optional message to be returned + msg: Option, +} + +impl Ping { + /// Create a new `Ping` command with optional `msg`. + pub fn new(msg: Option) -> Ping { + Ping { msg } + } + + /// Parse a `Ping` instance from a received frame. + /// + /// The `Parse` argument provides a cursor-like API to read fields from the + /// `Frame`. At this point, the entire frame has already been received from + /// the socket. + /// + /// The `PING` string has already been consumed. + /// + /// # Returns + /// + /// Returns the `Ping` value on success. If the frame is malformed, `Err` is + /// returned. + /// + /// # Format + /// + /// Expects an array frame containing `PING` and an optional message. + /// + /// ```text + /// PING [message] + /// ``` + pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + match parse.next_string() { + Ok(msg) => Ok(Ping::new(Some(msg))), + Err(ParseError::EndOfStream) => Ok(Ping::default()), + Err(e) => Err(e.into()), + } + } + + /// Apply the `Ping` command and return the message. + /// + /// The response is written to `dst`. This is called by the server in order + /// to execute a received command. + #[instrument(skip(self, dst))] + pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> { + let response = match self.msg { + None => Frame::Simple("PONG".to_string()), + Some(msg) => Frame::Bulk(Bytes::from(msg)), + }; + + // Write the response back to the client + dst.write_frame(&response).await?; + + Ok(()) + } +}