diff --git a/Cargo.lock b/Cargo.lock index 4639fb2..23dec1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,11 +24,6 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "anyhow" -version = "1.0.27" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "arc-swap" version = "0.4.5" @@ -47,7 +42,7 @@ name = "atty" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -99,7 +94,7 @@ dependencies = [ [[package]] name = "clap" version = "3.0.0-beta.1" -source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309" +source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403" dependencies = [ "ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -116,11 +111,11 @@ dependencies = [ [[package]] name = "clap_derive" version = "3.0.0-beta.1" -source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309" +source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403" dependencies = [ "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -215,7 +210,7 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.8" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", @@ -307,7 +302,6 @@ dependencies = [ name = "mini-redis" version = "0.1.0" dependencies = [ - "anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "clap 3.0.0-beta.1 (git+https://github.com/clap-rs/clap/)", @@ -408,7 +402,7 @@ name = "num_cpus" version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -449,7 +443,7 @@ name = "pin-project-internal" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -465,7 +459,7 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -476,7 +470,7 @@ name = "proc-macro-error-attr" version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -485,7 +479,7 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -496,7 +490,7 @@ name = "quote" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -506,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "regex" -version = "1.3.5" +version = "1.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -567,7 +561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "serde_json" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -631,7 +625,7 @@ name = "syn" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -641,7 +635,7 @@ name = "syn-mid" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -698,7 +692,7 @@ dependencies = [ [[package]] name = "tokio" version = "0.2.13" -source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23" +source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32" dependencies = [ "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -769,9 +763,9 @@ dependencies = [ [[package]] name = "tokio-macros" version = "0.2.5" -source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23" +source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32" dependencies = [ - "proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -939,9 +933,9 @@ dependencies = [ "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1016,7 +1010,6 @@ dependencies = [ "checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -"checksum anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785" "checksum arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825" "checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47" "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" @@ -1040,7 +1033,7 @@ dependencies = [ "checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" "checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" -"checksum hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8" +"checksum hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" "checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" @@ -1069,10 +1062,10 @@ dependencies = [ "checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" "checksum proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" "checksum proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -"checksum proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435" +"checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" "checksum quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" -"checksum regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8900ebc1363efa7ea1c399ccc32daed870b4002651e0bed86e72d501ebbe0048" +"checksum regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7f6946991529684867e47d86474e3a6d0c0ab9b82d5821e314b1ede31fa3a4b3" "checksum regex-automata 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" "checksum regex-syntax 0.6.17 (registry+https://github.com/rust-lang/crates.io-index)" = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" @@ -1081,7 +1074,7 @@ dependencies = [ "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" "checksum serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)" = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff" -"checksum serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)" = "9371ade75d4c2d6cb154141b9752cf3781ec9c05e0e5cf35060e1e70ee7b9c25" +"checksum serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "78a7a12c167809363ec3bd7329fc0a3369056996de43c4b37ef3cd54a6ce4867" "checksum sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" diff --git a/Cargo.toml b/Cargo.toml index 6cd7cc3..41c8e11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ name = "mini-redis" version = "0.1.0" [dependencies] -anyhow = "1.0.27" atoi = "0.3.2" bytes = "0.5.4" clap = { git = "https://github.com/clap-rs/clap/" } diff --git a/src/bin/cli.rs b/src/bin/cli.rs new file mode 100644 index 0000000..dc5379a --- /dev/null +++ b/src/bin/cli.rs @@ -0,0 +1,98 @@ +use mini_redis::{client, DEFAULT_PORT}; + +use bytes::Bytes; +use clap::Clap; +use std::num::ParseIntError; +use std::str; +use std::time::Duration; + +#[derive(Clap, Debug)] +#[clap(name = "mini-redis-cli", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Issue Redis commands")] +struct Cli { + #[clap(subcommand)] + command: Command, + + #[clap(name = "hostname", long = "--host", default_value = "127.0.0.1")] + host: String, + + #[clap(name = "port", long = "--port", default_value = DEFAULT_PORT)] + port: String, +} + +#[derive(Clap, Debug)] +enum Command { + /// Get the value of key. + Get { + /// Name of key to get + key: String + }, + /// Set key to hold the string value. + Set { + /// Name of key to set + key: String, + + /// Value to set. + #[clap(parse(from_str = bytes_from_str))] + value: Bytes, + + /// Expire the value after specified amount of time + #[clap(parse(try_from_str = duration_from_ms_str))] + expires: Option, + }, +} + +/// Entry point for CLI tool. +/// +/// The `[tokio::main]` annotation signals that the Tokio runtime should be +/// started when the function is called. The body of the function is executed +/// within the newly spawned runtime. +/// +/// `basic_scheduler` is used here to avoid spawning background threads. The CLI +/// tool use case benefits more by being lighter instead of multi-threaded. +#[tokio::main(basic_scheduler)] +async fn main() -> mini_redis::Result<()> { + // Enable logging + tracing_subscriber::fmt::try_init()?; + + // Parse command line arguments + let cli = Cli::parse(); + + // Get the remote address to connect to + let addr = format!("{}:{}", cli.host, cli.port); + + // Establish a connection + let mut client = client::connect(&addr).await?; + + match cli.command { + Command::Get { key } => { + if let Some(value) = client.get(&key).await? { + if let Ok(string) = str::from_utf8(&value) { + println!("\"{}\"", string); + } else { + println!("{:?}", value); + } + } else { + println!("(nil)"); + } + } + Command::Set { key, value, expires: None } => { + client.set(&key, value).await?; + println!("OK"); + } + Command::Set { key, value, expires: Some(expires) } => { + client.set_expires(&key, value, expires).await?; + println!("OK"); + } + } + + Ok(()) +} + +fn duration_from_ms_str(src: &str) -> Result { + let ms = src.parse::()?; + Ok(Duration::from_millis(ms)) +} + +fn bytes_from_str(src: &str) -> Bytes { + Bytes::from(src.to_string()) +} diff --git a/src/bin/client.rs b/src/bin/client.rs deleted file mode 100644 index 38b7b00..0000000 --- a/src/bin/client.rs +++ /dev/null @@ -1,49 +0,0 @@ -use clap::Clap; -use mini_redis::{client, cmd::Set, DEFAULT_PORT}; -use std::str; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let cli = Cli::parse(); - let port = cli.port.unwrap_or(DEFAULT_PORT.to_string()); - let mut client = client::connect(&format!("127.0.0.1:{}", port)).await?; - match cli.command { - Client::Get { key } => { - let result = client.get(&key).await?; - if let Some(result) = result { - println!("\"{}\"", str::from_utf8(&result).unwrap()); - } else { - println!("(nil)"); - } - Ok(()) - } - Client::Set(opts) => match client.set_with_opts(opts).await { - Ok(_) => { - println!("OK"); - Ok(()) - } - Err(e) => { - eprintln!("{}", e); - Err(e) - } - }, - } -} - -#[derive(Clap, Debug)] -#[clap(name = "mini-redis-client", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Opens a connection to a Redis server")] -struct Cli { - #[clap(subcommand)] - command: Client, - #[clap(name = "port", long = "--port")] - port: Option, -} - -#[derive(Clap, Debug)] -enum Client { - /// Gets a value associated with a key - Get { key: String }, - - /// Associates a value with a key - Set(Set), -} diff --git a/src/bin/server.rs b/src/bin/server.rs index df18fdf..ce3ea2e 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,12 +1,12 @@ -use anyhow::{anyhow, Result}; -use clap::Clap; use mini_redis::{server, DEFAULT_PORT}; +use clap::Clap; + #[tokio::main] -pub async fn main() -> Result<()> { +pub async fn main() -> mini_redis::Result<()> { // enable logging // see https://docs.rs/tracing for more info - tracing_subscriber::fmt::try_init().map_err(|e| anyhow!("{:?}", e))?; + tracing_subscriber::fmt::try_init()?; let cli = Cli::parse(); let port = cli.port.unwrap_or(DEFAULT_PORT.to_string()); diff --git a/src/client.rs b/src/client.rs index e87fd6a..0568976 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,22 +1,18 @@ -use crate::{ - cmd::{ - utils::{bytes_from_str, duration_from_ms_str}, - Set, - }, - frame::Frame, - Command, Connection, -}; +use crate::{Connection, Frame}; +use crate::cmd::{Get, Set}; use bytes::Bytes; use std::io::{Error, ErrorKind}; +use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; +use tracing::{debug, instrument}; /// Mini asynchronous Redis client pub struct Client { conn: Connection, } -pub async fn connect(addr: T) -> Result> { +pub async fn connect(addr: T) -> crate::Result { let socket = TcpStream::connect(addr).await?; let conn = Connection::new(socket); @@ -24,53 +20,89 @@ pub async fn connect(addr: T) -> Result Result, Box> { - unimplemented!(); + /// Get the value of a key + #[instrument(skip(self))] + pub async fn get(&mut self, key: &str) -> crate::Result> { + // Create a `Get` command for the `key` and convert it to a frame. + let frame = Get::new(key).into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket. + self.conn.write_frame(&frame).await?; + + // Wait for the response. + match self.read_response().await? { + Frame::Simple(value) => Ok(Some(value.into())), + Frame::Bulk(value) => Ok(Some(value)), + Frame::Null => Ok(None), + frame => Err(frame.to_error()), + } } - pub async fn set(&mut self, key: &str, value: &str) -> Result<(), Box> { - let opts = Set { + /// Set the value of a key to `value`. + #[instrument(skip(self))] + pub async fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { + self.set_cmd(Set { key: key.to_string(), - value: bytes_from_str(value), + value: value, expire: None, - }; - self.set_with_opts(opts).await + }).await } - pub async fn set_with_expiration( + /// Set the value of a key to `value`. The value expires after `expiration`. + #[instrument(skip(self))] + pub async fn set_expires( &mut self, key: &str, - value: &str, - expiration: &str, - ) -> Result<(), Box> { - let opts = Set { + value: Bytes, + expiration: Duration, + ) -> crate::Result<()> { + self.set_cmd(Set { key: key.to_string(), - value: bytes_from_str(value), - expire: Some(duration_from_ms_str(expiration)?), - }; - self.set_with_opts(opts).await + value: value.into(), + expire: Some(expiration), + }).await } - pub async fn set_with_opts(&mut self, opts: Set) -> Result<(), Box> { - let frame = Command::Set(opts).into_frame()?; + async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> { + // Convert the `Set` command into a frame + let frame = cmd.into_frame(); + + debug!(request = ?frame); + + // Write the frame to the socket self.conn.write_frame(&frame).await?; + + // Read the response + match self.read_response().await? { + Frame::Simple(response) if response == "OK" => Ok(()), + frame => Err(frame.to_error()), + } + } + + /// Reads a response frame from the socket. If an `Error` frame is read, it + /// is converted to `Err`. + async fn read_response(&mut self) -> crate::Result { let response = self.conn.read_frame().await?; - if let Some(response) = response { - match response { - Frame::Simple(response) => { - if response == "OK" { - Ok(()) - } else { - Err("unexpected response from server".into()) - } - } - _ => Err("unexpected response from server".into()), + + debug!(?response); + + match response { + Some(Frame::Error(msg)) => { + Err(msg.into()) + } + Some(frame) => Ok(frame), + None => { + // Receiving `None` here indicates the server has closed the + // connection without sending a frame. This is unexpected and is + // represented as a "connection reset by peer" error. + let err = Error::new( + ErrorKind::ConnectionReset, + "connection reset by server"); + + Err(err.into()) } - } else { - Err(Box::new(Error::new( - ErrorKind::ConnectionReset, - "connection reset by server", - ))) } } } diff --git a/src/cmd/get.rs b/src/cmd/get.rs index f16c0f6..a39fb82 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -1,6 +1,6 @@ use crate::{Connection, Db, Frame, Parse, ParseError}; -use std::io; +use bytes::Bytes; use tracing::{debug, instrument}; #[derive(Debug)] @@ -9,10 +9,14 @@ pub struct Get { } impl Get { + /// Create a new `Get` command which fetches `key`. + pub(crate) fn new(key: impl ToString) -> Get { + Get { key: key.to_string() } + } + // instrumenting functions will log all of the arguments passed to the function // with their debug implementations // see https://docs.rs/tracing/0.1.13/tracing/attr.instrument.html - #[instrument] pub(crate) fn parse_frames(parse: &mut Parse) -> Result { let key = parse.next_string()?; @@ -26,14 +30,24 @@ impl Get { Ok(Get { key }) } - #[instrument] - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + #[instrument(skip(self, db, dst))] + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { let response = if let Some(value) = db.get(&self.key) { Frame::Bulk(value) } else { Frame::Null }; + debug!(?response); - dst.write_frame(&response).await + + dst.write_frame(&response).await?; + Ok(()) + } + + pub(crate) fn into_frame(self) -> Frame { + let mut frame = Frame::array(); + frame.push_bulk(Bytes::from("get".as_bytes())); + frame.push_bulk(Bytes::from(self.key.into_bytes())); + frame } } diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 647e55e..e83c15f 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -10,13 +10,8 @@ pub use set::Set; mod subscribe; pub use subscribe::{Subscribe, Unsubscribe}; -pub(crate) mod utils; - use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown}; -use std::io; -use tracing::instrument; - #[derive(Debug)] pub(crate) enum Command { Get(Get), @@ -27,7 +22,6 @@ pub(crate) enum Command { } impl Command { - #[instrument] pub(crate) fn from_frame(frame: Frame) -> Result { let mut parse = Parse::new(frame)?; @@ -46,20 +40,12 @@ impl Command { Ok(command) } - pub(crate) fn into_frame(self) -> Result { - let frame = match self { - Command::Set(set) => set.into_frame(), - _ => unimplemented!(), - }; - Ok(frame) - } - pub(crate) async fn apply( self, db: &Db, dst: &mut Connection, shutdown: &mut Shutdown, - ) -> io::Result<()> { + ) -> crate::Result<()> { use Command::*; match self { diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs index a7a760b..7e937f2 100644 --- a/src/cmd/publish.rs +++ b/src/cmd/publish.rs @@ -1,7 +1,6 @@ use crate::{Connection, Db, Frame, Parse, ParseError}; use bytes::Bytes; -use std::io; #[derive(Debug)] pub struct Publish { @@ -17,11 +16,12 @@ impl Publish { Ok(Publish { channel, message }) } - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { // Set the value let num_subscribers = db.publish(&self.channel, self.message); let response = Frame::Integer(num_subscribers as u64); - dst.write_frame(&response).await + dst.write_frame(&response).await?; + Ok(()) } } diff --git a/src/cmd/set.rs b/src/cmd/set.rs index ec7c13d..ac6a231 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,26 +1,19 @@ -use crate::cmd::{ - utils::{bytes_from_str, duration_from_ms_str}, - Parse, ParseError, -}; use crate::{Connection, Db, Frame}; -use clap::Clap; +use crate::cmd::{Parse, ParseError}; use bytes::Bytes; -use std::io; use std::time::Duration; use tracing::{debug, instrument}; -#[derive(Clap, Debug)] +#[derive(Debug)] pub struct Set { /// the lookup key pub(crate) key: String, /// the value to be stored - #[clap(parse(from_str = bytes_from_str))] pub(crate) value: Bytes, - /// duration in milliseconds - #[clap(parse(try_from_str = duration_from_ms_str))] + /// When to expire the key pub(crate) expire: Option, } @@ -52,14 +45,15 @@ impl Set { Ok(Set { key, value, expire }) } - #[instrument] - pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> { + #[instrument(skip(db))] + pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { // Set the value db.set(self.key, self.value, self.expire); let response = Frame::Simple("OK".to_string()); debug!(?response); - dst.write_frame(&response).await + dst.write_frame(&response).await?; + Ok(()) } pub(crate) fn into_frame(self) -> Frame { diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index 9eed976..d97202e 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -1,8 +1,7 @@ -use crate::cmd::{Parse, ParseError}; use crate::{Command, Connection, Db, Frame, Shutdown}; +use crate::cmd::{Parse, ParseError}; use bytes::Bytes; -use std::io; use tokio::select; use tokio::stream::{StreamExt, StreamMap}; @@ -48,7 +47,7 @@ impl Subscribe { db: &Db, dst: &mut Connection, shutdown: &mut Shutdown, - ) -> io::Result<()> { + ) -> crate::Result<()> { // Each individual channel subscription is handled using a // `sync::broadcast` channel. Messages are then fanned out to all // clients currently subscribed to the channels. diff --git a/src/cmd/utils.rs b/src/cmd/utils.rs deleted file mode 100644 index 8c1ccd3..0000000 --- a/src/cmd/utils.rs +++ /dev/null @@ -1,11 +0,0 @@ -use bytes::Bytes; -use std::time::Duration; - -pub(crate) fn duration_from_ms_str(src: &str) -> Result { - let millis = src.parse::()?; - Ok(Duration::from_millis(millis)) -} - -pub(crate) fn bytes_from_str(src: &str) -> Bytes { - Bytes::from(src.to_string()) -} diff --git a/src/conn.rs b/src/conn.rs index a8893ec..1b1665d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BytesMut}; use std::io::{self, Cursor}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; use tokio::net::TcpStream; -use tracing::debug; + #[derive(Debug)] pub(crate) struct Connection { stream: BufStream, @@ -19,11 +19,10 @@ impl Connection { } } - pub(crate) async fn read_frame(&mut self) -> io::Result> { + pub(crate) async fn read_frame(&mut self) -> crate::Result> { use frame::Error::Incomplete; loop { - debug!(?self.buffer); let mut buf = Cursor::new(&self.buffer[..]); match Frame::check(&mut buf) { diff --git a/src/frame.rs b/src/frame.rs index 1d3b06d..4776c6c 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -1,7 +1,6 @@ -use tokio::io; - use bytes::{Buf, Bytes}; use std::convert::TryInto; +use std::fmt; use std::io::Cursor; use std::num::TryFromIntError; use std::string::FromUtf8Error; @@ -16,6 +15,7 @@ pub(crate) enum Frame { Array(Vec>), } +#[derive(Debug)] pub(crate) enum Error { /// Not enough data is available to parse a message Incomplete, @@ -113,7 +113,7 @@ impl Frame { if b'-' == peek_u8(src)? { let line = get_line(src)?; - if line != b"-1\r\n" { + if line != b"-1" { return Err(Error::Invalid); } @@ -149,19 +149,34 @@ impl Frame { } } - pub(crate) fn try_as_str(&self) -> Result { - match &self { - Frame::Simple(response) => Ok(response.to_string()), - Frame::Error(response) => Err(response.to_string()), - Frame::Integer(response) => Ok(format!("{}", response)), - Frame::Bulk(response) => Ok(format!("{:?}", response)), - Frame::Null => Ok("(nil)".to_string()), - Frame::Array(response) => { - let mut msg = "".to_string(); - for item in response { - msg.push_str(&item.try_as_str()?) + /// Converts the frame to an "unexpected frame" error + pub(crate) fn to_error(&self) -> crate::Error { + format!("unexpected frame: {}", self).into() + } +} + +impl fmt::Display for Frame { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::str; + + match self { + Frame::Simple(response) => response.fmt(fmt), + Frame::Error(msg) => write!(fmt, "error: {}", msg), + Frame::Integer(num) => num.fmt(fmt), + Frame::Bulk(msg) => match str::from_utf8(msg) { + Ok(string) => string.fmt(fmt), + Err(_) => write!(fmt, "{:?}", msg), + }, + Frame::Null => "(nil)".fmt(fmt), + Frame::Array(parts) => { + for (i, part) in parts.iter().enumerate() { + if i > 0 { + write!(fmt, " ")?; + part.fmt(fmt)?; + } } - Ok(msg) + + Ok(()) } } } @@ -221,12 +236,6 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { Err(Error::Incomplete) } -impl From for io::Error { - fn from(_src: Error) -> io::Error { - unimplemented!(); - } -} - impl From for Error { fn from(_src: FromUtf8Error) -> Error { unimplemented!(); @@ -238,3 +247,14 @@ impl From for Error { unimplemented!(); } } + +impl std::error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Incomplete => "stream ended early".fmt(fmt), + Error::Invalid => "invalid frame format".fmt(fmt), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 4d5a979..a8a28da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,3 +21,9 @@ pub mod server; mod shutdown; use shutdown::Shutdown; + +/// Error returned by most functions. +pub type Error = Box; + +/// A specialized `Result` type for mini-redis operations. +pub type Result = std::result::Result; diff --git a/src/server.rs b/src/server.rs index 0e2e661..2f25364 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,11 @@ use crate::{Command, Connection, Db, Shutdown}; -use anyhow::Result; -use tokio::io; use tokio::net::TcpListener; use tokio::signal; use tokio::sync::broadcast; +use tracing::{debug, error, instrument, info}; +#[derive(Debug)] struct Server { /// Database state db: Db, @@ -18,6 +18,7 @@ struct Server { } /// Handles a connections +#[derive(Debug)] struct Handler { /// Database state db: Db, @@ -30,7 +31,7 @@ struct Handler { } /// Run the mini-redis server. -pub async fn run(port: &str) -> Result<()> { +pub async fn run(port: &str) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); let mut server = Server { @@ -43,11 +44,11 @@ pub async fn run(port: &str) -> Result<()> { res = server.run() => { if let Err(err) = res { // TODO: gracefully handle this error - eprintln!("failed to accept; err = {}", err); + error!(cause = %err, "failed to accept"); } } _ = signal::ctrl_c() => { - println!("shutting down"); + info!("shutting down"); } } @@ -56,7 +57,9 @@ pub async fn run(port: &str) -> Result<()> { impl Server { /// Run the server - async fn run(&mut self) -> io::Result<()> { + async fn run(&mut self) -> crate::Result<()> { + info!("accepting inbound connections"); + loop { let (socket, _) = self.listener.accept().await?; @@ -67,8 +70,8 @@ impl Server { }; tokio::spawn(async move { - if let Err(e) = handler.run().await { - eprintln!("client err = {:?}", e); + if let Err(err) = handler.run().await { + error!(cause = ?err, "connection error"); } }); } @@ -76,7 +79,8 @@ impl Server { } impl Handler { - async fn run(&mut self) -> io::Result<()> { + #[instrument(skip(self))] + async fn run(&mut self) -> crate::Result<()> { while !self.shutdown.is_shutdown() { let maybe_frame = tokio::select! { res = self.connection.read_frame() => res?, @@ -92,6 +96,8 @@ impl Handler { let cmd = Command::from_frame(frame)?; + debug!(?cmd); + cmd.apply(&self.db, &mut self.connection, &mut self.shutdown) .await?; } diff --git a/src/shutdown.rs b/src/shutdown.rs index 284abcc..03c9e34 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,5 +1,6 @@ use tokio::sync::broadcast; +#[derive(Debug)] pub(crate) struct Shutdown { shutdown: bool, notify: broadcast::Receiver<()>,