diff --git a/examples/chat.rs b/examples/chat.rs index 17a3c80..607aa27 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -1,4 +1,3 @@ - #[tokio::main] async fn main() { unimplemented!(); diff --git a/examples/hello_world.rs b/examples/hello_world.rs index e69de29..8b13789 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -0,0 +1 @@ + diff --git a/examples/pub.rs b/examples/pub.rs index e69de29..8b13789 100644 --- a/examples/pub.rs +++ b/examples/pub.rs @@ -0,0 +1 @@ + diff --git a/src/client.rs b/src/client.rs index bfea0cb..fd130d3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,8 +2,8 @@ use crate::Connection; use bytes::Bytes; -use tokio::net::{TcpStream, ToSocketAddrs}; use std::io; +use tokio::net::{TcpStream, ToSocketAddrs}; /// Mini asynchronous Redis client pub struct Client { diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 9c617e0..60e0c32 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -1,5 +1,5 @@ -use crate::{Connection, Frame, Kv}; use crate::cmd::{Parse, ParseError}; +use crate::{Connection, Frame, Kv}; use bytes::Bytes; use std::io; diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index b18e6a6..bb711ce 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -1,10 +1,10 @@ -use crate::{Command, Connection, Frame, Kv, Shutdown}; use crate::cmd::{Parse, ParseError}; +use crate::{Command, Connection, Frame, Kv, Shutdown}; use bytes::Bytes; -use tokio::select; -use tokio::stream::{StreamMap, StreamExt}; use std::io; +use tokio::select; +use tokio::stream::{StreamExt, StreamMap}; #[derive(Debug)] pub struct Subscribe { @@ -49,7 +49,6 @@ impl Subscribe { dst: &mut Connection, shutdown: &mut Shutdown, ) -> io::Result<()> { - // Each individual channel subscription is handled using a // `sync::broadcast` channel. Messages are then fanned out to all // clients currently subscribed to the channels. diff --git a/src/conn.rs b/src/conn.rs index f76406a..00a730a 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,9 +1,9 @@ use crate::frame::{self, Frame}; use bytes::{Buf, BytesMut}; -use tokio::io::{BufStream, AsyncReadExt, AsyncWriteExt}; -use tokio::net::TcpStream; use std::io::{self, Cursor}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream}; +use tokio::net::TcpStream; #[derive(Debug)] pub(crate) struct Connection { diff --git a/src/frame.rs b/src/frame.rs index 68fafc8..21735cc 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -74,7 +74,7 @@ impl Frame { b'*' => { let len = get_decimal(src)?; - for _ in 0.. len { + for _ in 0..len { Frame::check(src)?; } @@ -139,7 +139,7 @@ impl Frame { let len = get_decimal(src)?.try_into()?; let mut out = Vec::with_capacity(len); - for _ in 0.. len { + for _ in 0..len { out.push(Box::new(Frame::parse(src)?)); } @@ -181,8 +181,7 @@ fn get_decimal(src: &mut Cursor<&[u8]>) -> Result { let line = get_line(src)?; - atoi::(line) - .ok_or(Error::Invalid) + atoi::(line).ok_or(Error::Invalid) } /// Find a line @@ -193,7 +192,7 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { let end = src.get_ref().len() - 1; for i in start..end { - if src.get_ref()[i] == b'\r' && src.get_ref()[i+1] == b'\n' { + if src.get_ref()[i] == b'\r' && src.get_ref()[i + 1] == b'\n' { // We found a line, update the position to be *after* the \n src.set_position((i + 2) as u64); diff --git a/src/kv.rs b/src/kv.rs index 70cebf0..b170248 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -1,8 +1,8 @@ use bytes::Bytes; -use tokio::sync::broadcast; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::sync::broadcast; #[derive(Debug, Clone)] pub(crate) struct Kv { @@ -44,9 +44,7 @@ impl Kv { let mut shared = self.shared.lock().unwrap(); match shared.pub_sub.entry(key) { - Entry::Occupied(e) => { - e.get().subscribe() - } + Entry::Occupied(e) => e.get().subscribe(), Entry::Vacant(e) => { let (tx, rx) = broadcast::channel(1028); e.insert(tx); @@ -58,7 +56,9 @@ impl Kv { pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize { let shared = self.shared.lock().unwrap(); - shared.pub_sub.get(key) + shared + .pub_sub + .get(key) .map(|tx| tx.send(value).unwrap_or(0)) .unwrap_or(0) } diff --git a/src/parse.rs b/src/parse.rs index 4e2d2bb..227a38b 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -23,11 +23,14 @@ impl Parse { _ => return Err(ParseError::Invalid), }; - Ok(Parse { parts: array.into_iter() }) + Ok(Parse { + parts: array.into_iter(), + }) } fn next(&mut self) -> Result { - self.parts.next() + self.parts + .next() .map(|frame| *frame) .ok_or(ParseError::EndOfStream) } @@ -35,11 +38,9 @@ impl Parse { pub(crate) fn next_string(&mut self) -> Result { match self.next()? { Frame::Simple(s) => Ok(s), - Frame::Bulk(data) => { - str::from_utf8(&data[..]) - .map(|s| s.to_string()) - .map_err(|_| ParseError::Invalid) - } + Frame::Bulk(data) => str::from_utf8(&data[..]) + .map(|s| s.to_string()) + .map_err(|_| ParseError::Invalid), _ => Err(ParseError::Invalid), } } @@ -79,6 +80,7 @@ impl From for io::Error { EndOfStream => "end of stream".to_string(), Invalid => "invalid".to_string(), UnknownCommand(cmd) => format!("unknown command `{}`", cmd), - }) + }, + ) } } diff --git a/src/server.rs b/src/server.rs index a07fdf4..dfe4f39 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use crate::{Connection, Command, Kv, Shutdown}; +use crate::{Command, Connection, Kv, Shutdown}; use tokio::io; use tokio::net::TcpListener; @@ -90,10 +90,8 @@ impl Handler { let cmd = Command::from_frame(frame)?; - cmd.apply( - &self.kv, - &mut self.connection, - &mut self.shutdown).await?; + cmd.apply(&self.kv, &mut self.connection, &mut self.shutdown) + .await?; } Ok(())