runs cargofmt (#3)
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
unimplemented!();
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
use crate::Connection;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||
use std::io;
|
||||
use tokio::net::{TcpStream, ToSocketAddrs};
|
||||
|
||||
/// Mini asynchronous Redis client
|
||||
pub struct Client {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{Connection, Frame, Kv};
|
||||
use crate::cmd::{Parse, ParseError};
|
||||
use crate::{Connection, Frame, Kv};
|
||||
|
||||
use bytes::Bytes;
|
||||
use std::io;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use crate::{Command, Connection, Frame, Kv, Shutdown};
|
||||
use crate::cmd::{Parse, ParseError};
|
||||
use crate::{Command, Connection, Frame, Kv, Shutdown};
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::select;
|
||||
use tokio::stream::{StreamMap, StreamExt};
|
||||
use std::io;
|
||||
use tokio::select;
|
||||
use tokio::stream::{StreamExt, StreamMap};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Subscribe {
|
||||
@@ -49,7 +49,6 @@ impl Subscribe {
|
||||
dst: &mut Connection,
|
||||
shutdown: &mut Shutdown,
|
||||
) -> io::Result<()> {
|
||||
|
||||
// Each individual channel subscription is handled using a
|
||||
// `sync::broadcast` channel. Messages are then fanned out to all
|
||||
// clients currently subscribed to the channels.
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use crate::frame::{self, Frame};
|
||||
|
||||
use bytes::{Buf, BytesMut};
|
||||
use tokio::io::{BufStream, AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use std::io::{self, Cursor};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Connection {
|
||||
|
||||
@@ -74,7 +74,7 @@ impl Frame {
|
||||
b'*' => {
|
||||
let len = get_decimal(src)?;
|
||||
|
||||
for _ in 0.. len {
|
||||
for _ in 0..len {
|
||||
Frame::check(src)?;
|
||||
}
|
||||
|
||||
@@ -139,7 +139,7 @@ impl Frame {
|
||||
let len = get_decimal(src)?.try_into()?;
|
||||
let mut out = Vec::with_capacity(len);
|
||||
|
||||
for _ in 0.. len {
|
||||
for _ in 0..len {
|
||||
out.push(Box::new(Frame::parse(src)?));
|
||||
}
|
||||
|
||||
@@ -181,8 +181,7 @@ fn get_decimal(src: &mut Cursor<&[u8]>) -> Result<u64, Error> {
|
||||
|
||||
let line = get_line(src)?;
|
||||
|
||||
atoi::<u64>(line)
|
||||
.ok_or(Error::Invalid)
|
||||
atoi::<u64>(line).ok_or(Error::Invalid)
|
||||
}
|
||||
|
||||
/// Find a line
|
||||
@@ -193,7 +192,7 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
|
||||
let end = src.get_ref().len() - 1;
|
||||
|
||||
for i in start..end {
|
||||
if src.get_ref()[i] == b'\r' && src.get_ref()[i+1] == b'\n' {
|
||||
if src.get_ref()[i] == b'\r' && src.get_ref()[i + 1] == b'\n' {
|
||||
// We found a line, update the position to be *after* the \n
|
||||
src.set_position((i + 2) as u64);
|
||||
|
||||
|
||||
10
src/kv.rs
10
src/kv.rs
@@ -1,8 +1,8 @@
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::broadcast;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Kv {
|
||||
@@ -44,9 +44,7 @@ impl Kv {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
|
||||
match shared.pub_sub.entry(key) {
|
||||
Entry::Occupied(e) => {
|
||||
e.get().subscribe()
|
||||
}
|
||||
Entry::Occupied(e) => e.get().subscribe(),
|
||||
Entry::Vacant(e) => {
|
||||
let (tx, rx) = broadcast::channel(1028);
|
||||
e.insert(tx);
|
||||
@@ -58,7 +56,9 @@ impl Kv {
|
||||
pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize {
|
||||
let shared = self.shared.lock().unwrap();
|
||||
|
||||
shared.pub_sub.get(key)
|
||||
shared
|
||||
.pub_sub
|
||||
.get(key)
|
||||
.map(|tx| tx.send(value).unwrap_or(0))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
18
src/parse.rs
18
src/parse.rs
@@ -23,11 +23,14 @@ impl Parse {
|
||||
_ => return Err(ParseError::Invalid),
|
||||
};
|
||||
|
||||
Ok(Parse { parts: array.into_iter() })
|
||||
Ok(Parse {
|
||||
parts: array.into_iter(),
|
||||
})
|
||||
}
|
||||
|
||||
fn next(&mut self) -> Result<Frame, ParseError> {
|
||||
self.parts.next()
|
||||
self.parts
|
||||
.next()
|
||||
.map(|frame| *frame)
|
||||
.ok_or(ParseError::EndOfStream)
|
||||
}
|
||||
@@ -35,11 +38,9 @@ impl Parse {
|
||||
pub(crate) fn next_string(&mut self) -> Result<String, ParseError> {
|
||||
match self.next()? {
|
||||
Frame::Simple(s) => Ok(s),
|
||||
Frame::Bulk(data) => {
|
||||
str::from_utf8(&data[..])
|
||||
.map(|s| s.to_string())
|
||||
.map_err(|_| ParseError::Invalid)
|
||||
}
|
||||
Frame::Bulk(data) => str::from_utf8(&data[..])
|
||||
.map(|s| s.to_string())
|
||||
.map_err(|_| ParseError::Invalid),
|
||||
_ => Err(ParseError::Invalid),
|
||||
}
|
||||
}
|
||||
@@ -79,6 +80,7 @@ impl From<ParseError> for io::Error {
|
||||
EndOfStream => "end of stream".to_string(),
|
||||
Invalid => "invalid".to_string(),
|
||||
UnknownCommand(cmd) => format!("unknown command `{}`", cmd),
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{Connection, Command, Kv, Shutdown};
|
||||
use crate::{Command, Connection, Kv, Shutdown};
|
||||
|
||||
use tokio::io;
|
||||
use tokio::net::TcpListener;
|
||||
@@ -90,10 +90,8 @@ impl Handler {
|
||||
|
||||
let cmd = Command::from_frame(frame)?;
|
||||
|
||||
cmd.apply(
|
||||
&self.kv,
|
||||
&mut self.connection,
|
||||
&mut self.shutdown).await?;
|
||||
cmd.apply(&self.kv, &mut self.connection, &mut self.shutdown)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user