From bbb80c341e0584c67a01148b0f5ba6db417bc4d6 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 1 Apr 2020 16:09:41 -0700 Subject: [PATCH] apply client/cli polish (#15) Continuation of #11. Refines the client structure and implements GET. `clap` is decoupled from the lib code. This is done to avoid any CLI parsing concerns to leak into the lib. The main motivation for this is to allow the reader to focus on Tokio concerns and not CLI parsing concerns. --- Cargo.lock | 55 +++++++++----------- Cargo.toml | 1 - src/bin/cli.rs | 98 ++++++++++++++++++++++++++++++++++++ src/bin/client.rs | 49 ------------------ src/bin/server.rs | 8 +-- src/client.rs | 116 +++++++++++++++++++++++++++---------------- src/cmd/get.rs | 24 +++++++-- src/cmd/mod.rs | 16 +----- src/cmd/publish.rs | 6 +-- src/cmd/set.rs | 20 +++----- src/cmd/subscribe.rs | 5 +- src/cmd/utils.rs | 11 ---- src/conn.rs | 5 +- src/frame.rs | 62 +++++++++++++++-------- src/lib.rs | 6 +++ src/server.rs | 24 +++++---- src/shutdown.rs | 1 + 17 files changed, 297 insertions(+), 210 deletions(-) create mode 100644 src/bin/cli.rs delete mode 100644 src/bin/client.rs delete mode 100644 src/cmd/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 4639fb2..23dec1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,11 +24,6 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "anyhow" -version = "1.0.27" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "arc-swap" version = "0.4.5" @@ -47,7 +42,7 @@ name = "atty" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -99,7 +94,7 @@ dependencies = [ [[package]] name = "clap" version = "3.0.0-beta.1" -source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309" +source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403" dependencies = [ "ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -116,11 +111,11 @@ dependencies = [ [[package]] name = "clap_derive" version = "3.0.0-beta.1" -source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309" +source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403" dependencies = [ "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -215,7 +210,7 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.8" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", @@ -307,7 +302,6 @@ dependencies = [ name = "mini-redis" version = "0.1.0" dependencies = [ - "anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "clap 3.0.0-beta.1 (git+https://github.com/clap-rs/clap/)", @@ -408,7 +402,7 @@ name = "num_cpus" version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -449,7 +443,7 @@ name = "pin-project-internal" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -465,7 +459,7 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -476,7 +470,7 @@ name = "proc-macro-error-attr" version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -485,7 +479,7 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -496,7 +490,7 @@ name = "quote" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -506,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "regex" -version = "1.3.5" +version = "1.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -567,7 +561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde_json" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -631,7 +625,7 @@ name = "syn" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -641,7 +635,7 @@ name = "syn-mid" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -698,7 +692,7 @@ dependencies = [ [[package]] name = "tokio" version = "0.2.13" -source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23" +source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32" dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -769,9 +763,9 @@ dependencies = [ [[package]] name = "tokio-macros" version = "0.2.5" -source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23" +source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -939,9 +933,9 @@ dependencies = [ "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1016,7 +1010,6 @@ dependencies = [ "checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -"checksum anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" "checksum arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825" "checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47" "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" @@ -1040,7 +1033,7 @@ dependencies = [ "checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" "checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" -"checksum hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8" +"checksum hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" "checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" @@ -1069,10 +1062,10 @@ dependencies = [ "checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" "checksum proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" "checksum proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -"checksum proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" +"checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" "checksum quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" -"checksum regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8900ebc1363efa7ea1c399ccc32daed870b4002651e0bed86e72d501ebbe0048" +"checksum regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7f6946991529684867e47d86474e3a6d0c0ab9b82d5821e314b1ede31fa3a4b3" "checksum regex-automata 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" "checksum regex-syntax 0.6.17 (registry+https://github.com/rust-lang/crates.io-index)" = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" @@ -1081,7 +1074,7 @@ dependencies = [ "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" "checksum serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)" = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" -"checksum serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)" = "9371ade75d4c2d6cb154141b9752cf3781ec9c05e0e5cf35060e1e70ee7b9c25" +"checksum serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "78a7a12c167809363ec3bd7329fc0a3369056996de43c4b37ef3cd54a6ce4867" "checksum sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" diff --git a/Cargo.toml b/Cargo.toml index 6cd7cc3..41c8e11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ name = "mini-redis" version = "0.1.0" [dependencies] -anyhow = "1.0.27" atoi = "0.3.2" bytes = "0.5.4" clap = { git = "https://github.com/clap-rs/clap/" } diff --git a/src/bin/cli.rs b/src/bin/cli.rs new file mode 100644 index 0000000..dc5379a --- /dev/null +++ b/src/bin/cli.rs @@ -0,0 +1,98 @@ +use mini_redis::{client, DEFAULT_PORT}; + +use bytes::Bytes; +use clap::Clap; +use std::num::ParseIntError; +use std::str; +use std::time::Duration; + +#[derive(Clap, Debug)] +#[clap(name = "mini-redis-cli", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Issue Redis commands")] +struct Cli { + #[clap(subcommand)] + command: Command, + + #[clap(name = "hostname", long = "--host", default_value = "127.0.0.1")] + host: String, + + #[clap(name = "port", long = "--port", default_value = DEFAULT_PORT)] + port: String, +} + +#[derive(Clap, Debug)] +enum Command { + /// Get the value of key. + Get { + /// Name of key to get + key: String + }, + /// Set key to hold the string value. + Set { + /// Name of key to set + key: String, + + /// Value to set. + #[clap(parse(from_str = bytes_from_str))] + value: Bytes, + + /// Expire the value after specified amount of time + #[clap(parse(try_from_str = duration_from_ms_str))] + expires: Option, + }, +} + +/// Entry point for CLI tool. +/// +/// The `[tokio::main]` annotation signals that the Tokio runtime should be +/// started when the function is called. The body of the function is executed +/// within the newly spawned runtime. +/// +/// `basic_scheduler` is used here to avoid spawning background threads. The CLI +/// tool use case benefits more by being lighter instead of multi-threaded. +#[tokio::main(basic_scheduler)] +async fn main() -> mini_redis::Result<()> { + // Enable logging + tracing_subscriber::fmt::try_init()?; + + // Parse command line arguments + let cli = Cli::parse(); + + // Get the remote address to connect to + let addr = format!("{}:{}", cli.host, cli.port); + + // Establish a connection + let mut client = client::connect(&addr).await?; + + match cli.command { + Command::Get { key } => { + if let Some(value) = client.get(&key).await? { + if let Ok(string) = str::from_utf8(&value) { + println!("\"{}\"", string); + } else { + println!("{:?}", value); + } + } else { + println!("(nil)"); + } + } + Command::Set { key, value, expires: None } => { + client.set(&key, value).await?; + println!("OK"); + } + Command::Set { key, value, expires: Some(expires) } => { + client.set_expires(&key, value, expires).await?; + println!("OK"); + } + } + + Ok(()) +} + +fn duration_from_ms_str(src: &str) -> Result { + let ms = src.parse::()?; + Ok(Duration::from_millis(ms)) +} + +fn bytes_from_str(src: &str) -> Bytes { + Bytes::from(src.to_string()) +} diff --git a/src/bin/client.rs b/src/bin/client.rs deleted file mode 100644 index 38b7b00..0000000 --- a/src/bin/client.rs +++ /dev/null @@ -1,49 +0,0 @@ -use clap::Clap; -use mini_redis::{client, cmd::Set, DEFAULT_PORT}; -use std::str; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let cli = Cli::parse(); - let port = cli.port.unwrap_or(DEFAULT_PORT.to_string()); - let mut client = client::connect(&format!("127.0.0.1:{}", port)).await?; - match cli.command { - Client::Get { key } => { - let result = client.get(&key).await?; - if let Some(result) = result { - println!("\"{}\"", str::from_utf8(&result).unwrap()); - } else { - println!("(nil)"); - } - Ok(()) - } - Client::Set(opts) => match client.set_with_opts(opts).await { - Ok(_) => { - println!("OK"); - Ok(()) - } - Err(e) => { - eprintln!("{}", e); - Err(e) - } - }, - } -} - -#[derive(Clap, Debug)] -#[clap(name = "mini-redis-client", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Opens a connection to a Redis server")] -struct Cli { - #[clap(subcommand)] - command: Client, - #[clap(name = "port", long = "--port")] - port: Option, -} - -#[derive(Clap, Debug)] -enum Client { - /// Gets a value associated with a key - Get { key: String }, - - /// Associates a value with a key - Set(Set), -} diff --git a/src/bin/server.rs b/src/bin/server.rs index df18fdf..ce3ea2e 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,12 +1,12 @@ -use anyhow::{anyhow, Result}; -use clap::Clap; use mini_redis::{server, DEFAULT_PORT}; +use clap::Clap; + #[tokio::main] -pub async fn main() -> Result<()> { +pub async fn main() -> mini_redis::Result<()> { // enable logging // see https://docs.rs/tracing for more info - tracing_subscriber::fmt::try_init().map_err(|e| anyhow!("{:?}", e))?; + tracing_subscriber::fmt::try_init()?; let cli = Cli::parse(); let port = cli.port.unwrap_or(DEFAULT_PORT.to_string()); diff --git a/src/client.rs b/src/client.rs index e87fd6a..0568976 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,22 +1,18 @@ -use crate::{ - cmd::{ - utils::{bytes_from_str, duration_from_ms_str}, - Set, - }, - frame::Frame, - Command, Connection, -}; +use crate::{Connection, Frame}; +use crate::cmd::{Get, Set}; use bytes::Bytes; use std::io::{Error, ErrorKind}; +use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; +use tracing::{debug, instrument}; /// Mini asynchronous Redis client pub struct Client { conn: Connection, } -pub async fn connect(addr: T) -> Result> { +pub async fn connect(addr: T) -> crate::Result { let socket = TcpStream::connect(addr).await?; let conn = Connection::new(socket); @@ -24,53 +20,89 @@ pub async fn connect(addr: T) -> Result Result, Box> { - unimplemented!(); + /// Get the value of a key + #[instrument(skip(self))] + pub async fn get(&mut self, key: &str) -> crate::Result> { + // Create a `Get` command for the `key` and convert it to a frame. + let frame = Get::new(key).into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket. + self.conn.write_frame(&frame).await?; + + // Wait for the response. + match self.read_response().await? { + Frame::Simple(value) => Ok(Some(value.into())), + Frame::Bulk(value) => Ok(Some(value)), + Frame::Null => Ok(None), + frame => Err(frame.to_error()), + } } - pub async fn set(&mut self, key: &str, value: &str) -> Result<(), Box> { - let opts = Set { + /// Set the value of a key to `value`. + #[instrument(skip(self))] + pub async fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { + self.set_cmd(Set { key: key.to_string(), - value: bytes_from_str(value), + value: value, expire: None, - }; - self.set_with_opts(opts).await + }).await } - pub async fn set_with_expiration( + /// Set the value of a key to `value`. The value expires after `expiration`. + #[instrument(skip(self))] + pub async fn set_expires( &mut self, key: &str, - value: &str, - expiration: &str, - ) -> Result<(), Box> { - let opts = Set { + value: Bytes, + expiration: Duration, + ) -> crate::Result<()> { + self.set_cmd(Set { key: key.to_string(), - value: bytes_from_str(value), - expire: Some(duration_from_ms_str(expiration)?), - }; - self.set_with_opts(opts).await + value: value.into(), + expire: Some(expiration), + }).await } - pub async fn set_with_opts(&mut self, opts: Set) -> Result<(), Box> { - let frame = Command::Set(opts).into_frame()?; + async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> { + // Convert the `Set` command into a frame + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket self.conn.write_frame(&frame).await?; + + // Read the response + match self.read_response().await? { + Frame::Simple(response) if response == "OK" => Ok(()), + frame => Err(frame.to_error()), + } + } + + /// Reads a response frame from the socket. If an `Error` frame is read, it + /// is converted to `Err`. + async fn read_response(&mut self) -> crate::Result { let response = self.conn.read_frame().await?; - if let Some(response) = response { - match response { - Frame::Simple(response) => { - if response == "OK" { - Ok(()) - } else { - Err("unexpected response from server".into()) - } - } - _ => Err("unexpected response from server".into()), + + debug!(?response); + + match response { + Some(Frame::Error(msg)) => { + Err(msg.into()) + } + Some(frame) => Ok(frame), + None => { + // Receiving `None` here indicates the server has closed the + // connection without sending a frame. This is unexpected and is + // represented as a "connection reset by peer" error. + let err = Error::new( + ErrorKind::ConnectionReset, + "connection reset by server"); + + Err(err.into()) } - } else { - Err(Box::new(Error::new( - ErrorKind::ConnectionReset, - "connection reset by server", - ))) } } } diff --git a/src/cmd/get.rs b/src/cmd/get.rs index f16c0f6..a39fb82 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -1,6 +1,6 @@ use crate::{Connection, Db, Frame, Parse, ParseError}; -use std::io; +use bytes::Bytes; use tracing::{debug, instrument}; #[derive(Debug)] @@ -9,10 +9,14 @@ pub struct Get { } impl Get { + /// Create a new `Get` command which fetches `key`. + pub(crate) fn new(key: impl ToString) -> Get { + Get { key: key.to_string() } + } + // instrumenting functions will log all of the arguments passed to the function // with their debug implementations // see https://docs.rs/tracing/0.1.13/tracing/attr.instrument.html - #[instrument] pub(crate) fn parse_frames(parse: &mut Parse) -> Result { let key = parse.next_string()?; @@ -26,14 +30,24 @@ impl Get { Ok(Get { key }) } - #[instrument] - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + #[instrument(skip(self, db, dst))] + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { let response = if let Some(value) = db.get(&self.key) { Frame::Bulk(value) } else { Frame::Null }; + debug!(?response); - dst.write_frame(&response).await + + dst.write_frame(&response).await?; + Ok(()) + } + + pub(crate) fn into_frame(self) -> Frame { + let mut frame = Frame::array(); + frame.push_bulk(Bytes::from("get".as_bytes())); + frame.push_bulk(Bytes::from(self.key.into_bytes())); + frame } } diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 647e55e..e83c15f 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -10,13 +10,8 @@ pub use set::Set; mod subscribe; pub use subscribe::{Subscribe, Unsubscribe}; -pub(crate) mod utils; - use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown}; -use std::io; -use tracing::instrument; - #[derive(Debug)] pub(crate) enum Command { Get(Get), @@ -27,7 +22,6 @@ pub(crate) enum Command { } impl Command { - #[instrument] pub(crate) fn from_frame(frame: Frame) -> Result { let mut parse = Parse::new(frame)?; @@ -46,20 +40,12 @@ impl Command { Ok(command) } - pub(crate) fn into_frame(self) -> Result { - let frame = match self { - Command::Set(set) => set.into_frame(), - _ => unimplemented!(), - }; - Ok(frame) - } - pub(crate) async fn apply( self, db: &Db, dst: &mut Connection, shutdown: &mut Shutdown, - ) -> io::Result<()> { + ) -> crate::Result<()> { use Command::*; match self { diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs index a7a760b..7e937f2 100644 --- a/src/cmd/publish.rs +++ b/src/cmd/publish.rs @@ -1,7 +1,6 @@ use crate::{Connection, Db, Frame, Parse, ParseError}; use bytes::Bytes; -use std::io; #[derive(Debug)] pub struct Publish { @@ -17,11 +16,12 @@ impl Publish { Ok(Publish { channel, message }) } - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { // Set the value let num_subscribers = db.publish(&self.channel, self.message); let response = Frame::Integer(num_subscribers as u64); - dst.write_frame(&response).await + dst.write_frame(&response).await?; + Ok(()) } } diff --git a/src/cmd/set.rs b/src/cmd/set.rs index ec7c13d..ac6a231 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,26 +1,19 @@ -use crate::cmd::{ - utils::{bytes_from_str, duration_from_ms_str}, - Parse, ParseError, -}; use crate::{Connection, Db, Frame}; -use clap::Clap; +use crate::cmd::{Parse, ParseError}; use bytes::Bytes; -use std::io; use std::time::Duration; use tracing::{debug, instrument}; -#[derive(Clap, Debug)] +#[derive(Debug)] pub struct Set { /// the lookup key pub(crate) key: String, /// the value to be stored - #[clap(parse(from_str = bytes_from_str))] pub(crate) value: Bytes, - /// duration in milliseconds - #[clap(parse(try_from_str = duration_from_ms_str))] + /// When to expire the key pub(crate) expire: Option, } @@ -52,14 +45,15 @@ impl Set { Ok(Set { key, value, expire }) } - #[instrument] - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + #[instrument(skip(db))] + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { // Set the value db.set(self.key, self.value, self.expire); let response = Frame::Simple("OK".to_string()); debug!(?response); - dst.write_frame(&response).await + dst.write_frame(&response).await?; + Ok(()) } pub(crate) fn into_frame(self) -> Frame { diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index 9eed976..d97202e 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -1,8 +1,7 @@ -use crate::cmd::{Parse, ParseError}; use crate::{Command, Connection, Db, Frame, Shutdown}; +use crate::cmd::{Parse, ParseError}; use bytes::Bytes; -use std::io; use tokio::select; use tokio::stream::{StreamExt, StreamMap}; @@ -48,7 +47,7 @@ impl Subscribe { db: &Db, dst: &mut Connection, shutdown: &mut Shutdown, - ) -> io::Result<()> { + ) -> crate::Result<()> { // Each individual channel subscription is handled using a // `sync::broadcast` channel. Messages are then fanned out to all // clients currently subscribed to the channels. diff --git a/src/cmd/utils.rs b/src/cmd/utils.rs deleted file mode 100644 index 8c1ccd3..0000000 --- a/src/cmd/utils.rs +++ /dev/null @@ -1,11 +0,0 @@ -use bytes::Bytes; -use std::time::Duration; - -pub(crate) fn duration_from_ms_str(src: &str) -> Result { - let millis = src.parse::()?; - Ok(Duration::from_millis(millis)) -} - -pub(crate) fn bytes_from_str(src: &str) -> Bytes { - Bytes::from(src.to_string()) -} diff --git a/src/conn.rs b/src/conn.rs index a8893ec..1b1665d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BytesMut}; use std::io::{self, Cursor}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::net::TcpStream; -use tracing::debug; + #[derive(Debug)] pub(crate) struct Connection { stream: BufStream, @@ -19,11 +19,10 @@ impl Connection { } } - pub(crate) async fn read_frame(&mut self) -> io::Result> { + pub(crate) async fn read_frame(&mut self) -> crate::Result> { use frame::Error::Incomplete; loop { - debug!(?self.buffer); let mut buf = Cursor::new(&self.buffer[..]); match Frame::check(&mut buf) { diff --git a/src/frame.rs b/src/frame.rs index 1d3b06d..4776c6c 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -1,7 +1,6 @@ -use tokio::io; - use bytes::{Buf, Bytes}; use std::convert::TryInto; +use std::fmt; use std::io::Cursor; use std::num::TryFromIntError; use std::string::FromUtf8Error; @@ -16,6 +15,7 @@ pub(crate) enum Frame { Array(Vec>), } +#[derive(Debug)] pub(crate) enum Error { /// Not enough data is available to parse a message Incomplete, @@ -113,7 +113,7 @@ impl Frame { if b'-' == peek_u8(src)? { let line = get_line(src)?; - if line != b"-1\r\n" { + if line != b"-1" { return Err(Error::Invalid); } @@ -149,19 +149,34 @@ impl Frame { } } - pub(crate) fn try_as_str(&self) -> Result { - match &self { - Frame::Simple(response) => Ok(response.to_string()), - Frame::Error(response) => Err(response.to_string()), - Frame::Integer(response) => Ok(format!("{}", response)), - Frame::Bulk(response) => Ok(format!("{:?}", response)), - Frame::Null => Ok("(nil)".to_string()), - Frame::Array(response) => { - let mut msg = "".to_string(); - for item in response { - msg.push_str(&item.try_as_str()?) + /// Converts the frame to an "unexpected frame" error + pub(crate) fn to_error(&self) -> crate::Error { + format!("unexpected frame: {}", self).into() + } +} + +impl fmt::Display for Frame { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::str; + + match self { + Frame::Simple(response) => response.fmt(fmt), + Frame::Error(msg) => write!(fmt, "error: {}", msg), + Frame::Integer(num) => num.fmt(fmt), + Frame::Bulk(msg) => match str::from_utf8(msg) { + Ok(string) => string.fmt(fmt), + Err(_) => write!(fmt, "{:?}", msg), + }, + Frame::Null => "(nil)".fmt(fmt), + Frame::Array(parts) => { + for (i, part) in parts.iter().enumerate() { + if i > 0 { + write!(fmt, " ")?; + part.fmt(fmt)?; + } } - Ok(msg) + + Ok(()) } } } @@ -221,12 +236,6 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { Err(Error::Incomplete) } -impl From for io::Error { - fn from(_src: Error) -> io::Error { - unimplemented!(); - } -} - impl From for Error { fn from(_src: FromUtf8Error) -> Error { unimplemented!(); @@ -238,3 +247,14 @@ impl From for Error { unimplemented!(); } } + +impl std::error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Incomplete => "stream ended early".fmt(fmt), + Error::Invalid => "invalid frame format".fmt(fmt), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 4d5a979..a8a28da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,3 +21,9 @@ pub mod server; mod shutdown; use shutdown::Shutdown; + +/// Error returned by most functions. +pub type Error = Box; + +/// A specialized `Result` type for mini-redis operations. +pub type Result = std::result::Result; diff --git a/src/server.rs b/src/server.rs index 0e2e661..2f25364 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,11 @@ use crate::{Command, Connection, Db, Shutdown}; -use anyhow::Result; -use tokio::io; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::broadcast; +use tracing::{debug, error, instrument, info}; +#[derive(Debug)] struct Server { /// Database state db: Db, @@ -18,6 +18,7 @@ struct Server { } /// Handles a connections +#[derive(Debug)] struct Handler { /// Database state db: Db, @@ -30,7 +31,7 @@ struct Handler { } /// Run the mini-redis server. -pub async fn run(port: &str) -> Result<()> { +pub async fn run(port: &str) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); let mut server = Server { @@ -43,11 +44,11 @@ pub async fn run(port: &str) -> Result<()> { res = server.run() => { if let Err(err) = res { // TODO: gracefully handle this error - eprintln!("failed to accept; err = {}", err); + error!(cause = %err, "failed to accept"); } } _ = signal::ctrl_c() => { - println!("shutting down"); + info!("shutting down"); } } @@ -56,7 +57,9 @@ pub async fn run(port: &str) -> Result<()> { impl Server { /// Run the server - async fn run(&mut self) -> io::Result<()> { + async fn run(&mut self) -> crate::Result<()> { + info!("accepting inbound connections"); + loop { let (socket, _) = self.listener.accept().await?; @@ -67,8 +70,8 @@ impl Server { }; tokio::spawn(async move { - if let Err(e) = handler.run().await { - eprintln!("client err = {:?}", e); + if let Err(err) = handler.run().await { + error!(cause = ?err, "connection error"); } }); } @@ -76,7 +79,8 @@ impl Server { } impl Handler { - async fn run(&mut self) -> io::Result<()> { + #[instrument(skip(self))] + async fn run(&mut self) -> crate::Result<()> { while !self.shutdown.is_shutdown() { let maybe_frame = tokio::select! { res = self.connection.read_frame() => res?, @@ -92,6 +96,8 @@ impl Handler { let cmd = Command::from_frame(frame)?; + debug!(?cmd); + cmd.apply(&self.db, &mut self.connection, &mut self.shutdown) .await?; } diff --git a/src/shutdown.rs b/src/shutdown.rs index 284abcc..03c9e34 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,5 +1,6 @@ use tokio::sync::broadcast; +#[derive(Debug)] pub(crate) struct Shutdown { shutdown: bool, notify: broadcast::Receiver<()>,