Reading and Writing Data

Non-blocking I/O

In the overview we mentioned briefly that Tokio’s I/O types implement non-blocking variants of std::io::Read and std::io::Write called AsyncRead and AsyncWrite. These are an integral part of Tokio’s I/O story, and are important to understand when working with I/O code.

Note: in this section, we’ll primarily talk about AsyncRead, but AsyncWrite is pretty much exactly the same, just for writing data to an I/O resource (like a TCP socket) instead of reading from it.

So, let’s take a look at AsyncRead and see what all the fuss is about:

use std::io::Read;
pub trait AsyncRead: Read {
    // ...
    // various provided methods
    // ...
}

Huh. What’s going on here? Well, AsyncRead is really just Read from std::io, along with an additional contract. The documentation for AsyncRead reads:

This trait inherits from std::io::Read and indicates that an I/O object is non-blocking. All non-blocking I/O objects must return an error when bytes are unavailable instead of blocking the current thread.

That last part is critical. If you implement AsyncRead for a type, you are promising that calling read on it will never block. Instead, you are expected to return an io::ErrorKind::WouldBlock error to indicate that the operation would have blocked (for example because there was no more data available) if it wasn’t non-blocking. The provided poll_read method relies on this:

fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std::io::Error> {
    match self.read(buf) {
        Ok(t) => Ok(Async::Ready(t)),
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
            Ok(Async::NotReady)
        }
        Err(e) => Err(e),
    }
}

This code should look familiar. If you squint a little, poll_read looks a lot like Future::poll. And that’s because that’s almost exactly what it is! A type that implements AsyncRead essentially behaves like a future that you can try to read data out of, and it will inform you whether it is Ready (and some data was read) or NotReady (and you’ll have to poll_read again later).
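To see the WouldBlock contract in action without pulling in Tokio, here is a minimal sketch that uses only the standard library. The Async enum and the MockSocket type are hypothetical stand-ins (they are not Tokio's types), and poll_read is written as a free function that mirrors the provided method shown above.

```rust
use std::io::{self, Read};

// Hypothetical stand-in for the futures 0.1 `Async` enum, so that this
// sketch compiles with only the standard library.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical mock of a non-blocking source: it has no data on the
// first `read`, and hands out its bytes on later calls.
pub struct MockSocket {
    data: Vec<u8>,
    first_call: bool,
}

impl MockSocket {
    pub fn new(data: &[u8]) -> Self {
        MockSocket { data: data.to_vec(), first_call: true }
    }
}

impl Read for MockSocket {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.first_call {
            // Honour the non-blocking contract: report WouldBlock
            // instead of waiting for data to arrive.
            self.first_call = false;
            return Err(io::ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.data.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data.drain(..n);
        Ok(n)
    }
}

// Same shape as the provided poll_read: WouldBlock becomes NotReady,
// every other error is passed through unchanged.
pub fn poll_read<R: Read>(r: &mut R, buf: &mut [u8]) -> Result<Async<usize>, io::Error> {
    match r.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}
```

Polling a fresh MockSocket once yields NotReady; polling it again yields Ready with the number of bytes copied into the buffer.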

Working with I/O futures

Since AsyncRead (and AsyncWrite) are pretty much futures, you can easily embed them in your own futures and poll_read them just as you would poll any other embedded Future. You can even use try_ready! to propagate errors and NotReady as appropriate. We’ll talk more about directly using these traits in the next section. However, to make life simpler in a number of situations, Tokio provides a number of useful combinators in tokio::io for performing common I/O operations on top of AsyncRead and AsyncWrite. In general, these provide wrappers around AsyncRead or AsyncWrite types that implement Future and that complete when a given read or write operation has completed.

The first handy I/O combinator is read_exact. It takes an implementor of AsyncRead and a mutable buffer (anything that can be viewed as a &mut [u8], such as a Vec<u8>) as arguments, and returns a Future that reads exactly enough bytes to fill the buffer. Internally the returned future just keeps track of how many bytes it has read thus far, and continues to issue poll_read on the AsyncRead (returning NotReady if necessary) until it has exactly filled the buffer. At that point, it returns Ready with both the reader and the filled buffer. Let’s take a look:

# extern crate tokio;
use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

# fn main() {
let addr = "127.0.0.1:12345".parse().unwrap();
let read_8_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // We need to create a buffer for read_exact to write into.
        // A Vec<u8> is a good starting point.
        // read_exact will read buf.len() bytes, so we need
        // to make sure the Vec isn't empty!
        let buf = vec![0; 8];

        // read_exact returns a Future that resolves when
        // buf.len() bytes have been read from stream.
        tokio::io::read_exact(stream, buf)
    })
    .inspect(|(_stream, buf)| {
        // Notice that we get both the buffer and the stream back
        // here, so that we can now continue using the stream to
        // send a reply for example.
        println!("got eight bytes: {:x?}", buf);
    });

// We can now either chain more futures onto read_8_fut,
// or if all we wanted to do was read and print those 8
// bytes, we can just use tokio::run to run it (taking
// care to map Future::Item and Future::Error to ()).
# }
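The bookkeeping that read_exact is described as doing can be sketched with the standard library alone. Everything here is an assumption made for illustration: Async is a stand-in for the futures 0.1 enum, Script is a mock reader, and ReadExactSketch is a hypothetical type, not Tokio's actual implementation.

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

// Hypothetical stand-in for the futures 0.1 `Async` enum.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Mock reader for the sketch: each entry is either a chunk of bytes or
// `None`, which stands for "no data yet" (WouldBlock). Chunks are
// assumed to fit into the caller's buffer.
pub struct Script(pub VecDeque<Option<Vec<u8>>>);

impl Read for Script {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.0.pop_front() {
            Some(Some(chunk)) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                Ok(n)
            }
            Some(None) => Err(io::ErrorKind::WouldBlock.into()),
            None => Ok(0),
        }
    }
}

// Owns the reader and the buffer, and remembers how much is filled.
pub struct ReadExactSketch<R> {
    state: Option<(R, Vec<u8>)>,
    filled: usize,
}

impl<R: Read> ReadExactSketch<R> {
    pub fn new(reader: R, buf: Vec<u8>) -> Self {
        ReadExactSketch { state: Some((reader, buf)), filled: 0 }
    }

    pub fn poll(&mut self) -> Result<Async<(R, Vec<u8>)>, io::Error> {
        {
            let (reader, buf) = self.state.as_mut().expect("polled after completion");
            while self.filled < buf.len() {
                match reader.read(&mut buf[self.filled..]) {
                    // The input ended before the buffer was full.
                    Ok(0) => return Err(io::ErrorKind::UnexpectedEof.into()),
                    Ok(n) => self.filled += n,
                    // No data right now: keep our progress and try later.
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        return Ok(Async::NotReady)
                    }
                    Err(e) => return Err(e),
                }
            }
        }
        // The buffer is full: hand back both the reader and the buffer.
        Ok(Async::Ready(self.state.take().unwrap()))
    }
}
```

The point of the sketch is that a NotReady result loses nothing: the partially filled buffer and the fill count stay inside the future, so the next poll picks up where the previous one stopped.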

A second I/O combinator that is often useful is write_all. It takes an implementor of AsyncWrite and a buffer (anything that can be viewed as a &[u8], such as a Vec<u8>) as arguments, and returns a Future that writes out all the bytes of the buffer into the AsyncWrite using poll_write. When the Future resolves, the entire buffer has been written out and flushed. We can combine this with read_exact to echo whatever the server says back to it:

# extern crate tokio;
use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

# fn main() {
# let addr = "127.0.0.1:12345".parse().unwrap();
let echo_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // We're going to read the first 32 bytes the server sends us
        // and then just echo them back:
        let buf = vec![0; 32];
        // First, we need to read the server's message
        tokio::io::read_exact(stream, buf)
    })
    .and_then(|(stream, buf)| {
        // Then, we use write_all to write the entire buffer back:
        tokio::io::write_all(stream, buf)
    })
    .inspect(|(_stream, buf)| {
        println!("echoed back {} bytes: {:x?}", buf.len(), buf);
    });

// As before, we can chain more futures onto echo_fut,
// or declare ourselves finished and run it with tokio::run.
# }

Tokio also comes with an I/O combinator to implement this kind of copying. It is (perhaps unsurprisingly) called copy. copy takes an AsyncRead and an AsyncWrite, and continuously writes all the bytes read out from the AsyncRead into the AsyncWrite until poll_read indicates that the input has been closed and all the bytes have been written out and flushed to the output. This is the combinator we used in our echo server! It greatly simplifies our example from above, and also makes it work for any amount of server data!

# extern crate tokio;
use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

# fn main() {
# let addr = "127.0.0.1:12345".parse().unwrap();
let echo_fut = TcpStream::connect(&addr)
    .and_then(|stream| {
        // First, we need to get a separate read and write handle for
        // the connection so that we can forward one to the other.
        // See "Split I/O resources" below for more details.
        let (reader, writer) = stream.split();
        // Then, we can use copy to send all the read bytes to the
        // writer, and return how many bytes it read/wrote.
        tokio::io::copy(reader, writer)
    })
    .inspect(|(bytes_copied, _reader, _writer)| {
        println!("echoed back {} bytes", bytes_copied);
    });
# }

Pretty neat!
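To make the loop that copy is described as running more concrete, here is a blocking, std-only sketch of the same idea. The name copy_sketch is hypothetical, and the real combinator is asynchronous (it returns NotReady rather than waiting), so treat this as an outline of the control flow only.

```rust
use std::io::{self, Read, Write};

// Blocking outline of a copy loop: read a chunk, write all of it,
// and repeat until the reader reports end of input (a read of 0 bytes).
pub fn copy_sketch<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
    let mut buf = [0u8; 1024];
    let mut total = 0u64;
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // Input is closed: flush whatever is pending and report
            // how many bytes were copied in total.
            writer.flush()?;
            return Ok(total);
        }
        writer.write_all(&buf[..n])?;
        total += n as u64;
    }
}
```

Because &[u8] implements Read and Vec<u8> implements Write, the sketch can be tried on in-memory data without opening a socket.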

The combinators we’ve talked about so far are all for pretty low-level operations: read these bytes, write these bytes, copy these bytes. Often, though, you want to operate on higher-level representations, like “lines”. Tokio has you covered there too! lines takes an AsyncRead, and returns a Stream that yields each line from the input until there are no more lines to read:

# extern crate tokio;
use tokio::net::tcp::TcpStream;
use tokio::prelude::*;

# fn main() {
# let addr = "127.0.0.1:12345".parse().unwrap();
let lines_fut = TcpStream::connect(&addr).and_then(|stream| {
    // We want to parse out each line we receive on stream.
    // To do that, we may need to buffer input for a little while
    // (if the server sends two lines in one packet for example).
    // Because of that, lines requires that the AsyncRead it is
    // given *also* implements BufRead. This may be familiar if
    // you've ever used the lines() method from std::io::BufRead.
    // Luckily, BufReader from the standard library gives us that!
    let stream = std::io::BufReader::new(stream);
    tokio::io::lines(stream).for_each(|line| {
        println!("server sent us the line: {}", line);
        // This closure is called for each line we receive,
        // and returns a Future that represents the work we
        // want to do before accepting the next line.
        // In this case, we just wanted to print, so we
        // don't need to do anything more.
        Ok(())
    })
});
# }

There are also plenty more I/O combinators in tokio::io that you may want to take a look at before you decide to write your own!

Split I/O resources

Both the copy example above and the echo server contained this mysterious-looking snippet:

let (reader, writer) = socket.split();
let bytes_copied = tokio::io::copy(reader, writer);

As the comment above it explains, we split the TcpStream (socket) into a read “half” and a write “half”, and use the copy combinator we discussed above to produce a Future that asynchronously copies all the data from the read half to the write half. But why is this “split” required in the first place? After all, AsyncRead::poll_read and AsyncWrite::poll_write just take &mut self.

To answer that, we need to think back to Rust’s ownership system a little. Recall that Rust only allows you to have a single mutable reference to a given variable at a time. But we have to pass two arguments to copy, one for where to read from, and one for where to write to. However, once we pass a mutable reference to the TcpStream as one of the arguments, we cannot also construct a second mutable reference to it to pass as the second argument! We know that copy won’t read from and write to the stream at the same time, but that’s not expressed in copy’s types.

Enter split, a provided method on the AsyncRead trait when the type also implements AsyncWrite. If we look at the signature, we see:

fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
  where Self: AsyncWrite { ... }

The returned ReadHalf implements AsyncRead, and the WriteHalf implements AsyncWrite. And crucially, we now have two separate pointers into our type, which we can pass around separately. This comes in handy for copy, but it also means that we can pass each half to a different future, and handle the reads and writes completely independently! Behind the scenes, split ensures that if we try to read and write at the same time, only one of them happens at a time.
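One way to picture the two halves is as two handles that share a single resource behind a lock. The sketch below uses only the standard library; ReadHalfSketch, WriteHalfSketch, split_sketch and MockDuplex are hypothetical names, and the Mutex is used purely for illustration. It is not a claim about how Tokio implements split internally.

```rust
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};

// Hypothetical halves: both point at the same underlying I/O object.
pub struct ReadHalfSketch<T>(pub Arc<Mutex<T>>);
pub struct WriteHalfSketch<T>(pub Arc<Mutex<T>>);

// Consume the resource and hand back two independently owned handles.
pub fn split_sketch<T: Read + Write>(io: T) -> (ReadHalfSketch<T>, WriteHalfSketch<T>) {
    let shared = Arc::new(Mutex::new(io));
    (ReadHalfSketch(shared.clone()), WriteHalfSketch(shared))
}

impl<T: Read> Read for ReadHalfSketch<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // The lock guarantees only one half touches the resource at a time.
        self.0.lock().unwrap().read(buf)
    }
}

impl<T: Write> Write for WriteHalfSketch<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.lock().unwrap().write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.0.lock().unwrap().flush()
    }
}

// Mock duplex resource for the sketch: reads drain `incoming`,
// writes append to `outgoing`.
pub struct MockDuplex {
    pub incoming: Vec<u8>,
    pub outgoing: Vec<u8>,
}

impl Read for MockDuplex {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = buf.len().min(self.incoming.len());
        buf[..n].copy_from_slice(&self.incoming[..n]);
        self.incoming.drain(..n);
        Ok(n)
    }
}

impl Write for MockDuplex {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.outgoing.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With the two halves in hand, a function that needs a separate reader and writer (std::io::copy, in this blocking sketch) can be given both without ever holding two mutable references to the same value.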

Transports

Turning an AsyncRead into a Stream (like lines does) or an AsyncWrite into a Sink is pretty common in applications that need to do I/O. They often want to abstract away the way bytes are retrieved from or put on the wire, and let most of their application code deal with more convenient “request” and “response” types. This is often known as “framing”: instead of viewing your connections as consisting of just bytes in/bytes out, you view them as “frames” of application data that are received and sent. A framed stream of bytes is often referred to as a “transport”.

Transports are typically implemented using a codec. For example, lines represents a very simple codec that separates a byte string by the newline character, \n, and parses each frame as a string before passing it to the application. Tokio provides helpers for implementing new codecs in tokio::codec; you implement the Encoder and Decoder traits for your transport, and use Framed::new to make a Sink + Stream from your byte stream (like a TcpStream). It’s almost like magic! There are versions for doing just the read or write side of a codec too (like lines). Let’s take a look at writing a simple implementation of a line-based codec (even though LinesCodec exists):

# extern crate tokio;
extern crate bytes;
use bytes::{BufMut, BytesMut};
use tokio::codec::{Decoder, Encoder};
use tokio::prelude::*;

// This is where we'd keep track of any extra book-keeping information
// our transport needs to operate.
struct LinesCodec;

// Turns string errors into std::io::Error
fn bad_utf8<E>(_: E) -> std::io::Error {
    std::io::Error::new(std::io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
}

// First, we implement encoding, because it's so straightforward.
// Just write out the bytes of the string followed by a newline!
// Easy-peasy.
impl Encoder for LinesCodec {
    type Item = String;
    type Error = std::io::Error;

    fn encode(&mut self, line: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
        // Note that we're given a BytesMut here to write into.
        // BytesMut comes from the bytes crate, and aims to give
        // efficient read/write access to a buffer. To use it,
        // we have to reserve memory before we try to write to it.
        buf.reserve(line.len() + 1);
        // And now, we write out our stuff!
        buf.put(line);
        buf.put_u8(b'\n');
        Ok(())
    }
}

// The decoding is a little trickier, because we need to look for
// newline characters. We also need to handle *two* cases: the "normal"
// case where we're just asked to find the next string in a bunch of
// bytes, and the "end" case where the input has ended, and we need
// to find any remaining strings (the last of which may not end with a
// newline!).
impl Decoder for LinesCodec {
    type Item = String;
    type Error = std::io::Error;

    // Find the next line in buf!
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Ok(if let Some(offset) = buf.iter().position(|b| *b == b'\n') {
            // We found a newline character in this buffer!
            // Cut out the line from the buffer so we don't return it again.
            let line = buf.split_to(offset + 1);
            // And then parse it as UTF-8
            Some(
                std::str::from_utf8(&line[..line.len() - 1])
                    .map_err(bad_utf8)?
                    .to_string(),
            )
        } else {
            // There are no newlines in this buffer, so no lines to speak of.
            // Tokio will make sure to call this again when we have more bytes.
            None
        })
    }

    // Find the next line in buf when there will be no more data coming.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        Ok(match self.decode(buf)? {
            Some(frame) => {
                // There's a regular line here, so we may as well just return that.
                Some(frame)
            },
            None => {
                // There are no more lines in buf!
                // We know there are no more bytes coming though,
                // so we just return the remainder, if any.
                if buf.is_empty() {
                    None
                } else {
                    Some(
                        std::str::from_utf8(&buf.take()[..])
                            .map_err(bad_utf8)?
                            .to_string(),
                    )
                }
            }
        })
    }
}
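
To see how the two decoding methods cooperate, it can help to drive the same logic by hand. The sketch below is std-only and rests on a few assumptions: it uses a Vec<u8> in place of BytesMut, decode and decode_eof are free functions that mirror the methods above, and frames is a hypothetical driver that plays the role of the framing machinery.

```rust
use std::io;

// Turns string errors into std::io::Error, as in the codec above.
fn bad_utf8<E>(_: E) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")
}

// Mirrors `decode`: return the next complete line, if there is one.
pub fn decode(buf: &mut Vec<u8>) -> Result<Option<String>, io::Error> {
    match buf.iter().position(|b| *b == b'\n') {
        Some(offset) => {
            // Remove the line and its newline from the front of the buffer.
            let mut line: Vec<u8> = buf.drain(..offset + 1).collect();
            line.pop(); // drop the trailing '\n'
            String::from_utf8(line).map(Some).map_err(bad_utf8)
        }
        // No newline yet: wait for more bytes.
        None => Ok(None),
    }
}

// Mirrors `decode_eof`: like `decode`, but a non-empty remainder
// counts as a final line because no more data is coming.
pub fn decode_eof(buf: &mut Vec<u8>) -> Result<Option<String>, io::Error> {
    match decode(buf)? {
        Some(frame) => Ok(Some(frame)),
        None if buf.is_empty() => Ok(None),
        None => {
            let rest = std::mem::take(buf);
            String::from_utf8(rest).map(Some).map_err(bad_utf8)
        }
    }
}

// Hypothetical driver: append chunks as they "arrive", pull out every
// complete frame, and flush the remainder once the input has ended.
pub fn frames(chunks: &[&[u8]]) -> Result<Vec<String>, io::Error> {
    let mut buf = Vec::new();
    let mut out = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk);
        while let Some(line) = decode(&mut buf)? {
            out.push(line);
        }
    }
    while let Some(line) = decode_eof(&mut buf)? {
        out.push(line);
    }
    Ok(out)
}
```

Note that a line may be split across chunks, and that the last line only appears once decode_eof is called, since it has no terminating newline.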