From 358e95e57c6357d4265c371a0adc9df39f31d751 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 3 Dec 2019 21:49:10 -0800 Subject: [PATCH] Initial commit --- .gitignore | 2 + Cargo.lock | 382 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 13 ++ README.md | 4 + examples/chat.rs | 5 + examples/hello_world.rs | 0 examples/pub.rs | 0 examples/sub.rs | 6 + src/client.rs | 28 +++ src/cmd/get.rs | 25 +++ src/cmd/mod.rs | 63 +++++++ src/cmd/publish.rs | 27 +++ src/cmd/set.rs | 47 +++++ src/cmd/subscribe.rs | 171 ++++++++++++++++++ src/conn.rs | 116 ++++++++++++ src/frame.rs | 224 +++++++++++++++++++++++ src/kv.rs | 65 +++++++ src/lib.rs | 21 +++ src/main.rs | 6 + src/parse.rs | 84 +++++++++ src/server.rs | 101 +++++++++++ src/shutdown.rs | 30 ++++ 22 files changed, 1420 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 examples/chat.rs create mode 100644 examples/hello_world.rs create mode 100644 examples/pub.rs create mode 100644 examples/sub.rs create mode 100644 src/client.rs create mode 100644 src/cmd/get.rs create mode 100644 src/cmd/mod.rs create mode 100644 src/cmd/publish.rs create mode 100644 src/cmd/set.rs create mode 100644 src/cmd/subscribe.rs create mode 100644 src/conn.rs create mode 100644 src/frame.rs create mode 100644 src/kv.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 src/parse.rs create mode 100644 src/server.rs create mode 100644 src/shutdown.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..53eaa21 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +**/*.rs.bk diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..fcaae9c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,382 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "arc-swap" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "atoi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "fnv" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-core" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "hermit-abi" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "libc" +version = "0.2.66" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memchr" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "mini-redis" +version = "0.1.0" +dependencies = [ + "atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.11 (git+https://github.com/tokio-rs/tokio)", +] + +[[package]] +name = "mio" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "mio-named-pipes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "mio-uds" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "miow" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "miow" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "net2" +version = "0.2.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num-traits" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "num_cpus" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "pin-project-lite" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "proc-macro2" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "quote" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "signal-hook-registry" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "socket2" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "syn" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio" +version = "0.2.11" +source = "git+https://github.com/tokio-rs/tokio#55b5e1b6adcc3f85a35d16444e496b203648c99d" +dependencies = [ + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-macros 0.2.4 (git+https://github.com/tokio-rs/tokio)", + "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-macros" +version = "0.2.4" +source = "git+https://github.com/tokio-rs/tokio#55b5e1b6adcc3f85a35d16444e496b203648c99d" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "unicode-xid" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[metadata] +"checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff" +"checksum atoi 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e0afb7287b68575f5ca0e5c7e40191cbd4be59d325781f46faa603e176eaef47" +"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" +"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +"checksum futures-core 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b597b16aa1a19ce2dfde5128a7c656d75346b35601a640be2d9efd4e9c83609d" +"checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772" +"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +"checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558" +"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +"checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223" +"checksum mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f" +"checksum mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3" +"checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" +"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum miow 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226" +"checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" +"checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +"checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" +"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" +"checksum proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548" +"checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" +"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" +"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" +"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +"checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" +"checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5" +"checksum tokio 0.2.11 (git+https://github.com/tokio-rs/tokio)" = "" +"checksum tokio-macros 0.2.4 (git+https://github.com/tokio-rs/tokio)" = "" +"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" +"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" +"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e652c2f --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "mini-redis" +version = "0.1.0" +authors = ["Carl Lerche "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "0.5" +tokio = { git = "https://github.com/tokio-rs/tokio", features = ["full"] } + +atoi = "0.3.2" diff --git a/README.md b/README.md new file mode 100644 index 0000000..6de00cf --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ + +* get / set + +* timeouts? diff --git a/examples/chat.rs b/examples/chat.rs new file mode 100644 index 0000000..17a3c80 --- /dev/null +++ b/examples/chat.rs @@ -0,0 +1,5 @@ + +#[tokio::main] +async fn main() { + unimplemented!(); +} diff --git a/examples/hello_world.rs b/examples/hello_world.rs new file mode 100644 index 0000000..e69de29 diff --git a/examples/pub.rs b/examples/pub.rs new file mode 100644 index 0000000..e69de29 diff --git a/examples/sub.rs b/examples/sub.rs new file mode 100644 index 0000000..eda175c --- /dev/null +++ b/examples/sub.rs @@ -0,0 +1,6 @@ +/// Subscribe to a redis channel + +#[tokio::main] +async fn main() { + unimplemented!(); +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..bfea0cb --- /dev/null +++ b/src/client.rs @@ -0,0 +1,28 @@ +#![cfg(nope)] +use crate::Connection; + +use bytes::Bytes; +use tokio::net::{TcpStream, ToSocketAddrs}; +use std::io; + +/// Mini asynchronous Redis client +pub struct Client { + conn: Connection, +} + +pub async fn connect(addr: T) -> io::Result { + let socket = TcpStream::connect(addr).await?; + let conn = Connection::new(socket); + + Ok(Client { conn }) +} + +impl Client { + pub async fn get(&mut self, key: &str) -> io::Result> { + unimplemented!(); + } + + pub async fn set(&mut self, key: &str, val: Bytes) -> io::Result<()> { + unimplemented!(); + } +} diff --git a/src/cmd/get.rs b/src/cmd/get.rs new file mode 100644 index 0000000..20fc85b --- /dev/null +++ b/src/cmd/get.rs @@ -0,0 +1,25 @@ +use crate::{Connection, Frame, Kv, Parse, ParseError}; + +use std::io; + +#[derive(Debug)] +pub struct Get { + key: String, +} + +impl Get { + pub(crate) fn parse(parse: &mut Parse) -> Result { + let key = parse.next_string()?; + Ok(Get { key }) + } + + pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> { + let response = if let Some(value) = kv.get(&self.key[..]) { + Frame::Bulk(value) + } else { + Frame::Null + }; + + dst.write_frame(&response).await + } +} diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs new file mode 100644 index 0000000..3e7a7b1 --- /dev/null +++ b/src/cmd/mod.rs @@ -0,0 +1,63 @@ +mod get; +pub use get::Get; + +mod publish; +pub use publish::Publish; + +mod set; +pub use set::Set; + +mod subscribe; +pub use subscribe::{Subscribe, Unsubscribe}; + +use crate::{Connection, Frame, Kv, Parse, ParseError, Shutdown}; + +use std::io; + +#[derive(Debug)] +pub(crate) enum Command { + Get(Get), + Publish(Publish), + Set(Set), + Subscribe(Subscribe), + Unsubscribe(Unsubscribe), +} + +impl Command { + pub(crate) fn from_frame(frame: Frame) -> Result { + let mut parse = Parse::new(frame)?; + + let command_name = parse.next_string()?; + + let command = match &command_name[..] { + "GET" => Command::Get(Get::parse(&mut parse)?), + "PUBLISH" => Command::Publish(Publish::parse(&mut parse)?), + "SET" => Command::Set(Set::parse(&mut parse)?), + "SUBSCRIBE" => Command::Subscribe(Subscribe::parse(&mut parse)?), + "UNSUBSCRIBE" => Command::Unsubscribe(Unsubscribe::parse(&mut parse)?), + _ => return Err(ParseError::UnknownCommand(command_name)), + }; + + parse.finish()?; + Ok(command) + } + + pub(crate) async fn apply( + self, + kv: &Kv, + dst: &mut Connection, + shutdown: &mut Shutdown, + ) -> io::Result<()> { + use Command::*; + + match self { + Get(cmd) => cmd.apply(kv, dst).await, + Publish(cmd) => cmd.apply(kv, dst).await, + Set(cmd) => cmd.apply(kv, dst).await, + Subscribe(cmd) => cmd.apply(kv, dst, shutdown).await, + // `Unsubscribe` cannot be applied. It may only be received from the + // context of a `Subscribe` command. + Unsubscribe(_) => unimplemented!(), + } + } +} diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs new file mode 100644 index 0000000..423af99 --- /dev/null +++ b/src/cmd/publish.rs @@ -0,0 +1,27 @@ +use crate::{Connection, Frame, Kv, Parse, ParseError}; + +use bytes::Bytes; +use std::io; + +#[derive(Debug)] +pub struct Publish { + channel: String, + message: Bytes, +} + +impl Publish { + pub(crate) fn parse(parse: &mut Parse) -> Result { + let channel = parse.next_string()?; + let message = parse.next_bytes()?; + + Ok(Publish { channel, message }) + } + + pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> { + // Set the value + let num_subscribers = kv.publish(&self.channel, self.message); + + let response = Frame::Integer(num_subscribers as u64); + dst.write_frame(&response).await + } +} diff --git a/src/cmd/set.rs b/src/cmd/set.rs new file mode 100644 index 0000000..9c617e0 --- /dev/null +++ b/src/cmd/set.rs @@ -0,0 +1,47 @@ +use crate::{Connection, Frame, Kv}; +use crate::cmd::{Parse, ParseError}; + +use bytes::Bytes; +use std::io; +use std::time::Duration; + +#[derive(Debug)] +pub struct Set { + key: String, + value: Bytes, + expire: Option, +} + +impl Set { + pub(crate) fn parse(parse: &mut Parse) -> Result { + use ParseError::EndOfStream; + + let key = parse.next_string()?; + let value = parse.next_bytes()?; + let mut expire = None; + + match parse.next_string() { + Ok(s) if s == "EX" => { + let secs = parse.next_int()?; + expire = Some(Duration::from_secs(secs)); + } + Ok(s) if s == "PX" => { + let ms = parse.next_int()?; + expire = Some(Duration::from_millis(ms)); + } + Ok(_) => unimplemented!(), + Err(EndOfStream) => {} + Err(err) => return Err(err), + } + + Ok(Set { key, value, expire }) + } + + pub(crate) async fn apply(self, kv: &Kv, dst: &mut Connection) -> io::Result<()> { + // Set the value + kv.set(self.key, self.value, self.expire); + + let response = Frame::Simple("OK".to_string()); + dst.write_frame(&response).await + } +} diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs new file mode 100644 index 0000000..b18e6a6 --- /dev/null +++ b/src/cmd/subscribe.rs @@ -0,0 +1,171 @@ +use crate::{Command, Connection, Frame, Kv, Shutdown}; +use crate::cmd::{Parse, ParseError}; + +use bytes::Bytes; +use tokio::select; +use tokio::stream::{StreamMap, StreamExt}; +use std::io; + +#[derive(Debug)] +pub struct Subscribe { + channels: Vec, +} + +#[derive(Debug)] +pub struct Unsubscribe { + channels: Vec, +} + +impl Subscribe { + pub(crate) fn parse(parse: &mut Parse) -> Result { + use ParseError::EndOfStream; + + // There must be at least one channel + let mut channels = vec![parse.next_string()?]; + + loop { + match parse.next_string() { + Ok(s) => channels.push(s), + Err(EndOfStream) => break, + Err(err) => return Err(err), + } + } + + Ok(Subscribe { channels }) + } + + /// Implements the "subscribe" half of Redis' Pub/Sub feature documented + /// [here]. + /// + /// This function is the entry point and includes the initial list of + /// channels to subscribe to. Additional `subscribe` and `unsubscribe` + /// commands may be received from the client and the list of subscriptions + /// are updated accordingly. + /// + /// [here]: https://redis.io/topics/pubsub + pub(crate) async fn apply( + mut self, + kv: &Kv, + dst: &mut Connection, + shutdown: &mut Shutdown, + ) -> io::Result<()> { + + // Each individual channel subscription is handled using a + // `sync::broadcast` channel. Messages are then fanned out to all + // clients currently subscribed to the channels. + // + // An individual client may subscribe to multiple channels and may + // dynamically add and remove channels from its subscription set. To + // handle this, a `StreamMap` is used to track active subscriptions. The + // `StreamMap` merges messages from individual broadcast channels as + // they are received. + let mut subscriptions = StreamMap::new(); + + loop { + // `self.channels` is used to track additional channels to subscribe + // to. When new `SUBSCRIBE` commands are received during the + // execution of `apply`, the new channels are pushed onto this vec. + for channel in self.channels.drain(..) { + // Build response frame to respond to the client with. + let mut response = Frame::array(); + response.push_bulk(Bytes::from_static(b"subscribe")); + response.push_bulk(Bytes::copy_from_slice(channel.as_bytes())); + + // Subscribe to channel + let rx = kv.subscribe(channel.clone()); + + // Track subscription in this client's subscription set. + subscriptions.insert(channel, rx); + + // Respond with the successful subscription + dst.write_frame(&response).await?; + } + + // Wait for one of the following to happen: + // + // - Receive a message from one of the subscribed channels. + // - Receive a subscribe or unsubscribe command from the client. + // - A server shutdown signal. + select! { + // Receive messages from subscribed channels + Some((channel, msg)) = subscriptions.next() => { + let mut response = Frame::array(); + response.push_bulk(Bytes::from_static(b"message")); + response.push_bulk(Bytes::copy_from_slice(channel.as_bytes())); + // TODO: handle lag error + response.push_bulk(msg.unwrap()); + + dst.write_frame(&response).await?; + } + res = dst.read_frame() => { + let frame = match res? { + Some(frame) => frame, + // How to handle remote client closing write half + None => unimplemented!(), + }; + + // A command has been received from the client. + // + // Only `SUBSCRIBE` and `UNSUBSCRIBE` commands are permitted + // in this context. + match Command::from_frame(frame)? { + Command::Subscribe(subscribe) => { + // Subscribe to the channels on next iteration + self.channels.extend(subscribe.channels.into_iter()); + } + Command::Unsubscribe(mut unsubscribe) => { + // If no channels are specified, this requests + // unsubscribing from **all** channels. To implement + // this, the `unsubscribe.channels` vec is populated + // with the list of channels currently subscribed + // to. + if unsubscribe.channels.is_empty() { + unsubscribe.channels = subscriptions + .keys() + .map(|channel| channel.to_string()) + .collect(); + } + + for channel in unsubscribe.channels.drain(..) { + subscriptions.remove(&channel); + + let mut response = Frame::array(); + response.push_bulk(Bytes::from_static(b"unsubscribe")); + response.push_bulk(Bytes::copy_from_slice(channel.as_bytes())); + + dst.write_frame(&response).await?; + } + } + _ => { + // TODO: received invalid command + unimplemented!(); + } + } + } + // Receive additional commands from the client + _ = shutdown.recv() => { + return Ok(()); + } + }; + } + } +} + +impl Unsubscribe { + pub(crate) fn parse(parse: &mut Parse) -> Result { + use ParseError::EndOfStream; + + // There may be no channels listed. + let mut channels = vec![]; + + loop { + match parse.next_string() { + Ok(s) => channels.push(s), + Err(EndOfStream) => break, + Err(err) => return Err(err), + } + } + + Ok(Unsubscribe { channels }) + } +} diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..f76406a --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,116 @@ +use crate::frame::{self, Frame}; + +use bytes::{Buf, BytesMut}; +use tokio::io::{BufStream, AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use std::io::{self, Cursor}; + +#[derive(Debug)] +pub(crate) struct Connection { + stream: BufStream, + buffer: BytesMut, +} + +impl Connection { + pub(crate) fn new(socket: TcpStream) -> Connection { + Connection { + stream: BufStream::new(socket), + buffer: BytesMut::with_capacity(4 * 1024), + } + } + + pub(crate) async fn read_frame(&mut self) -> io::Result> { + use frame::Error::Incomplete; + + loop { + let mut buf = Cursor::new(&self.buffer[..]); + + match Frame::check(&mut buf) { + Ok(_) => { + // Get the length of the message + let len = buf.position() as usize; + + // Reset the position + buf.set_position(0); + + let frame = Frame::parse(&mut buf)?; + + // Clear data from the buffer + self.buffer.advance(len); + + return Ok(Some(frame)); + } + Err(Incomplete) => {} + Err(e) => return Err(e.into()), + } + + if 0 == self.stream.read_buf(&mut self.buffer).await? { + return Ok(None); + } + } + } + + pub(crate) async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { + match frame { + Frame::Array(val) => { + self.stream.write_u8(b'*').await?; + self.write_decimal(val.len() as u64).await?; + + for entry in &**val { + self.write_value(entry).await?; + } + } + _ => self.write_value(frame).await?, + } + + self.stream.flush().await + } + + async fn write_value(&mut self, frame: &Frame) -> io::Result<()> { + match frame { + Frame::Simple(val) => { + self.stream.write_u8(b'+').await?; + self.stream.write_all(val.as_bytes()).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Error(val) => { + self.stream.write_u8(b'-').await?; + self.stream.write_all(val.as_bytes()).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Integer(val) => { + self.stream.write_u8(b':').await?; + self.write_decimal(*val).await?; + } + Frame::Null => { + self.stream.write_all(b"$-1\r\n").await?; + } + Frame::Bulk(val) => { + let len = val.len(); + + self.stream.write_u8(b'$').await?; + self.write_decimal(len as u64).await?; + self.stream.write_all(val).await?; + self.stream.write_all(b"\r\n").await?; + } + Frame::Array(_val) => unreachable!(), + } + + Ok(()) + } + + async fn write_decimal(&mut self, val: u64) -> io::Result<()> { + use std::io::Write; + + // Convert the value to a string + let mut buf = [0u8; 12]; + let mut buf = Cursor::new(&mut buf[..]); + write!(&mut buf, "{}", val)?; + + let pos = buf.position() as usize; + self.stream.write_all(&buf.get_ref()[..pos]).await?; + self.stream.write_all(b"\r\n").await?; + + Ok(()) + } +} diff --git a/src/frame.rs b/src/frame.rs new file mode 100644 index 0000000..68fafc8 --- /dev/null +++ b/src/frame.rs @@ -0,0 +1,224 @@ +use tokio::io; + +use bytes::{Buf, Bytes}; +use std::convert::TryInto; +use std::io::Cursor; +use std::num::TryFromIntError; +use std::string::FromUtf8Error; + +#[derive(Debug)] +pub(crate) enum Frame { + Simple(String), + Error(String), + Integer(u64), + Bulk(Bytes), + Null, + Array(Vec>), +} + +pub(crate) enum Error { + /// Not enough data is available to parse a message + Incomplete, + + /// Invalid message encoding + Invalid, +} + +impl Frame { + /// Returns an empty array + pub(crate) fn array() -> Frame { + Frame::Array(vec![]) + } + + /// Push a "bulk" frame into the array. `self` must be an Array frame + /// + /// # Panics + /// + /// panics if `self` is not an array + pub(crate) fn push_bulk(&mut self, bytes: Bytes) { + match self { + Frame::Array(vec) => { + vec.push(Box::new(Frame::Bulk(bytes))); + } + _ => panic!("not an array frame"), + } + } + + /// Checks if an entire message can be decoded from `src` + pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { + match get_u8(src)? { + b'+' => { + get_line(src)?; + Ok(()) + } + b'-' => { + get_line(src)?; + Ok(()) + } + b':' => { + let _ = get_decimal(src)?; + Ok(()) + } + b'$' => { + if b'-' == peek_u8(src)? { + // Skip '-1\r\n' + skip(src, 4) + } else { + // Read the bulk string + let len: usize = get_decimal(src)?.try_into()?; + + // skip that number of bytes + 2 (\r\n). + skip(src, len + 2) + } + } + b'*' => { + let len = get_decimal(src)?; + + for _ in 0.. len { + Frame::check(src)?; + } + + Ok(()) + } + _ => unimplemented!(), + } + } + + /// The message has already been validated with `scan`. + pub(crate) fn parse(src: &mut Cursor<&[u8]>) -> Result { + match get_u8(src)? { + b'+' => { + // Read the line and convert it to `Vec` + let line = get_line(src)?.to_vec(); + + // Convert the line to a String + let string = String::from_utf8(line)?; + + Ok(Frame::Simple(string)) + } + b'-' => { + // Read the line and convert it to `Vec` + let line = get_line(src)?.to_vec(); + + // Convert the line to a String + let string = String::from_utf8(line)?; + + Ok(Frame::Error(string)) + } + b':' => { + let len = get_decimal(src)?; + Ok(Frame::Integer(len)) + } + b'$' => { + if b'-' == peek_u8(src)? { + let line = get_line(src)?; + + if line != b"-1\r\n" { + return Err(Error::Invalid); + } + + Ok(Frame::Null) + } else { + // Read the bulk string + let len = get_decimal(src)?.try_into()?; + let n = len + 2; + + if src.remaining() < n { + return Err(Error::Incomplete); + } + + let data = Bytes::copy_from_slice(&src.bytes()[..len]); + + // skip that number of bytes + 2 (\r\n). + skip(src, n)?; + + Ok(Frame::Bulk(data)) + } + } + b'*' => { + let len = get_decimal(src)?.try_into()?; + let mut out = Vec::with_capacity(len); + + for _ in 0.. len { + out.push(Box::new(Frame::parse(src)?)); + } + + Ok(Frame::Array(out)) + } + _ => unimplemented!(), + } + } +} + +fn peek_u8(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + return Err(Error::Incomplete); + } + + Ok(src.bytes()[0]) +} + +fn get_u8(src: &mut Cursor<&[u8]>) -> Result { + if !src.has_remaining() { + return Err(Error::Incomplete); + } + + Ok(src.get_u8()) +} + +fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { + if src.remaining() < n { + return Err(Error::Incomplete); + } + + src.advance(n); + Ok(()) +} + +/// Read a new-line terminated decimal +fn get_decimal(src: &mut Cursor<&[u8]>) -> Result { + use atoi::atoi; + + let line = get_line(src)?; + + atoi::(line) + .ok_or(Error::Invalid) +} + +/// Find a line +fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { + // Scan the bytes directly + let start = src.position() as usize; + // Scan to the second to last byte + let end = src.get_ref().len() - 1; + + for i in start..end { + if src.get_ref()[i] == b'\r' && src.get_ref()[i+1] == b'\n' { + // We found a line, update the position to be *after* the \n + src.set_position((i + 2) as u64); + + // Return the line + return Ok(&src.get_ref()[start..i]); + } + } + + Err(Error::Incomplete) +} + +impl From for io::Error { + fn from(_src: Error) -> io::Error { + unimplemented!(); + } +} + +impl From for Error { + fn from(_src: FromUtf8Error) -> Error { + unimplemented!(); + } +} + +impl From for Error { + fn from(_src: TryFromIntError) -> Error { + unimplemented!(); + } +} diff --git a/src/kv.rs b/src/kv.rs new file mode 100644 index 0000000..70cebf0 --- /dev/null +++ b/src/kv.rs @@ -0,0 +1,65 @@ +use bytes::Bytes; +use tokio::sync::broadcast; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub(crate) struct Kv { + shared: Arc>, +} + +#[derive(Debug)] +struct Shared { + // The key-value data + data: HashMap, + + // The pub/sub key-space + pub_sub: HashMap>, +} + +impl Kv { + pub(crate) fn new() -> Kv { + Kv { + shared: Arc::new(Mutex::new(Shared { + data: HashMap::new(), + pub_sub: HashMap::new(), + })), + } + } + + pub(crate) fn get(&self, key: &str) -> Option { + let shared = self.shared.lock().unwrap(); + shared.data.get(key).map(|data| data.clone()) + } + + pub(crate) fn set(&self, key: String, value: Bytes, _expire: Option) { + let mut shared = self.shared.lock().unwrap(); + shared.data.insert(key, value); + } + + pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver { + use std::collections::hash_map::Entry; + + let mut shared = self.shared.lock().unwrap(); + + match shared.pub_sub.entry(key) { + Entry::Occupied(e) => { + e.get().subscribe() + } + Entry::Vacant(e) => { + let (tx, rx) = broadcast::channel(1028); + e.insert(tx); + rx + } + } + } + + pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize { + let shared = self.shared.lock().unwrap(); + + shared.pub_sub.get(key) + .map(|tx| tx.send(value).unwrap_or(0)) + .unwrap_or(0) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3842edc --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,21 @@ +pub mod client; + +mod cmd; +use cmd::Command; + +mod conn; +use conn::Connection; + +mod frame; +use frame::Frame; + +mod kv; +use kv::Kv; + +mod parse; +use parse::{Parse, ParseError}; + +pub mod server; + +mod shutdown; +use shutdown::Shutdown; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a4ab200 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,6 @@ +use std::io; + +#[tokio::main] +async fn main() -> io::Result<()> { + mini_redis::server::run().await +} diff --git a/src/parse.rs b/src/parse.rs new file mode 100644 index 0000000..4e2d2bb --- /dev/null +++ b/src/parse.rs @@ -0,0 +1,84 @@ +use crate::Frame; + +use bytes::Bytes; +use std::{io, str, vec}; + +/// Utility for parsing a command +#[derive(Debug)] +pub(crate) struct Parse { + parts: vec::IntoIter>, +} + +#[derive(Debug)] +pub(crate) enum ParseError { + EndOfStream, + Invalid, + UnknownCommand(String), +} + +impl Parse { + pub(crate) fn new(frame: Frame) -> Result { + let array = match frame { + Frame::Array(array) => array, + _ => return Err(ParseError::Invalid), + }; + + Ok(Parse { parts: array.into_iter() }) + } + + fn next(&mut self) -> Result { + self.parts.next() + .map(|frame| *frame) + .ok_or(ParseError::EndOfStream) + } + + pub(crate) fn next_string(&mut self) -> Result { + match self.next()? { + Frame::Simple(s) => Ok(s), + Frame::Bulk(data) => { + str::from_utf8(&data[..]) + .map(|s| s.to_string()) + .map_err(|_| ParseError::Invalid) + } + _ => Err(ParseError::Invalid), + } + } + + pub(crate) fn next_bytes(&mut self) -> Result { + match self.next()? { + Frame::Simple(s) => Ok(Bytes::from(s.into_bytes())), + Frame::Bulk(data) => Ok(data), + _ => Err(ParseError::Invalid), + } + } + + pub(crate) fn next_int(&mut self) -> Result { + match self.next()? { + Frame::Integer(v) => Ok(v), + _ => Err(ParseError::Invalid), + } + } + + /// Ensure there are no more entries in the array + pub(crate) fn finish(&mut self) -> Result<(), ParseError> { + if self.parts.next().is_none() { + Ok(()) + } else { + Err(ParseError::Invalid) + } + } +} + +impl From for io::Error { + fn from(src: ParseError) -> io::Error { + use ParseError::*; + + io::Error::new( + io::ErrorKind::Other, + match src { + EndOfStream => "end of stream".to_string(), + Invalid => "invalid".to_string(), + UnknownCommand(cmd) => format!("unknown command `{}`", cmd), + }) + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..a07fdf4 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,101 @@ +use crate::{Connection, Command, Kv, Shutdown}; + +use tokio::io; +use tokio::net::TcpListener; +use tokio::signal; +use tokio::sync::broadcast; + +struct Server { + /// Database state + kv: Kv, + + /// TCP listener + listener: TcpListener, + + /// Listen for shutdown + notify_shutdown: broadcast::Sender<()>, +} + +/// Handles a connections +struct Handler { + /// Database state + kv: Kv, + + /// The TCP connection decorated with the redis protocol encoder / decoder + connection: Connection, + + /// Listen for shutdown notifications + shutdown: Shutdown, +} + +/// Run the mini-redis server. +pub async fn run() -> io::Result<()> { + let (notify_shutdown, _) = broadcast::channel(1); + + let mut server = Server { + listener: TcpListener::bind("127.0.0.1:6379").await?, + kv: Kv::new(), + notify_shutdown, + }; + + tokio::select! { + res = server.run() => { + if let Err(err) = res { + eprintln!("failed to accept; err = {}", err); + } + } + _ = signal::ctrl_c() => { + println!("shutting down"); + } + } + + Ok(()) +} + +impl Server { + /// Run the server + async fn run(&mut self) -> io::Result<()> { + loop { + let (socket, _) = self.listener.accept().await?; + + let mut handler = Handler { + kv: self.kv.clone(), + connection: Connection::new(socket), + shutdown: Shutdown::new(self.notify_shutdown.subscribe()), + }; + + tokio::spawn(async move { + if let Err(e) = handler.run().await { + eprintln!("client err = {:?}", e); + } + }); + } + } +} + +impl Handler { + async fn run(&mut self) -> io::Result<()> { + while !self.shutdown.is_shutdown() { + let maybe_frame = tokio::select! { + res = self.connection.read_frame() => res?, + _ = self.shutdown.recv() => { + break; + } + }; + + let frame = match maybe_frame { + Some(frame) => frame, + None => return Ok(()), + }; + + let cmd = Command::from_frame(frame)?; + + cmd.apply( + &self.kv, + &mut self.connection, + &mut self.shutdown).await?; + } + + Ok(()) + } +} diff --git a/src/shutdown.rs b/src/shutdown.rs new file mode 100644 index 0000000..284abcc --- /dev/null +++ b/src/shutdown.rs @@ -0,0 +1,30 @@ +use tokio::sync::broadcast; + +pub(crate) struct Shutdown { + shutdown: bool, + notify: broadcast::Receiver<()>, +} + +impl Shutdown { + pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { + Shutdown { + shutdown: false, + notify, + } + } + + pub(crate) fn is_shutdown(&self) -> bool { + self.shutdown + } + + /// Receive the shutdown notice + pub(crate) async fn recv(&mut self) { + if self.shutdown { + return; + } + + // Cannot receive a "lag error" as only one value is ever sent. + let _ = self.notify.recv().await; + self.shutdown = true; + } +}