Streams

A stream is an asynchronous series of values. It is the asynchronous equivalent to Rust's std::iter::Iterator and is represented by the Stream trait. Streams can be iterated in async functions. They can also be transformed using adapters. Tokio provides a number of common adapters on the StreamExt trait.

Tokio provides stream support in a separate crate: tokio-stream.

tokio-stream = "0.1"

Currently, Tokio's Stream utilities exist in the tokio-stream crate. Once the Stream trait is stabilized in the Rust standard library, Tokio's stream utilities will be moved into the tokio crate.

Iteration

Currently, the Rust programming language does not support async for loops. Instead, iterating streams is done using a while let loop paired with StreamExt::next().

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

Like iterators, the next() method returns Option<T> where T is the stream's value type. Receiving None indicates that stream iteration is terminated.

Mini-Redis broadcast

Let's go over a slightly more complicated example using the Mini-Redis client.

Full code can be found here.

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Publish some data
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

A task is spawned to publish messages to the Mini-Redis server on the "numbers" channel. Then, on the main task, we subscribe to the "numbers" channel and display received messages.

After subscribing, into_stream() is called on the returned subscriber. This consumes the Subscriber, returning a stream that yields messages as they arrive. Before we start iterating the messages, note that the stream is pinned to the stack using tokio::pin!. Calling next() on a stream requires the stream to be pinned. The into_stream() function returns a stream that is not pinned, so we must explicitly pin it before we can iterate it.

A Rust value is "pinned" when it can no longer be moved in memory. A key property of a pinned value is that pointers can be taken to the pinned data and the caller can be confident the pointer stays valid. This feature is used by async/await to support borrowing data across .await points.
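To make pinning a little more concrete, here is a minimal sketch that uses only the standard library. Because std has no Stream trait yet, it pins a future rather than a stream, but the mechanics are the same. The names answer, poll_once, and NoopWaker are hypothetical helpers made up for this illustration. The sketch shows two ways to pin a value: on the stack with std::pin::pin! (a similar role to tokio::pin!) and on the heap with Box::pin.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical async function whose future is immediately ready.
async fn answer() -> u32 {
    42
}

/// Polls a pinned future once. `poll` takes `Pin<&mut F>`, so the caller
/// has to pin the future before calling this.
fn poll_once<F: Future + ?Sized>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}

fn main() {
    // Pin to the stack. The future cannot be moved after this point.
    let mut on_stack = pin!(answer());
    assert_eq!(poll_once(on_stack.as_mut()), Poll::Ready(42));

    // Pin to the heap. The box itself can be moved, the future inside cannot.
    let mut on_heap = Box::pin(answer());
    assert_eq!(poll_once(on_heap.as_mut()), Poll::Ready(42));

    println!("both futures completed");
}
```

Stack pinning avoids an allocation. Box::pin is handy when the pinned value has to be returned from a function or stored in a struct.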

If we forget to pin the stream, we get an error like this:

error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

If you hit an error message like this, try pinning the value!

Before trying to run this, start the Mini-Redis server:

$ mini-redis-server

Then try running the code. We will see the messages printed to STDOUT.

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

Some early messages may be dropped as there is a race between subscribing and publishing. The program never exits. A subscription to a Mini-Redis channel stays active as long as the server is active.

Let's see how we can work with streams to expand on this program.

Adapters

Functions that take a Stream and return another Stream are often called 'stream adapters', as they're a form of the 'adapter pattern'. Common stream adapters include map, take, and filter.

Let's update the Mini-Redis program so that it exits. After receiving three messages, it stops iterating. This is done using take, an adapter that limits the stream to yield at most n messages.

let messages = subscriber
    .into_stream()
    .take(3);

Running the program again, we get:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

This time the program ends.

Now, let's limit the stream to single digit numbers. We check this by testing that the message length is 1. We use the filter adapter to drop any message that does not match the predicate.

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

Running the program again, we get:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

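Note that the order in which adapters are applied matters. Calling filter and then take is different from calling take and then filter: in the second case, take counts messages before any are filtered out, so fewer than three may reach the loop.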
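The effect of adapter order can be sketched with plain iterators, which have filter and take adapters with the same meaning. This is a synchronous analogue, not the Mini-Redis code itself. The PAYLOADS constant and the two function names are made up for the illustration; the payloads mirror the ones published earlier.

```rust
/// The payloads published by the example, in publish order.
const PAYLOADS: [&str; 6] = ["1", "two", "3", "four", "five", "6"];

/// Keep single-character payloads first, then stop after three of them.
fn filter_then_take<'a>(payloads: &[&'a str]) -> Vec<&'a str> {
    payloads
        .iter()
        .copied()
        .filter(|p| p.len() == 1)
        .take(3)
        .collect()
}

/// Stop after the first three payloads, then keep the single-character ones.
fn take_then_filter<'a>(payloads: &[&'a str]) -> Vec<&'a str> {
    payloads
        .iter()
        .copied()
        .take(3)
        .filter(|p| p.len() == 1)
        .collect()
}

fn main() {
    println!("{:?}", filter_then_take(&PAYLOADS)); // ["1", "3", "6"]
    println!("{:?}", take_then_filter(&PAYLOADS)); // ["1", "3"]
}
```

With take first, "two" uses up one of the three slots and is then discarded, so only two values come out.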

Finally, we will tidy up the output by stripping the Ok(Message { ... }) part of the output. This is done with map. Because this is applied after filter, we know the message is Ok, so we can use unwrap().

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

Now, the output is:

got = b"1"
got = b"3"
got = b"6"

Another option would be to combine the filter and map steps into a single call using filter_map.
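As a rough sketch of that idea, the function below folds the predicate and the unwrap into one step that returns Option. To keep it runnable without a server, it is exercised with Iterator::filter_map. StreamExt::filter_map in tokio-stream also takes a synchronous closure that returns Option, so the same function should slot into the stream pipeline. The Message struct here is a simplified stand-in for mini_redis's message type (only the content field, as a Vec<u8>), and MsgResult, single_digit, and sample are hypothetical names.

```rust
use std::error::Error;

/// Simplified stand-in for the Mini-Redis message type (assumption:
/// only the field used here, with `Vec<u8>` as the payload type).
#[derive(Debug)]
struct Message {
    content: Vec<u8>,
}

type MsgResult = Result<Message, Box<dyn Error + Send + Sync>>;

/// Returns the payload of a successful, single-byte message and `None`
/// for everything else, including errors.
fn single_digit(msg: MsgResult) -> Option<Vec<u8>> {
    match msg {
        Ok(msg) if msg.content.len() == 1 => Some(msg.content),
        _ => None,
    }
}

/// Sample input mirroring the published payloads, plus one error.
fn sample() -> Vec<MsgResult> {
    let ok = |s: &str| Ok(Message { content: s.as_bytes().to_vec() });
    vec![
        ok("1"),
        ok("two"),
        Err("connection reset".into()),
        ok("3"),
        ok("four"),
        ok("five"),
        ok("6"),
    ]
}

fn main() {
    // Synchronous analogue of `.filter_map(single_digit).take(3)` on a stream.
    for content in sample().into_iter().filter_map(single_digit).take(3) {
        println!("got = {:?}", String::from_utf8_lossy(&content));
    }
}
```

Because the function never calls unwrap(), an Err item is simply skipped instead of relying on an earlier filter step to have removed it.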

There are more available adapters. See the list here.

Implementing Stream

The Stream trait is very similar to the Future trait.

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

The Stream::poll_next() function is much like Future::poll, except it can be called repeatedly to receive many values from the stream. Just as we saw in Async in depth, when a stream is not ready to return a value, Poll::Pending is returned instead. The task's waker is registered. Once the stream should be polled again, the waker is notified.

The size_hint() method is used the same way as it is with iterators.
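To see the polling contract in action without a runtime, here is a minimal sketch that uses only the standard library. It declares a local copy of the trait shown above (std does not provide one), implements it for a hypothetical Countdown stream that is always ready, and calls poll_next() by hand with a waker that does nothing. Countdown, NoopWaker, and drain are names made up for this illustration.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the trait shown above, so no external crate is needed.
trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

/// Hypothetical stream that yields `rem`, `rem - 1`, ..., 1 and never waits.
struct Countdown {
    rem: usize,
}

impl Stream for Countdown {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.rem == 0 {
            // The stream is finished.
            return Poll::Ready(None);
        }
        let value = self.rem;
        self.rem -= 1;
        Poll::Ready(Some(value))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The exact number of remaining items is known.
        (self.rem, Some(self.rem))
    }
}

/// Waker that does nothing; fine here because `Countdown` is never pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Calls `poll_next` repeatedly and collects items until the stream
/// returns `Ready(None)` (or `Pending`, which a real executor would wait on).
fn drain<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}

fn main() {
    let stream = Countdown { rem: 3 };
    println!("size_hint = {:?}", stream.size_hint()); // (3, Some(3))
    println!("items = {:?}", drain(stream)); // [3, 2, 1]
}
```

A real stream would return Poll::Pending when it has nothing to yield and arrange for the waker to be notified later. This sketch skips that part on purpose.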

Usually, when manually implementing a Stream, it is done by composing futures and other streams. As an example, let's build off of the Delay future we implemented in Async in depth. We will convert it to a stream that yields () three times at 10 ms intervals.

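use tokio_stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// `Delay` is the future implemented in Async in depth; it must be in scope.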
struct Interval {
    rem: usize,
    delay: Delay,
}

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // No more delays
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

Manually implementing streams using the Stream trait can be tedious. Unfortunately, the Rust programming language does not yet support async/await syntax for defining streams. This is in the works, but not yet ready.

The async-stream crate is available as a temporary solution. This crate provides a stream! macro that transforms the input into a stream. Using this crate, the above interval can be implemented like this:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}