Initial commit

This commit is contained in:
Carl Lerche
2019-12-03 21:49:10 -08:00
commit 358e95e57c
22 changed files with 1420 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
**/*.rs.bk

382
Cargo.lock generated Normal file
View File

@@ -0,0 +1,382 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "arc-swap"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "atoi"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bytes"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fnv"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures-core"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "hermit-abi"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "libc"
version = "0.2.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "log"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "memchr"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "mini-redis"
version = "0.1.0"
dependencies = [
"atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.11 (git+https://github.com/tokio-rs/tokio)",
]
[[package]]
name = "mio"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mio-named-pipes"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mio-uds"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miow"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miow"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "net2"
version = "0.2.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "num-traits"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "num_cpus"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pin-project-lite"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "proc-macro2"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "quote"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "redox_syscall"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "signal-hook-registry"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "socket2"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "syn"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio"
version = "0.2.11"
source = "git+https://github.com/tokio-rs/tokio#55b5e1b6adcc3f85a35d16444e496b203648c99d"
dependencies = [
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-macros 0.2.4 (git+https://github.com/tokio-rs/tokio)",
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-macros"
version = "0.2.4"
source = "git+https://github.com/tokio-rs/tokio#55b5e1b6adcc3f85a35d16444e496b203648c99d"
dependencies = [
"proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "unicode-xid"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff"
"checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47"
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures-core 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b597b16aa1a19ce2dfde5128a7c656d75346b35601a640be2d9efd4e9c83609d"
"checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
"checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223"
"checksum mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f"
"checksum mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3"
"checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum miow 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226"
"checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88"
"checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
"checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548"
"checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe"
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85"
"checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5"
"checksum tokio 0.2.11 (git+https://github.com/tokio-rs/tokio)" = "<none>"
"checksum tokio-macros 0.2.4 (git+https://github.com/tokio-rs/tokio)" = "<none>"
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"

13
Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "mini-redis"
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "0.5"
tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] }
atoi = "0.3.2"

4
README.md Normal file
View File

@@ -0,0 +1,4 @@
* get / set
* timeouts?

5
examples/chat.rs Normal file
View File

@@ -0,0 +1,5 @@
#[tokio::main]
async fn main() {
unimplemented!();
}

0
examples/hello_world.rs Normal file
View File

0
examples/pub.rs Normal file
View File

6
examples/sub.rs Normal file
View File

@@ -0,0 +1,6 @@
/// Subscribe to a redis channel
#[tokio::main]
async fn main() {
unimplemented!();
}

28
src/client.rs Normal file
View File

@@ -0,0 +1,28 @@
#![cfg(nope)]
use crate::Connection;
use bytes::Bytes;
use tokio::net::{TcpStream, ToSocketAddrs};
use std::io;
/// Mini asynchronous Redis client
pub struct Client {
conn: Connection,
}
pub async fn connect<T: ToSocketAddrs>(addr: T) -> io::Result<Client> {
let socket = TcpStream::connect(addr).await?;
let conn = Connection::new(socket);
Ok(Client { conn })
}
impl Client {
pub async fn get(&mut self, key: &str) -> io::Result<Option<Bytes>> {
unimplemented!();
}
pub async fn set(&mut self, key: &str, val: Bytes) -> io::Result<()> {
unimplemented!();
}
}

25
src/cmd/get.rs Normal file
View File

@@ -0,0 +1,25 @@
use crate::{Connection, Frame, Kv, Parse, ParseError};
use std::io;
#[derive(Debug)]
pub struct Get {
key: String,
}
impl Get {
pub(crate) fn parse(parse: &mut Parse) -> Result<Get, ParseError> {
let key = parse.next_string()?;
Ok(Get { key })
}
pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> {
let response = if let Some(value) = kv.get(&self.key[..]) {
Frame::Bulk(value)
} else {
Frame::Null
};
dst.write_frame(&response).await
}
}

63
src/cmd/mod.rs Normal file
View File

@@ -0,0 +1,63 @@
mod get;
pub use get::Get;
mod publish;
pub use publish::Publish;
mod set;
pub use set::Set;
mod subscribe;
pub use subscribe::{Subscribe, Unsubscribe};
use crate::{Connection, Frame, Kv, Parse, ParseError, Shutdown};
use std::io;
#[derive(Debug)]
pub(crate) enum Command {
Get(Get),
Publish(Publish),
Set(Set),
Subscribe(Subscribe),
Unsubscribe(Unsubscribe),
}
impl Command {
pub(crate) fn from_frame(frame: Frame) -> Result<Command, ParseError> {
let mut parse = Parse::new(frame)?;
let command_name = parse.next_string()?;
let command = match &command_name[..] {
"GET" => Command::Get(Get::parse(&mut parse)?),
"PUBLISH" => Command::Publish(Publish::parse(&mut parse)?),
"SET" => Command::Set(Set::parse(&mut parse)?),
"SUBSCRIBE" => Command::Subscribe(Subscribe::parse(&mut parse)?),
"UNSUBSCRIBE" => Command::Unsubscribe(Unsubscribe::parse(&mut parse)?),
_ => return Err(ParseError::UnknownCommand(command_name)),
};
parse.finish()?;
Ok(command)
}
pub(crate) async fn apply(
self,
kv: &Kv,
dst: &mut Connection,
shutdown: &mut Shutdown,
) -> io::Result<()> {
use Command::*;
match self {
Get(cmd) => cmd.apply(kv, dst).await,
Publish(cmd) => cmd.apply(kv, dst).await,
Set(cmd) => cmd.apply(kv, dst).await,
Subscribe(cmd) => cmd.apply(kv, dst, shutdown).await,
// `Unsubscribe` cannot be applied. It may only be received from the
// context of a `Subscribe` command.
Unsubscribe(_) => unimplemented!(),
}
}
}

27
src/cmd/publish.rs Normal file
View File

@@ -0,0 +1,27 @@
use crate::{Connection, Frame, Kv, Parse, ParseError};
use bytes::Bytes;
use std::io;
#[derive(Debug)]
pub struct Publish {
channel: String,
message: Bytes,
}
impl Publish {
pub(crate) fn parse(parse: &mut Parse) -> Result<Publish, ParseError> {
let channel = parse.next_string()?;
let message = parse.next_bytes()?;
Ok(Publish { channel, message })
}
pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> {
// Set the value
let num_subscribers = kv.publish(&self.channel, self.message);
let response = Frame::Integer(num_subscribers as u64);
dst.write_frame(&response).await
}
}

47
src/cmd/set.rs Normal file
View File

@@ -0,0 +1,47 @@
use crate::{Connection, Frame, Kv};
use crate::cmd::{Parse, ParseError};
use bytes::Bytes;
use std::io;
use std::time::Duration;
#[derive(Debug)]
pub struct Set {
key: String,
value: Bytes,
expire: Option<Duration>,
}
impl Set {
pub(crate) fn parse(parse: &mut Parse) -> Result<Set, ParseError> {
use ParseError::EndOfStream;
let key = parse.next_string()?;
let value = parse.next_bytes()?;
let mut expire = None;
match parse.next_string() {
Ok(s) if s == "EX" => {
let secs = parse.next_int()?;
expire = Some(Duration::from_secs(secs));
}
Ok(s) if s == "PX" => {
let ms = parse.next_int()?;
expire = Some(Duration::from_millis(ms));
}
Ok(_) => unimplemented!(),
Err(EndOfStream) => {}
Err(err) => return Err(err),
}
Ok(Set { key, value, expire })
}
pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> {
// Set the value
kv.set(self.key, self.value, self.expire);
let response = Frame::Simple("OK".to_string());
dst.write_frame(&response).await
}
}

171
src/cmd/subscribe.rs Normal file
View File

@@ -0,0 +1,171 @@
use crate::{Command, Connection, Frame, Kv, Shutdown};
use crate::cmd::{Parse, ParseError};
use bytes::Bytes;
use tokio::select;
use tokio::stream::{StreamMap, StreamExt};
use std::io;
#[derive(Debug)]
pub struct Subscribe {
channels: Vec<String>,
}
#[derive(Debug)]
pub struct Unsubscribe {
channels: Vec<String>,
}
impl Subscribe {
pub(crate) fn parse(parse: &mut Parse) -> Result<Subscribe, ParseError> {
use ParseError::EndOfStream;
// There must be at least one channel
let mut channels = vec![parse.next_string()?];
loop {
match parse.next_string() {
Ok(s) => channels.push(s),
Err(EndOfStream) => break,
Err(err) => return Err(err),
}
}
Ok(Subscribe { channels })
}
/// Implements the "subscribe" half of Redis' Pub/Sub feature documented
/// [here].
///
/// This function is the entry point and includes the initial list of
/// channels to subscribe to. Additional `subscribe` and `unsubscribe`
/// commands may be received from the client and the list of subscriptions
/// are updated accordingly.
///
/// [here]: https://redis.io/topics/pubsub
pub(crate) async fn apply(
mut self,
kv: &Kv,
dst: &mut Connection,
shutdown: &mut Shutdown,
) -> io::Result<()> {
// Each individual channel subscription is handled using a
// `sync::broadcast` channel. Messages are then fanned out to all
// clients currently subscribed to the channels.
//
// An individual client may subscribe to multiple channels and may
// dynamically add and remove channels from its subscription set. To
// handle this, a `StreamMap` is used to track active subscriptions. The
// `StreamMap` merges messages from individual broadcast channels as
// they are received.
let mut subscriptions = StreamMap::new();
loop {
// `self.channels` is used to track additional channels to subscribe
// to. When new `SUBSCRIBE` commands are received during the
// execution of `apply`, the new channels are pushed onto this vec.
for channel in self.channels.drain(..) {
// Build response frame to respond to the client with.
let mut response = Frame::array();
response.push_bulk(Bytes::from_static(b"subscribe"));
response.push_bulk(Bytes::copy_from_slice(channel.as_bytes()));
// Subscribe to channel
let rx = kv.subscribe(channel.clone());
// Track subscription in this client's subscription set.
subscriptions.insert(channel, rx);
// Respond with the successful subscription
dst.write_frame(&response).await?;
}
// Wait for one of the following to happen:
//
// - Receive a message from one of the subscribed channels.
// - Receive a subscribe or unsubscribe command from the client.
// - A server shutdown signal.
select! {
// Receive messages from subscribed channels
Some((channel, msg)) = subscriptions.next() => {
let mut response = Frame::array();
response.push_bulk(Bytes::from_static(b"message"));
response.push_bulk(Bytes::copy_from_slice(channel.as_bytes()));
// TODO: handle lag error
response.push_bulk(msg.unwrap());
dst.write_frame(&response).await?;
}
res = dst.read_frame() => {
let frame = match res? {
Some(frame) => frame,
// How to handle remote client closing write half
None => unimplemented!(),
};
// A command has been received from the client.
//
// Only `SUBSCRIBE` and `UNSUBSCRIBE` commands are permitted
// in this context.
match Command::from_frame(frame)? {
Command::Subscribe(subscribe) => {
// Subscribe to the channels on next iteration
self.channels.extend(subscribe.channels.into_iter());
}
Command::Unsubscribe(mut unsubscribe) => {
// If no channels are specified, this requests
// unsubscribing from **all** channels. To implement
// this, the `unsubscribe.channels` vec is populated
// with the list of channels currently subscribed
// to.
if unsubscribe.channels.is_empty() {
unsubscribe.channels = subscriptions
.keys()
.map(|channel| channel.to_string())
.collect();
}
for channel in unsubscribe.channels.drain(..) {
subscriptions.remove(&channel);
let mut response = Frame::array();
response.push_bulk(Bytes::from_static(b"unsubscribe"));
response.push_bulk(Bytes::copy_from_slice(channel.as_bytes()));
dst.write_frame(&response).await?;
}
}
_ => {
// TODO: received invalid command
unimplemented!();
}
}
}
// Receive additional commands from the client
_ = shutdown.recv() => {
return Ok(());
}
};
}
}
}
impl Unsubscribe {
pub(crate) fn parse(parse: &mut Parse) -> Result<Unsubscribe, ParseError> {
use ParseError::EndOfStream;
// There may be no channels listed.
let mut channels = vec![];
loop {
match parse.next_string() {
Ok(s) => channels.push(s),
Err(EndOfStream) => break,
Err(err) => return Err(err),
}
}
Ok(Unsubscribe { channels })
}
}

116
src/conn.rs Normal file
View File

@@ -0,0 +1,116 @@
use crate::frame::{self, Frame};
use bytes::{Buf, BytesMut};
use tokio::io::{BufStream, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use std::io::{self, Cursor};
#[derive(Debug)]
pub(crate) struct Connection {
stream: BufStream<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub(crate) fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufStream::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
pub(crate) async fn read_frame(&mut self) -> io::Result<Option<Frame>> {
use frame::Error::Incomplete;
loop {
let mut buf = Cursor::new(&self.buffer[..]);
match Frame::check(&mut buf) {
Ok(_) => {
// Get the length of the message
let len = buf.position() as usize;
// Reset the position
buf.set_position(0);
let frame = Frame::parse(&mut buf)?;
// Clear data from the buffer
self.buffer.advance(len);
return Ok(Some(frame));
}
Err(Incomplete) => {}
Err(e) => return Err(e.into()),
}
if 0 == self.stream.read_buf(&mut self.buffer).await? {
return Ok(None);
}
}
}
pub(crate) async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> {
match frame {
Frame::Array(val) => {
self.stream.write_u8(b'*').await?;
self.write_decimal(val.len() as u64).await?;
for entry in &**val {
self.write_value(entry).await?;
}
}
_ => self.write_value(frame).await?,
}
self.stream.flush().await
}
async fn write_value(&mut self, frame: &Frame) -> io::Result<()> {
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unreachable!(),
}
Ok(())
}
async fn write_decimal(&mut self, val: u64) -> io::Result<()> {
use std::io::Write;
// Convert the value to a string
let mut buf = [0u8; 12];
let mut buf = Cursor::new(&mut buf[..]);
write!(&mut buf, "{}", val)?;
let pos = buf.position() as usize;
self.stream.write_all(&buf.get_ref()[..pos]).await?;
self.stream.write_all(b"\r\n").await?;
Ok(())
}
}

224
src/frame.rs Normal file
View File

@@ -0,0 +1,224 @@
use tokio::io;
use bytes::{Buf, Bytes};
use std::convert::TryInto;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;
#[derive(Debug)]
pub(crate) enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Box<Frame>>),
}
pub(crate) enum Error {
/// Not enough data is available to parse a message
Incomplete,
/// Invalid message encoding
Invalid,
}
impl Frame {
/// Returns an empty array
pub(crate) fn array() -> Frame {
Frame::Array(vec![])
}
/// Push a "bulk" frame into the array. `self` must be an Array frame
///
/// # Panics
///
/// panics if `self` is not an array
pub(crate) fn push_bulk(&mut self, bytes: Bytes) {
match self {
Frame::Array(vec) => {
vec.push(Box::new(Frame::Bulk(bytes)));
}
_ => panic!("not an array frame"),
}
}
/// Checks if an entire message can be decoded from `src`
pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
match get_u8(src)? {
b'+' => {
get_line(src)?;
Ok(())
}
b'-' => {
get_line(src)?;
Ok(())
}
b':' => {
let _ = get_decimal(src)?;
Ok(())
}
b'$' => {
if b'-' == peek_u8(src)? {
// Skip '-1\r\n'
skip(src, 4)
} else {
// Read the bulk string
let len: usize = get_decimal(src)?.try_into()?;
// skip that number of bytes + 2 (\r\n).
skip(src, len + 2)
}
}
b'*' => {
let len = get_decimal(src)?;
for _ in 0.. len {
Frame::check(src)?;
}
Ok(())
}
_ => unimplemented!(),
}
}
/// The message has already been validated with `scan`.
pub(crate) fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
match get_u8(src)? {
b'+' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
// Convert the line to a String
let string = String::from_utf8(line)?;
Ok(Frame::Simple(string))
}
b'-' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
// Convert the line to a String
let string = String::from_utf8(line)?;
Ok(Frame::Error(string))
}
b':' => {
let len = get_decimal(src)?;
Ok(Frame::Integer(len))
}
b'$' => {
if b'-' == peek_u8(src)? {
let line = get_line(src)?;
if line != b"-1\r\n" {
return Err(Error::Invalid);
}
Ok(Frame::Null)
} else {
// Read the bulk string
let len = get_decimal(src)?.try_into()?;
let n = len + 2;
if src.remaining() < n {
return Err(Error::Incomplete);
}
let data = Bytes::copy_from_slice(&src.bytes()[..len]);
// skip that number of bytes + 2 (\r\n).
skip(src, n)?;
Ok(Frame::Bulk(data))
}
}
b'*' => {
let len = get_decimal(src)?.try_into()?;
let mut out = Vec::with_capacity(len);
for _ in 0.. len {
out.push(Box::new(Frame::parse(src)?));
}
Ok(Frame::Array(out))
}
_ => unimplemented!(),
}
}
}
fn peek_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() {
return Err(Error::Incomplete);
}
Ok(src.bytes()[0])
}
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() {
return Err(Error::Incomplete);
}
Ok(src.get_u8())
}
fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> {
if src.remaining() < n {
return Err(Error::Incomplete);
}
src.advance(n);
Ok(())
}
/// Read a new-line terminated decimal
fn get_decimal(src: &mut Cursor<&[u8]>) -> Result<u64, Error> {
use atoi::atoi;
let line = get_line(src)?;
atoi::<u64>(line)
.ok_or(Error::Invalid)
}
/// Find a line
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
// Scan the bytes directly
let start = src.position() as usize;
// Scan to the second to last byte
let end = src.get_ref().len() - 1;
for i in start..end {
if src.get_ref()[i] == b'\r' && src.get_ref()[i+1] == b'\n' {
// We found a line, update the position to be *after* the \n
src.set_position((i + 2) as u64);
// Return the line
return Ok(&src.get_ref()[start..i]);
}
}
Err(Error::Incomplete)
}
impl From<Error> for io::Error {
fn from(_src: Error) -> io::Error {
unimplemented!();
}
}
impl From<FromUtf8Error> for Error {
fn from(_src: FromUtf8Error) -> Error {
unimplemented!();
}
}
impl From<TryFromIntError> for Error {
fn from(_src: TryFromIntError) -> Error {
unimplemented!();
}
}

65
src/kv.rs Normal file
View File

@@ -0,0 +1,65 @@
use bytes::Bytes;
use tokio::sync::broadcast;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Debug, Clone)]
pub(crate) struct Kv {
shared: Arc<Mutex<Shared>>,
}
#[derive(Debug)]
struct Shared {
// The key-value data
data: HashMap<String, Bytes>,
// The pub/sub key-space
pub_sub: HashMap<String, broadcast::Sender<Bytes>>,
}
impl Kv {
pub(crate) fn new() -> Kv {
Kv {
shared: Arc::new(Mutex::new(Shared {
data: HashMap::new(),
pub_sub: HashMap::new(),
})),
}
}
pub(crate) fn get(&self, key: &str) -> Option<Bytes> {
let shared = self.shared.lock().unwrap();
shared.data.get(key).map(|data| data.clone())
}
pub(crate) fn set(&self, key: String, value: Bytes, _expire: Option<Duration>) {
let mut shared = self.shared.lock().unwrap();
shared.data.insert(key, value);
}
pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
use std::collections::hash_map::Entry;
let mut shared = self.shared.lock().unwrap();
match shared.pub_sub.entry(key) {
Entry::Occupied(e) => {
e.get().subscribe()
}
Entry::Vacant(e) => {
let (tx, rx) = broadcast::channel(1028);
e.insert(tx);
rx
}
}
}
pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize {
let shared = self.shared.lock().unwrap();
shared.pub_sub.get(key)
.map(|tx| tx.send(value).unwrap_or(0))
.unwrap_or(0)
}
}

21
src/lib.rs Normal file
View File

@@ -0,0 +1,21 @@
pub mod client;
mod cmd;
use cmd::Command;
mod conn;
use conn::Connection;
mod frame;
use frame::Frame;
mod kv;
use kv::Kv;
mod parse;
use parse::{Parse, ParseError};
pub mod server;
mod shutdown;
use shutdown::Shutdown;

6
src/main.rs Normal file
View File

@@ -0,0 +1,6 @@
use std::io;
#[tokio::main]
async fn main() -> io::Result<()> {
mini_redis::server::run().await
}

84
src/parse.rs Normal file
View File

@@ -0,0 +1,84 @@
use crate::Frame;
use bytes::Bytes;
use std::{io, str, vec};
/// Utility for parsing a command
#[derive(Debug)]
pub(crate) struct Parse {
parts: vec::IntoIter<Box<Frame>>,
}
#[derive(Debug)]
pub(crate) enum ParseError {
EndOfStream,
Invalid,
UnknownCommand(String),
}
impl Parse {
pub(crate) fn new(frame: Frame) -> Result<Parse, ParseError> {
let array = match frame {
Frame::Array(array) => array,
_ => return Err(ParseError::Invalid),
};
Ok(Parse { parts: array.into_iter() })
}
fn next(&mut self) -> Result<Frame, ParseError> {
self.parts.next()
.map(|frame| *frame)
.ok_or(ParseError::EndOfStream)
}
pub(crate) fn next_string(&mut self) -> Result<String, ParseError> {
match self.next()? {
Frame::Simple(s) => Ok(s),
Frame::Bulk(data) => {
str::from_utf8(&data[..])
.map(|s| s.to_string())
.map_err(|_| ParseError::Invalid)
}
_ => Err(ParseError::Invalid),
}
}
pub(crate) fn next_bytes(&mut self) -> Result<Bytes, ParseError> {
match self.next()? {
Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())),
Frame::Bulk(data) => Ok(data),
_ => Err(ParseError::Invalid),
}
}
pub(crate) fn next_int(&mut self) -> Result<u64, ParseError> {
match self.next()? {
Frame::Integer(v) => Ok(v),
_ => Err(ParseError::Invalid),
}
}
/// Ensure there are no more entries in the array
pub(crate) fn finish(&mut self) -> Result<(), ParseError> {
if self.parts.next().is_none() {
Ok(())
} else {
Err(ParseError::Invalid)
}
}
}
impl From<ParseError> for io::Error {
fn from(src: ParseError) -> io::Error {
use ParseError::*;
io::Error::new(
io::ErrorKind::Other,
match src {
EndOfStream => "end of stream".to_string(),
Invalid => "invalid".to_string(),
UnknownCommand(cmd) => format!("unknown command `{}`", cmd),
})
}
}

101
src/server.rs Normal file
View File

@@ -0,0 +1,101 @@
use crate::{Connection, Command, Kv, Shutdown};
use tokio::io;
use tokio::net::TcpListener;
use tokio::signal;
use tokio::sync::broadcast;
struct Server {
/// Database state
kv: Kv,
/// TCP listener
listener: TcpListener,
/// Listen for shutdown
notify_shutdown: broadcast::Sender<()>,
}
/// Handles a connections
struct Handler {
/// Database state
kv: Kv,
/// The TCP connection decorated with the redis protocol encoder / decoder
connection: Connection,
/// Listen for shutdown notifications
shutdown: Shutdown,
}
/// Run the mini-redis server.
pub async fn run() -> io::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let mut server = Server {
listener: TcpListener::bind("127.0.0.1:6379").await?,
kv: Kv::new(),
notify_shutdown,
};
tokio::select! {
res = server.run() => {
if let Err(err) = res {
eprintln!("failed to accept; err = {}", err);
}
}
_ = signal::ctrl_c() => {
println!("shutting down");
}
}
Ok(())
}
impl Server {
/// Run the server
async fn run(&mut self) -> io::Result<()> {
loop {
let (socket, _) = self.listener.accept().await?;
let mut handler = Handler {
kv: self.kv.clone(),
connection: Connection::new(socket),
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
};
tokio::spawn(async move {
if let Err(e) = handler.run().await {
eprintln!("client err = {:?}", e);
}
});
}
}
}
impl Handler {
async fn run(&mut self) -> io::Result<()> {
while !self.shutdown.is_shutdown() {
let maybe_frame = tokio::select! {
res = self.connection.read_frame() => res?,
_ = self.shutdown.recv() => {
break;
}
};
let frame = match maybe_frame {
Some(frame) => frame,
None => return Ok(()),
};
let cmd = Command::from_frame(frame)?;
cmd.apply(
&self.kv,
&mut self.connection,
&mut self.shutdown).await?;
}
Ok(())
}
}

30
src/shutdown.rs Normal file
View File

@@ -0,0 +1,30 @@
use tokio::sync::broadcast;
pub(crate) struct Shutdown {
shutdown: bool,
notify: broadcast::Receiver<()>,
}
impl Shutdown {
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Receive the shutdown notice
pub(crate) async fn recv(&mut self) {
if self.shutdown {
return;
}
// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await;
self.shutdown = true;
}
}