apply client/cli polish (#15)

Continuation of #11. Refines the client structure and implements GET.

`clap` is decoupled from the lib code. This is done to avoid any CLI
parsing concerns to leak into the lib. The main motivation for this is
to allow the reader to focus on Tokio concerns and not CLI parsing
concerns.
This commit is contained in:
Carl Lerche
2020-04-01 16:09:41 -07:00
committed by GitHub
parent 7bd7086d41
commit bbb80c341e
17 changed files with 297 additions and 210 deletions

55
Cargo.lock generated
View File

@@ -24,11 +24,6 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "anyhow"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "arc-swap"
version = "0.4.5"
@@ -47,7 +42,7 @@ name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -99,7 +94,7 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.0-beta.1"
source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309"
source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403"
dependencies = [
"ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -116,11 +111,11 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "3.0.0-beta.1"
source = "git+https://github.com/clap-rs/clap/#7a652054dc4f89f3b1831dfe69aad7fba6a99309"
source = "git+https://github.com/clap-rs/clap/#37889c661134e8286102f7d2ab3267965d010403"
dependencies = [
"heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -215,7 +210,7 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.1.8"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -307,7 +302,6 @@ dependencies = [
name = "mini-redis"
version = "0.1.0"
dependencies = [
"anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 3.0.0-beta.1 (git+https://github.com/clap-rs/clap/)",
@@ -408,7 +402,7 @@ name = "num_cpus"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -449,7 +443,7 @@ name = "pin-project-internal"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -465,7 +459,7 @@ version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
"version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -476,7 +470,7 @@ name = "proc-macro-error-attr"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
"syn-mid 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -485,7 +479,7 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.9"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -496,7 +490,7 @@ name = "quote"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -506,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "regex"
version = "1.3.5"
version = "1.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -567,7 +561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_json"
version = "1.0.48"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -631,7 +625,7 @@ name = "syn"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -641,7 +635,7 @@ name = "syn-mid"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -698,7 +692,7 @@ dependencies = [
[[package]]
name = "tokio"
version = "0.2.13"
source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23"
source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32"
dependencies = [
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -769,9 +763,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "0.2.5"
source = "git+https://github.com/tokio-rs/tokio#57ba37c97854d32e691ea68006c8d69d58c79b23"
source = "git+https://github.com/tokio-rs/tokio#caa7e180e43fdf914774de86f01f88e6b41f4a32"
dependencies = [
"proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -939,9 +933,9 @@ dependencies = [
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1016,7 +1010,6 @@ dependencies = [
"checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum ansi_term 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
"checksum anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785"
"checksum arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825"
"checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47"
"checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
@@ -1040,7 +1033,7 @@ dependencies = [
"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef"
"checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum hermit-abi 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8"
"checksum hermit-abi 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e"
"checksum indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e"
@@ -1069,10 +1062,10 @@ dependencies = [
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum proc-macro-error 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7"
"checksum proc-macro-error-attr 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de"
"checksum proc-macro2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6c09721c6781493a2a492a96b5a5bf19b65917fe6728884e7c44dd0c60ca3435"
"checksum proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)" = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3"
"checksum quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f"
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
"checksum regex 1.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8900ebc1363efa7ea1c399ccc32daed870b4002651e0bed86e72d501ebbe0048"
"checksum regex 1.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7f6946991529684867e47d86474e3a6d0c0ab9b82d5821e314b1ede31fa3a4b3"
"checksum regex-automata 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4"
"checksum regex-syntax 0.6.17 (registry+https://github.com/rust-lang/crates.io-index)" = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae"
"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
@@ -1081,7 +1074,7 @@ dependencies = [
"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
"checksum serde 1.0.105 (registry+https://github.com/rust-lang/crates.io-index)" = "e707fbbf255b8fc8c3b99abb91e7257a622caeb20a9818cbadbeeede4e0932ff"
"checksum serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)" = "9371ade75d4c2d6cb154141b9752cf3781ec9c05e0e5cf35060e1e70ee7b9c25"
"checksum serde_json 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "78a7a12c167809363ec3bd7329fc0a3369056996de43c4b37ef3cd54a6ce4867"
"checksum sharded-slab 0.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1"
"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"

View File

@@ -5,7 +5,6 @@ name = "mini-redis"
version = "0.1.0"
[dependencies]
anyhow = "1.0.27"
atoi = "0.3.2"
bytes = "0.5.4"
clap = { git = "https://github.com/clap-rs/clap/" }

98
src/bin/cli.rs Normal file
View File

@@ -0,0 +1,98 @@
use mini_redis::{client, DEFAULT_PORT};
use bytes::Bytes;
use clap::Clap;
use std::num::ParseIntError;
use std::str;
use std::time::Duration;
#[derive(Clap, Debug)]
#[clap(name = "mini-redis-cli", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Issue Redis commands")]
struct Cli {
#[clap(subcommand)]
command: Command,
#[clap(name = "hostname", long = "--host", default_value = "127.0.0.1")]
host: String,
#[clap(name = "port", long = "--port", default_value = DEFAULT_PORT)]
port: String,
}
#[derive(Clap, Debug)]
enum Command {
/// Get the value of key.
Get {
/// Name of key to get
key: String
},
/// Set key to hold the string value.
Set {
/// Name of key to set
key: String,
/// Value to set.
#[clap(parse(from_str = bytes_from_str))]
value: Bytes,
/// Expire the value after specified amount of time
#[clap(parse(try_from_str = duration_from_ms_str))]
expires: Option<Duration>,
},
}
/// Entry point for CLI tool.
///
/// The `[tokio::main]` annotation signals that the Tokio runtime should be
/// started when the function is called. The body of the function is executed
/// within the newly spawned runtime.
///
/// `basic_scheduler` is used here to avoid spawning background threads. The CLI
/// tool use case benefits more by being lighter instead of multi-threaded.
#[tokio::main(basic_scheduler)]
async fn main() -> mini_redis::Result<()> {
// Enable logging
tracing_subscriber::fmt::try_init()?;
// Parse command line arguments
let cli = Cli::parse();
// Get the remote address to connect to
let addr = format!("{}:{}", cli.host, cli.port);
// Establish a connection
let mut client = client::connect(&addr).await?;
match cli.command {
Command::Get { key } => {
if let Some(value) = client.get(&key).await? {
if let Ok(string) = str::from_utf8(&value) {
println!("\"{}\"", string);
} else {
println!("{:?}", value);
}
} else {
println!("(nil)");
}
}
Command::Set { key, value, expires: None } => {
client.set(&key, value).await?;
println!("OK");
}
Command::Set { key, value, expires: Some(expires) } => {
client.set_expires(&key, value, expires).await?;
println!("OK");
}
}
Ok(())
}
fn duration_from_ms_str(src: &str) -> Result<Duration, ParseIntError> {
let ms = src.parse::<u64>()?;
Ok(Duration::from_millis(ms))
}
fn bytes_from_str(src: &str) -> Bytes {
Bytes::from(src.to_string())
}

View File

@@ -1,49 +0,0 @@
use clap::Clap;
use mini_redis::{client, cmd::Set, DEFAULT_PORT};
use std::str;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
let port = cli.port.unwrap_or(DEFAULT_PORT.to_string());
let mut client = client::connect(&format!("127.0.0.1:{}", port)).await?;
match cli.command {
Client::Get { key } => {
let result = client.get(&key).await?;
if let Some(result) = result {
println!("\"{}\"", str::from_utf8(&result).unwrap());
} else {
println!("(nil)");
}
Ok(())
}
Client::Set(opts) => match client.set_with_opts(opts).await {
Ok(_) => {
println!("OK");
Ok(())
}
Err(e) => {
eprintln!("{}", e);
Err(e)
}
},
}
}
#[derive(Clap, Debug)]
#[clap(name = "mini-redis-client", version = env!("CARGO_PKG_VERSION"), author = env!("CARGO_PKG_AUTHORS"), about = "Opens a connection to a Redis server")]
struct Cli {
#[clap(subcommand)]
command: Client,
#[clap(name = "port", long = "--port")]
port: Option<String>,
}
#[derive(Clap, Debug)]
enum Client {
/// Gets a value associated with a key
Get { key: String },
/// Associates a value with a key
Set(Set),
}

View File

@@ -1,12 +1,12 @@
use anyhow::{anyhow, Result};
use clap::Clap;
use mini_redis::{server, DEFAULT_PORT};
use clap::Clap;
#[tokio::main]
pub async fn main() -> Result<()> {
pub async fn main() -> mini_redis::Result<()> {
// enable logging
// see https://docs.rs/tracing for more info
tracing_subscriber::fmt::try_init().map_err(|e| anyhow!("{:?}", e))?;
tracing_subscriber::fmt::try_init()?;
let cli = Cli::parse();
let port = cli.port.unwrap_or(DEFAULT_PORT.to_string());

View File

@@ -1,22 +1,18 @@
use crate::{
cmd::{
utils::{bytes_from_str, duration_from_ms_str},
Set,
},
frame::Frame,
Command, Connection,
};
use crate::{Connection, Frame};
use crate::cmd::{Get, Set};
use bytes::Bytes;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tracing::{debug, instrument};
/// Mini asynchronous Redis client
pub struct Client {
conn: Connection,
}
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client, Box<dyn std::error::Error>> {
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
let socket = TcpStream::connect(addr).await?;
let conn = Connection::new(socket);
@@ -24,53 +20,89 @@ pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client, Box<dyn std::e
}
impl Client {
pub async fn get(&mut self, key: &str) -> Result<Option<Bytes>, Box<dyn std::error::Error>> {
unimplemented!();
/// Get the value of a key
#[instrument(skip(self))]
pub async fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
// Create a `Get` command for the `key` and convert it to a frame.
let frame = Get::new(key).into_frame();
debug!(request = ?frame);
// Write the frame to the socket.
self.conn.write_frame(&frame).await?;
// Wait for the response.
match self.read_response().await? {
Frame::Simple(value) => Ok(Some(value.into())),
Frame::Bulk(value) => Ok(Some(value)),
Frame::Null => Ok(None),
frame => Err(frame.to_error()),
}
}
pub async fn set(&mut self, key: &str, value: &str) -> Result<(), Box<dyn std::error::Error>> {
let opts = Set {
/// Set the value of a key to `value`.
#[instrument(skip(self))]
pub async fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
self.set_cmd(Set {
key: key.to_string(),
value: bytes_from_str(value),
value: value,
expire: None,
};
self.set_with_opts(opts).await
}).await
}
pub async fn set_with_expiration(
/// Set the value of a key to `value`. The value expires after `expiration`.
#[instrument(skip(self))]
pub async fn set_expires(
&mut self,
key: &str,
value: &str,
expiration: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let opts = Set {
value: Bytes,
expiration: Duration,
) -> crate::Result<()> {
self.set_cmd(Set {
key: key.to_string(),
value: bytes_from_str(value),
expire: Some(duration_from_ms_str(expiration)?),
};
self.set_with_opts(opts).await
value: value.into(),
expire: Some(expiration),
}).await
}
pub async fn set_with_opts(&mut self, opts: Set) -> Result<(), Box<dyn std::error::Error>> {
let frame = Command::Set(opts).into_frame()?;
async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> {
// Convert the `Set` command into a frame
let frame = cmd.into_frame();
debug!(request = ?frame);
// Write the frame to the socket
self.conn.write_frame(&frame).await?;
// Read the response
match self.read_response().await? {
Frame::Simple(response) if response == "OK" => Ok(()),
frame => Err(frame.to_error()),
}
}
/// Reads a response frame from the socket. If an `Error` frame is read, it
/// is converted to `Err`.
async fn read_response(&mut self) -> crate::Result<Frame> {
let response = self.conn.read_frame().await?;
if let Some(response) = response {
match response {
Frame::Simple(response) => {
if response == "OK" {
Ok(())
} else {
Err("unexpected response from server".into())
}
}
_ => Err("unexpected response from server".into()),
debug!(?response);
match response {
Some(Frame::Error(msg)) => {
Err(msg.into())
}
Some(frame) => Ok(frame),
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(
ErrorKind::ConnectionReset,
"connection reset by server");
Err(err.into())
}
} else {
Err(Box::new(Error::new(
ErrorKind::ConnectionReset,
"connection reset by server",
)))
}
}
}

View File

@@ -1,6 +1,6 @@
use crate::{Connection, Db, Frame, Parse, ParseError};
use std::io;
use bytes::Bytes;
use tracing::{debug, instrument};
#[derive(Debug)]
@@ -9,10 +9,14 @@ pub struct Get {
}
impl Get {
/// Create a new `Get` command which fetches `key`.
pub(crate) fn new(key: impl ToString) -> Get {
Get { key: key.to_string() }
}
// instrumenting functions will log all of the arguments passed to the function
// with their debug implementations
// see https://docs.rs/tracing/0.1.13/tracing/attr.instrument.html
#[instrument]
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Get, ParseError> {
let key = parse.next_string()?;
@@ -26,14 +30,24 @@ impl Get {
Ok(Get { key })
}
#[instrument]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> {
#[instrument(skip(self, db, dst))]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
let response = if let Some(value) = db.get(&self.key) {
Frame::Bulk(value)
} else {
Frame::Null
};
debug!(?response);
dst.write_frame(&response).await
dst.write_frame(&response).await?;
Ok(())
}
pub(crate) fn into_frame(self) -> Frame {
let mut frame = Frame::array();
frame.push_bulk(Bytes::from("get".as_bytes()));
frame.push_bulk(Bytes::from(self.key.into_bytes()));
frame
}
}

View File

@@ -10,13 +10,8 @@ pub use set::Set;
mod subscribe;
pub use subscribe::{Subscribe, Unsubscribe};
pub(crate) mod utils;
use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};
use std::io;
use tracing::instrument;
#[derive(Debug)]
pub(crate) enum Command {
Get(Get),
@@ -27,7 +22,6 @@ pub(crate) enum Command {
}
impl Command {
#[instrument]
pub(crate) fn from_frame(frame: Frame) -> Result<Command, ParseError> {
let mut parse = Parse::new(frame)?;
@@ -46,20 +40,12 @@ impl Command {
Ok(command)
}
pub(crate) fn into_frame(self) -> Result<Frame, ParseError> {
let frame = match self {
Command::Set(set) => set.into_frame(),
_ => unimplemented!(),
};
Ok(frame)
}
pub(crate) async fn apply(
self,
db: &Db,
dst: &mut Connection,
shutdown: &mut Shutdown,
) -> io::Result<()> {
) -> crate::Result<()> {
use Command::*;
match self {

View File

@@ -1,7 +1,6 @@
use crate::{Connection, Db, Frame, Parse, ParseError};
use bytes::Bytes;
use std::io;
#[derive(Debug)]
pub struct Publish {
@@ -17,11 +16,12 @@ impl Publish {
Ok(Publish { channel, message })
}
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> {
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
// Set the value
let num_subscribers = db.publish(&self.channel, self.message);
let response = Frame::Integer(num_subscribers as u64);
dst.write_frame(&response).await
dst.write_frame(&response).await?;
Ok(())
}
}

View File

@@ -1,26 +1,19 @@
use crate::cmd::{
utils::{bytes_from_str, duration_from_ms_str},
Parse, ParseError,
};
use crate::{Connection, Db, Frame};
use clap::Clap;
use crate::cmd::{Parse, ParseError};
use bytes::Bytes;
use std::io;
use std::time::Duration;
use tracing::{debug, instrument};
#[derive(Clap, Debug)]
#[derive(Debug)]
pub struct Set {
/// the lookup key
pub(crate) key: String,
/// the value to be stored
#[clap(parse(from_str = bytes_from_str))]
pub(crate) value: Bytes,
/// duration in milliseconds
#[clap(parse(try_from_str = duration_from_ms_str))]
/// When to expire the key
pub(crate) expire: Option<Duration>,
}
@@ -52,14 +45,15 @@ impl Set {
Ok(Set { key, value, expire })
}
#[instrument]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> io::Result<()> {
#[instrument(skip(db))]
pub(crate) async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> {
// Set the value
db.set(self.key, self.value, self.expire);
let response = Frame::Simple("OK".to_string());
debug!(?response);
dst.write_frame(&response).await
dst.write_frame(&response).await?;
Ok(())
}
pub(crate) fn into_frame(self) -> Frame {

View File

@@ -1,8 +1,7 @@
use crate::cmd::{Parse, ParseError};
use crate::{Command, Connection, Db, Frame, Shutdown};
use crate::cmd::{Parse, ParseError};
use bytes::Bytes;
use std::io;
use tokio::select;
use tokio::stream::{StreamExt, StreamMap};
@@ -48,7 +47,7 @@ impl Subscribe {
db: &Db,
dst: &mut Connection,
shutdown: &mut Shutdown,
) -> io::Result<()> {
) -> crate::Result<()> {
// Each individual channel subscription is handled using a
// `sync::broadcast` channel. Messages are then fanned out to all
// clients currently subscribed to the channels.

View File

@@ -1,11 +0,0 @@
use bytes::Bytes;
use std::time::Duration;
pub(crate) fn duration_from_ms_str(src: &str) -> Result<Duration, std::num::ParseIntError> {
let millis = src.parse::<u64>()?;
Ok(Duration::from_millis(millis))
}
pub(crate) fn bytes_from_str(src: &str) -> Bytes {
Bytes::from(src.to_string())
}

View File

@@ -4,7 +4,7 @@ use bytes::{Buf, BytesMut};
use std::io::{self, Cursor};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};
use tokio::net::TcpStream;
use tracing::debug;
#[derive(Debug)]
pub(crate) struct Connection {
stream: BufStream<TcpStream>,
@@ -19,11 +19,10 @@ impl Connection {
}
}
pub(crate) async fn read_frame(&mut self) -> io::Result<Option<Frame>> {
pub(crate) async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
use frame::Error::Incomplete;
loop {
debug!(?self.buffer);
let mut buf = Cursor::new(&self.buffer[..]);
match Frame::check(&mut buf) {

View File

@@ -1,7 +1,6 @@
use tokio::io;
use bytes::{Buf, Bytes};
use std::convert::TryInto;
use std::fmt;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;
@@ -16,6 +15,7 @@ pub(crate) enum Frame {
Array(Vec<Box<Frame>>),
}
#[derive(Debug)]
pub(crate) enum Error {
/// Not enough data is available to parse a message
Incomplete,
@@ -113,7 +113,7 @@ impl Frame {
if b'-' == peek_u8(src)? {
let line = get_line(src)?;
if line != b"-1\r\n" {
if line != b"-1" {
return Err(Error::Invalid);
}
@@ -149,19 +149,34 @@ impl Frame {
}
}
pub(crate) fn try_as_str(&self) -> Result<String, String> {
match &self {
Frame::Simple(response) => Ok(response.to_string()),
Frame::Error(response) => Err(response.to_string()),
Frame::Integer(response) => Ok(format!("{}", response)),
Frame::Bulk(response) => Ok(format!("{:?}", response)),
Frame::Null => Ok("(nil)".to_string()),
Frame::Array(response) => {
let mut msg = "".to_string();
for item in response {
msg.push_str(&item.try_as_str()?)
/// Converts the frame to an "unexpected frame" error
pub(crate) fn to_error(&self) -> crate::Error {
format!("unexpected frame: {}", self).into()
}
}
impl fmt::Display for Frame {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::str;
match self {
Frame::Simple(response) => response.fmt(fmt),
Frame::Error(msg) => write!(fmt, "error: {}", msg),
Frame::Integer(num) => num.fmt(fmt),
Frame::Bulk(msg) => match str::from_utf8(msg) {
Ok(string) => string.fmt(fmt),
Err(_) => write!(fmt, "{:?}", msg),
},
Frame::Null => "(nil)".fmt(fmt),
Frame::Array(parts) => {
for (i, part) in parts.iter().enumerate() {
if i > 0 {
write!(fmt, " ")?;
part.fmt(fmt)?;
}
}
Ok(msg)
Ok(())
}
}
}
@@ -221,12 +236,6 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
Err(Error::Incomplete)
}
impl From<Error> for io::Error {
fn from(_src: Error) -> io::Error {
unimplemented!();
}
}
impl From<FromUtf8Error> for Error {
fn from(_src: FromUtf8Error) -> Error {
unimplemented!();
@@ -238,3 +247,14 @@ impl From<TryFromIntError> for Error {
unimplemented!();
}
}
impl std::error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Incomplete => "stream ended early".fmt(fmt),
Error::Invalid => "invalid frame format".fmt(fmt),
}
}
}

View File

@@ -21,3 +21,9 @@ pub mod server;
mod shutdown;
use shutdown::Shutdown;
/// Error returned by most functions.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
/// A specialized `Result` type for mini-redis operations.
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -1,11 +1,11 @@
use crate::{Command, Connection, Db, Shutdown};
use anyhow::Result;
use tokio::io;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::broadcast;
use tracing::{debug, error, instrument, info};
#[derive(Debug)]
struct Server {
/// Database state
db: Db,
@@ -18,6 +18,7 @@ struct Server {
}
/// Handles a connections
#[derive(Debug)]
struct Handler {
/// Database state
db: Db,
@@ -30,7 +31,7 @@ struct Handler {
}
/// Run the mini-redis server.
pub async fn run(port: &str) -> Result<()> {
pub async fn run(port: &str) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let mut server = Server {
@@ -43,11 +44,11 @@ pub async fn run(port: &str) -> Result<()> {
res = server.run() => {
if let Err(err) = res {
// TODO: gracefully handle this error
eprintln!("failed to accept; err = {}", err);
error!(cause = %err, "failed to accept");
}
}
_ = signal::ctrl_c() => {
println!("shutting down");
info!("shutting down");
}
}
@@ -56,7 +57,9 @@ pub async fn run(port: &str) -> Result<()> {
impl Server {
/// Run the server
async fn run(&mut self) -> io::Result<()> {
async fn run(&mut self) -> crate::Result<()> {
info!("accepting inbound connections");
loop {
let (socket, _) = self.listener.accept().await?;
@@ -67,8 +70,8 @@ impl Server {
};
tokio::spawn(async move {
if let Err(e) = handler.run().await {
eprintln!("client err = {:?}", e);
if let Err(err) = handler.run().await {
error!(cause = ?err, "connection error");
}
});
}
@@ -76,7 +79,8 @@ impl Server {
}
impl Handler {
async fn run(&mut self) -> io::Result<()> {
#[instrument(skip(self))]
async fn run(&mut self) -> crate::Result<()> {
while !self.shutdown.is_shutdown() {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
@@ -92,6 +96,8 @@ impl Handler {
let cmd = Command::from_frame(frame)?;
debug!(?cmd);
cmd.apply(&self.db, &mut self.connection, &mut self.shutdown)
.await?;
}

View File

@@ -1,5 +1,6 @@
use tokio::sync::broadcast;
#[derive(Debug)]
pub(crate) struct Shutdown {
shutdown: bool,
notify: broadcast::Receiver<()>,