Files
mini-redis/src/frame.rs
2020-02-11 13:27:49 -08:00

224 lines
5.6 KiB
Rust

use tokio::io;
use bytes::{Buf, Bytes};
use std::convert::TryInto;
use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;
#[derive(Debug)]
pub(crate) enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Box<Frame>>),
}
pub(crate) enum Error {
/// Not enough data is available to parse a message
Incomplete,
/// Invalid message encoding
Invalid,
}
impl Frame {
/// Returns an empty array
pub(crate) fn array() -> Frame {
Frame::Array(vec![])
}
/// Push a "bulk" frame into the array. `self` must be an Array frame
///
/// # Panics
///
/// panics if `self` is not an array
pub(crate) fn push_bulk(&mut self, bytes: Bytes) {
match self {
Frame::Array(vec) => {
vec.push(Box::new(Frame::Bulk(bytes)));
}
_ => panic!("not an array frame"),
}
}
/// Checks if an entire message can be decoded from `src`
pub(crate) fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> {
match get_u8(src)? {
b'+' => {
get_line(src)?;
Ok(())
}
b'-' => {
get_line(src)?;
Ok(())
}
b':' => {
let _ = get_decimal(src)?;
Ok(())
}
b'$' => {
if b'-' == peek_u8(src)? {
// Skip '-1\r\n'
skip(src, 4)
} else {
// Read the bulk string
let len: usize = get_decimal(src)?.try_into()?;
// skip that number of bytes + 2 (\r\n).
skip(src, len + 2)
}
}
b'*' => {
let len = get_decimal(src)?;
for _ in 0..len {
Frame::check(src)?;
}
Ok(())
}
_ => unimplemented!(),
}
}
/// The message has already been validated with `scan`.
pub(crate) fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> {
match get_u8(src)? {
b'+' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
// Convert the line to a String
let string = String::from_utf8(line)?;
Ok(Frame::Simple(string))
}
b'-' => {
// Read the line and convert it to `Vec<u8>`
let line = get_line(src)?.to_vec();
// Convert the line to a String
let string = String::from_utf8(line)?;
Ok(Frame::Error(string))
}
b':' => {
let len = get_decimal(src)?;
Ok(Frame::Integer(len))
}
b'$' => {
if b'-' == peek_u8(src)? {
let line = get_line(src)?;
if line != b"-1\r\n" {
return Err(Error::Invalid);
}
Ok(Frame::Null)
} else {
// Read the bulk string
let len = get_decimal(src)?.try_into()?;
let n = len + 2;
if src.remaining() < n {
return Err(Error::Incomplete);
}
let data = Bytes::copy_from_slice(&src.bytes()[..len]);
// skip that number of bytes + 2 (\r\n).
skip(src, n)?;
Ok(Frame::Bulk(data))
}
}
b'*' => {
let len = get_decimal(src)?.try_into()?;
let mut out = Vec::with_capacity(len);
for _ in 0..len {
out.push(Box::new(Frame::parse(src)?));
}
Ok(Frame::Array(out))
}
_ => unimplemented!(),
}
}
}
fn peek_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() {
return Err(Error::Incomplete);
}
Ok(src.bytes()[0])
}
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !src.has_remaining() {
return Err(Error::Incomplete);
}
Ok(src.get_u8())
}
fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> {
if src.remaining() < n {
return Err(Error::Incomplete);
}
src.advance(n);
Ok(())
}
/// Read a new-line terminated decimal
fn get_decimal(src: &mut Cursor<&[u8]>) -> Result<u64, Error> {
use atoi::atoi;
let line = get_line(src)?;
atoi::<u64>(line).ok_or(Error::Invalid)
}
/// Find a line
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], Error> {
// Scan the bytes directly
let start = src.position() as usize;
// Scan to the second to last byte
let end = src.get_ref().len() - 1;
for i in start..end {
if src.get_ref()[i] == b'\r' && src.get_ref()[i + 1] == b'\n' {
// We found a line, update the position to be *after* the \n
src.set_position((i + 2) as u64);
// Return the line
return Ok(&src.get_ref()[start..i]);
}
}
Err(Error::Incomplete)
}
impl From<Error> for io::Error {
fn from(_src: Error) -> io::Error {
unimplemented!();
}
}
impl From<FromUtf8Error> for Error {
fn from(_src: FromUtf8Error) -> Error {
unimplemented!();
}
}
impl From<TryFromIntError> for Error {
fn from(_src: TryFromIntError) -> Error {
unimplemented!();
}
}