diff --git a/src/connection.rs b/src/connection.rs
index 19a7274..91535dc 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -57,66 +57,11 @@ impl Connection {
/// is closed in a way that doesn't break a frame in half, it returns
/// `None`. Otherwise, an error is returned.
pub async fn read_frame(&mut self) -> crate::Result> {
- use frame::Error::Incomplete;
-
loop {
- // Cursor is used to track the "current" location in the
- // buffer. Cursor also implements `Buf` from the `bytes` crate
- // which provides a number of helpful utilities for working
- // with bytes.
- let mut buf = Cursor::new(&self.buffer[..]);
-
- // The first step is to check if enough data has been buffered to
- // parse a single frame. This step is usually much faster than doing
- // a full parse of the frame, and allows us to skip allocating data
- // structures to hold the frame data unless we know the full frame
- // has been received.
- match Frame::check(&mut buf) {
- Ok(_) => {
- // The `check` function will have advanced the cursor until
- // the end of the frame. Since the cursor had position set
- // to zero before `Frame::check` was called, we obtain the
- // length of the frame by checking the cursor position.
- let len = buf.position() as usize;
-
- // Reset the position to zero before passing the cursor to
- // `Frame::parse`.
- buf.set_position(0);
-
- // Parse the frame from the buffer. This allocates the
- // necessary structures to represent the frame and returns
- // the frame value.
- //
- // If the encoded frame representation is invalid, an error
- // is returned. This should terminate the **current**
- // connection but should not impact any other connected
- // client.
- let frame = Frame::parse(&mut buf)?;
-
- // Discard the parsed data from the read buffer.
- //
- // When `advance` is called on the read buffer, all of the
- // data up to `len` is discarded. The details of how this
- // works is left to `BytesMut`. This is often done by moving
- // an internal cursor, but it may be done by reallocating
- // and copying data.
- self.buffer.advance(len);
-
- // Return the parsed frame to the caller.
- return Ok(Some(frame));
- }
- // There is not enough data present in the read buffer to parse
- // a single frame. We must wait for more data to be received
- // from the socket. Reading from the socket will be done in the
- // statement after this `match`.
- //
- // We do not want to return `Err` from here as this "error" is
- // an expected runtime condition.
- Err(Incomplete) => {}
- // An error was encountered while parsing the frame. The
- // connection is now in an invalid state. Returning `Err` from
- // here will result in the connection being closed.
- Err(e) => return Err(e.into()),
+ // Attempt to parse a frame from the buffered data. If enough data
+ // has been buffered, the frame is returned.
+ if let Some(frame) = self.parse_frame()? {
+ return Ok(Some(frame));
}
// There is not enough buffered data to read a frame. Attempt to
@@ -138,6 +83,71 @@ impl Connection {
}
}
+ /// Tries to parse a frame from the buffer. If the buffer contains enough
+ /// data, the frame is returned and the data removed from the buffer. If not
+ /// enough data has been buffered yet, `Ok(None)` is returned. If the
+ /// buffered data does not represent a valid frame, `Err` is returned.
+ fn parse_frame(&mut self) -> crate::Result > {
+ use frame::Error::Incomplete;
+
+ // Cursor is used to track the "current" location in the
+ // buffer. Cursor also implements `Buf` from the `bytes` crate
+ // which provides a number of helpful utilities for working
+ // with bytes.
+ let mut buf = Cursor::new(&self.buffer[..]);
+
+ // The first step is to check if enough data has been buffered to parse
+ // a single frame. This step is usually much faster than doing a full
+ // parse of the frame, and allows us to skip allocating data structures
+ // to hold the frame data unless we know the full frame has been
+ // received.
+ match Frame::check(&mut buf) {
+ Ok(_) => {
+ // The `check` function will have advanced the cursor until the
+ // end of the frame. Since the cursor had position set to zero
+ // before `Frame::check` was called, we obtain the length of the
+ // frame by checking the cursor position.
+ let len = buf.position() as usize;
+
+ // Reset the position to zero before passing the cursor to
+ // `Frame::parse`.
+ buf.set_position(0);
+
+ // Parse the frame from the buffer. This allocates the necessary
+ // structures to represent the frame and returns the frame
+ // value.
+ //
+ // If the encoded frame representation is invalid, an error is
+ // returned. This should terminate the **current** connection
+ // but should not impact any other connected client.
+ let frame = Frame::parse(&mut buf)?;
+
+ // Discard the parsed data from the read buffer.
+ //
+ // When `advance` is called on the read buffer, all of the data
+ // up to `len` is discarded. The details of how this works is
+ // left to `BytesMut`. This is often done by moving an internal
+ // cursor, but it may be done by reallocating and copying data.
+ self.buffer.advance(len);
+
+ // Return the parsed frame to the caller.
+ Ok(Some(frame))
+ }
+ // There is not enough data present in the read buffer to parse a
+ // single frame. We must wait for more data to be received from the
+ // socket. Reading from the socket will be done in the statement
+ // after this `match`.
+ //
+ // We do not want to return `Err` from here as this "error" is an
+ // expected runtime condition.
+ Err(Incomplete) => Ok(None),
+ // An error was encountered while parsing the frame. The connection
+ // is now in an invalid state. Returning `Err` from here will result
+ // in the connection being closed.
+ Err(e) => Err(e.into()),
+ }
+ }
+
/// Write a single `Frame` value to the underlying stream.
///
/// The `Frame` value is written to the socket using the various `write_*`