Framing

We will now apply what we just learned about I/O and implement the Mini-Redis framing layer. Framing is the process of taking a byte stream and converting it to a stream of frames. A frame is a unit of data transmitted between two peers. The Redis protocol frame is as follows:

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

Note how the frame only consists of data without any semantics. The command parsing and implementation happen at a higher level.

For HTTP, a frame might look like:

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

To implement framing for Mini-Redis, we will implement a Connection struct that wraps a TcpStream and reads/writes mini_redis::Frame values.

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    /// 
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
    }
}

You can find the details of the Redis wire protocol here. The full Connection code is found here.

Buffered reads

The read_frame method waits for an entire frame to be received before returning. A single call to TcpStream::read() may return an arbitrary amount of data. It could contain an entire frame, a partial frame, or multiple frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the rest of the data is buffered until the next call to read_frame.

To implement this, Connection needs a read buffer field. Data is read from the socket into the read buffer. When a frame is parsed, the corresponding data is removed from the buffer.
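To make the partial-frame and multiple-frame cases concrete, here is a minimal sketch that uses only the standard library. It is not the Mini-Redis implementation: `LineFramer` is a hypothetical type, and its "frames" are simply lines terminated by `\r\n` rather than full Redis frames.

```rust
/// A simplified stand-in for the read side of `Connection`. Frames here
/// are lines terminated by "\r\n" (a hypothetical format, not Redis).
struct LineFramer {
    // Bytes received from the "socket" that have not been consumed yet.
    buffer: Vec<u8>,
}

impl LineFramer {
    fn new() -> Self {
        LineFramer { buffer: Vec::new() }
    }

    /// Append whatever a single read happened to return.
    fn feed(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    /// Return the next complete frame, or `None` if only a partial
    /// frame has been buffered so far.
    fn next_frame(&mut self) -> Option<Vec<u8>> {
        // Find the end of the first frame.
        let end = self.buffer.windows(2).position(|w| w == b"\r\n")?;
        // Copy the payload out, then remove the frame and its
        // delimiter from the front of the buffer.
        let frame = self.buffer[..end].to_vec();
        self.buffer.drain(..end + 2);
        Some(frame)
    }
}
```

Feeding `+OK` alone yields no frame. Feeding `\r\n+PONG\r\n+par` afterwards yields `+OK`, then `+PONG`, and leaves `+par` buffered for a later call.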

We will use BytesMut as the buffer type. This is a mutable version of Bytes.

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4 KiB of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

Next, we implement the read_frame() method.

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Attempt to parse a frame from the buffered data. If
        // enough data has been buffered, the frame is
        // returned.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // There is not enough buffered data to read a frame.
        // Attempt to read more data from the socket.
        //
        // On success, the number of bytes is returned. `0`
        // indicates "end of stream".
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // The remote closed the connection. For this to be
            // a clean shutdown, there should be no data in the
            // read buffer. If there is, this means that the
            // peer closed the socket while sending a frame.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

Let's break this down. The read_frame method operates in a loop. First, self.parse_frame() is called. This will attempt to parse a Redis frame from self.buffer. If there is enough data to parse a frame, the frame is returned to the caller of read_frame(). Otherwise, we attempt to read more data from the socket into the buffer. After reading more data, parse_frame() is called again. This time, if enough data has been received, parsing may succeed.

When reading from the stream, a return value of 0 indicates that no more data will be received from the peer. If the read buffer still has data in it, this indicates a partial frame has been received and the connection is being terminated abruptly. This is an error condition and Err is returned.
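The two end-of-stream outcomes can be tried out without a socket. The sketch below is a synchronous analogue built on `std::io::Read` instead of Tokio. The function name `read_line_frame`, the `\r\n`-terminated line format, and the 4-byte read size are all assumptions chosen to keep the example small.

```rust
use std::io::{self, Read};

/// Synchronous sketch of the loop in `read_frame`, using `std::io::Read`
/// in place of the async socket. Frames are "\r\n"-terminated lines
/// (a hypothetical format chosen to keep the example short).
fn read_line_frame<R: Read>(
    reader: &mut R,
    buffer: &mut Vec<u8>,
) -> io::Result<Option<Vec<u8>>> {
    loop {
        // Try to parse a frame from what is already buffered.
        if let Some(end) = buffer.windows(2).position(|w| w == b"\r\n") {
            let frame = buffer[..end].to_vec();
            buffer.drain(..end + 2);
            return Ok(Some(frame));
        }

        // Not enough data: read more. `0` means end of stream.
        let mut chunk = [0u8; 4];
        let n = reader.read(&mut chunk)?;
        if n == 0 {
            return if buffer.is_empty() {
                // Clean shutdown between frames.
                Ok(None)
            } else {
                // The peer went away in the middle of a frame.
                Err(io::Error::new(
                    io::ErrorKind::ConnectionReset,
                    "connection reset by peer",
                ))
            };
        }
        buffer.extend_from_slice(&chunk[..n]);
    }
}
```

With a reader containing `+OK\r\n`, the first call returns the frame and the second returns `Ok(None)`. With a reader containing only `+PO`, the call returns an error because the stream ended mid-frame.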

The Buf trait

When reading from the stream, read_buf is called. This version of the read function takes a value implementing BufMut from the bytes crate.

First, consider how we would implement the same read loop using read(). Vec<u8> could be used instead of BytesMut.

use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate a zero-filled buffer of 4 KiB.
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

And the read_frame() function on Connection:

use mini_redis::{Frame, Result};
use tokio::io::AsyncReadExt;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}

When working with byte arrays and read, we must also maintain a cursor tracking how much data has been buffered. We must make sure to pass the empty portion of the buffer to read(). Otherwise, we would overwrite buffered data. If our buffer gets filled up, we must grow the buffer in order to keep reading. In parse_frame() (not included), we would need to parse data contained by self.buffer[..self.cursor].

Because pairing a byte array with a cursor is very common, the bytes crate provides an abstraction representing a byte array and cursor. The Buf trait is implemented by types from which data can be read. The BufMut trait is implemented by types into which data can be written. When passing a T: BufMut to read_buf(), the buffer's internal cursor is automatically updated by read_buf. Because of this, in our version of read_frame, we do not need to manage our own cursor.

Additionally, when using Vec<u8>, the buffer must be initialized. vec![0; 4096] allocates an array of 4096 bytes and writes zero to every entry. When resizing the buffer, the new capacity must also be initialized with zeros. The initialization process is not free. When working with BytesMut and BufMut, capacity is uninitialized. The BytesMut abstraction prevents us from reading the uninitialized memory. This lets us avoid the initialization step.
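The difference between initialized length and reserved capacity can be seen with `Vec<u8>` alone. This is only an analogy using the standard library, not a demonstration of `BytesMut` itself. The function names are made up for the example.

```rust
/// Zero-filled buffer: the length is 4096, so every byte is written up
/// front and is readable immediately. This is what `read()` needs,
/// because it takes an initialized `&mut [u8]` slice.
fn zeroed_buffer() -> Vec<u8> {
    vec![0; 4096]
}

/// Reserved buffer: the length is 0. The allocation exists but is not
/// initialized, and safe code cannot read it until bytes are appended.
fn reserved_buffer() -> Vec<u8> {
    Vec::with_capacity(4096)
}
```

Appending to the reserved buffer with `extend_from_slice` grows its length by exactly the number of bytes written, so the uninitialized part never becomes visible.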

Parsing

Now, let's look at the parse_frame() function. Parsing is done in two steps.

  1. Ensure a full frame is buffered and find the end index of the frame.
  2. Parse the frame.

The mini-redis crate provides us with a function for each of these steps:

  1. Frame::check
  2. Frame::parse

We will also reuse the Buf abstraction to help. A Buf is passed into Frame::check. As the check function iterates over the passed-in buffer, the internal cursor is advanced. When check returns, the buffer's internal cursor points to the end of the frame.

For the Buf type, we will use std::io::Cursor<&[u8]>.

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);

            // Parse the frame
            let frame = Frame::parse(&mut buf)?;

            // Discard the frame from the buffer
            self.buffer.advance(len);

            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}

The full Frame::check function can be found here. We will not cover it in its entirety.

The relevant thing to note is that Buf's "byte iterator" style APIs are used. These fetch data and advance the internal cursor. For example, to parse a frame, the first byte is checked to determine the type of the frame. The function used is Buf::get_u8. This fetches the byte at the current cursor's position and advances the cursor by one.

There are more useful methods on the Buf trait. Check the API docs for more details.
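As a rough illustration of the "byte iterator" style, the sketch below re-creates the idea with `std::io::Cursor` only. It is not the Mini-Redis `Frame::check`. The `get_u8` and `check_simple` functions are hypothetical helpers that handle simple-string frames only, and unlike `Buf::get_u8`, which panics when no bytes remain, this `get_u8` returns `None`.

```rust
use std::io::Cursor;

/// Hypothetical std-only helper mirroring what `Buf::get_u8` does:
/// return the byte at the cursor and advance the cursor by one.
/// Returns `None` instead of panicking when no bytes remain.
fn get_u8(src: &mut Cursor<&[u8]>) -> Option<u8> {
    let pos = src.position() as usize;
    let byte = *src.get_ref().get(pos)?;
    src.set_position(pos as u64 + 1);
    Some(byte)
}

/// Check that a complete simple-string frame ("+...\r\n") is buffered.
/// On success the cursor is left just past the frame, so its position
/// is the frame's length in bytes.
fn check_simple(src: &mut Cursor<&[u8]>) -> Result<(), &'static str> {
    // The first byte determines the frame type.
    match get_u8(src) {
        Some(b'+') => {}
        Some(_) => return Err("unsupported frame type"),
        None => return Err("incomplete"),
    }
    // Scan forward until the "\r\n" terminator is seen.
    let mut prev = 0u8;
    while let Some(byte) = get_u8(src) {
        if prev == b'\r' && byte == b'\n' {
            return Ok(());
        }
        prev = byte;
    }
    Err("incomplete")
}
```

After a successful check, the cursor position plays the same role as `len` in `parse_frame`: it says how many bytes to discard from the read buffer once the frame has been parsed.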

Buffered writes

The other half of the framing API is the write_frame(frame) function. This function writes an entire frame to the socket. In order to minimize write syscalls, writes will be buffered. A write buffer is maintained and frames are encoded to this buffer before being written to the socket. However, unlike read_frame(), the entire frame is not always buffered to a byte array before writing to the socket.

Consider a bulk string frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame starts with a frame head, which consists of the $ character followed by the data length in bytes and a \r\n terminator. The rest of the frame, and usually most of it, is the contents of the Bytes value followed by a final \r\n. If the data is large, copying it to an intermediate buffer would be costly.
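For reference, the byte layout of a bulk frame can be written out with a small standalone function. This sketch deliberately builds the whole frame in one `Vec<u8>` so the layout is easy to inspect. `encode_bulk` is a hypothetical name and is not part of Mini-Redis.

```rust
/// Encode a bulk string as it appears on the wire:
/// `$<length>\r\n<data>\r\n`.
fn encode_bulk(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(data.len() + 16);
    // Frame head: `$` followed by the payload length in decimal.
    out.push(b'$');
    out.extend_from_slice(data.len().to_string().as_bytes());
    out.extend_from_slice(b"\r\n");
    // Payload. This copy is the cost that matters for large values:
    // the full data is duplicated into the intermediate buffer.
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}
```

For the input `hello`, the result is the 11 bytes `$5\r\nhello\r\n`. Only the first four bytes are the head, and the share taken by the payload grows with the size of the value.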

To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.
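The buffering behavior can be observed with the standard library's synchronous `std::io::BufWriter`, used here as an analogue of the Tokio type so the example runs without a runtime. A `Vec<u8>` stands in for the socket, and `buffered_write_demo` is a name invented for this sketch.

```rust
use std::io::{self, BufWriter, Write};

/// Write a simple-string frame in three small pieces through a
/// `BufWriter` and report how many bytes had reached the inner writer
/// before and after the flush.
fn buffered_write_demo() -> io::Result<(usize, usize)> {
    // A `Vec<u8>` stands in for the socket.
    let mut writer = BufWriter::new(Vec::new());

    writer.write_all(b"+")?;
    writer.write_all(b"OK")?;
    writer.write_all(b"\r\n")?;
    // The writes above were small enough to stay in the buffer.
    let before = writer.get_ref().len();

    // Flushing pushes the buffered bytes to the inner writer.
    writer.flush()?;
    let after = writer.get_ref().len();

    Ok((before, after))
}
```

The function returns `(0, 5)`: nothing reaches the inner writer until `flush` is called, at which point all five bytes arrive together.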

We will not attempt a full implementation of write_frame() as part of the tutorial. See the full implementation here.

First, the Connection struct is updated:

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

Next, write_frame() is implemented.

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    // Propagate flush errors instead of silently dropping them.
    self.stream.flush().await?;

    Ok(())
}

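The write_u8, write_all, and flush functions used here are provided by AsyncWriteExt. The write_decimal function is a helper method on Connection that is not shown. The AsyncWriteExt functions are available on TcpStream as well, but it would not be advisable to issue single byte writes without the intermediate buffer.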
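The write_decimal helper called in the Integer and Bulk branches is not defined in this section. The sketch below shows one way such a helper could work, written synchronously against `std::io::Write` so it runs on its own. It assumes the helper writes the value as ASCII decimal digits followed by `\r\n`, which is consistent with the Integer branch writing nothing after the call.

```rust
use std::io::{self, Cursor, Write};

/// Synchronous, std-only sketch of a `write_decimal` helper. Assumption:
/// it writes the value as ASCII decimal digits followed by "\r\n".
fn write_decimal<W: Write>(dst: &mut W, val: u64) -> io::Result<()> {
    // u64::MAX has 20 decimal digits, so a 20-byte stack buffer is
    // enough and no heap allocation is needed.
    let mut digits = [0u8; 20];
    let mut cursor = Cursor::new(&mut digits[..]);
    write!(cursor, "{}", val)?;
    let len = cursor.position() as usize;

    // Write only the bytes that were formatted, then the terminator.
    dst.write_all(&cursor.get_ref()[..len])?;
    dst.write_all(b"\r\n")?;
    Ok(())
}
```

Formatting into a fixed stack buffer avoids allocating a `String` for every integer written. An async version would make the same calls on the buffered stream and await each write.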

The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.

Another alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to queue multiple small frames in the write buffer and then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis' goals, so we decided to include the flush().await call in fn write_frame().
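To show what the alternative API could look like, here is a synchronous sketch using only the standard library. `QueuedConnection` and `CountingWriter` are hypothetical types made up for this example. The counter stands in for the number of write syscalls that a real socket would see.

```rust
use std::io::{self, BufWriter, Write};

/// Test writer that records how many times `write` is called, as a
/// stand-in for counting write syscalls on a socket.
struct CountingWriter {
    writes: usize,
    data: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Hypothetical variant of `Connection` in which queuing a frame and
/// flushing are separate steps.
struct QueuedConnection<W: Write> {
    stream: BufWriter<W>,
}

impl<W: Write> QueuedConnection<W> {
    fn new(inner: W) -> Self {
        QueuedConnection { stream: BufWriter::new(inner) }
    }

    /// Encode a simple-string frame into the write buffer without
    /// flushing it.
    fn queue_simple(&mut self, val: &str) -> io::Result<()> {
        self.stream.write_all(b"+")?;
        self.stream.write_all(val.as_bytes())?;
        self.stream.write_all(b"\r\n")
    }

    /// Push everything queued so far to the underlying writer.
    fn flush(&mut self) -> io::Result<()> {
        self.stream.flush()
    }
}
```

Queuing `OK` and `PONG` and then calling `flush()` once results in a single call to the inner writer carrying both frames. The cost is that callers must remember to flush, which is the extra API complexity mentioned above.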