Async in depth

At this point, we have completed a fairly comprehensive tour of asynchronous Rust and Tokio. Now we will dig deeper into Rust's asynchronous runtime model. At the very beginning of the tutorial, we hinted that asynchronous Rust takes a unique approach. Now, we explain what that means.

Futures

As a quick review, let's take a very basic asynchronous function. This is nothing new compared to what the tutorial has covered so far.

use tokio::net::TcpStream;

async fn my_async_fn() {
    println!("hello from async");
    let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    println!("async TCP operation complete");
}

We call the function and it returns some value. We call .await on that value.

#[tokio::main]
async fn main() {
    let what_is_this = my_async_fn();
    // Nothing has been printed yet.
    what_is_this.await;
    // Text has been printed and socket has been
    // established and closed.
}

The value returned by my_async_fn() is a future. A future is a value that implements the std::future::Future trait provided by the standard library. They are values that contain the in-progress asynchronous computation.

The std::future::Future trait definition is:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

The associated type Output is the type that the future produces once it completes. The Pin type is how Rust is able to support borrows in async functions. See the standard library documentation for more details.

Unlike how futures are implemented in other languages, a Rust future does not represent a computation happening in the background, rather the Rust future is the computation itself. The owner of the future is responsible for advancing the computation by polling the future. This is done by calling Future::poll.

Implementing Future

Let's implement a very simple future. This future will:

  1. Wait until a specific instant in time.
  2. Output some text to STDOUT.
  3. Yield a string.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Async fn as a Future

In the main function, we instantiate the future and call .await on it. From async functions, we may call .await on any value that implements Future. In turn, calling an async function returns an anonymous type that implements Future. In the case of async fn main(), the generated future is roughly:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()>
    {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Rust futures are state machines. Here, MainFuture is represented as an enum of the future's possible states. The future starts in the State0 state. When poll is invoked, the future attempts to advance its internal state as much as possible. If the future is able to complete, Poll::Ready is returned containing the output of the asynchronous computation.

If the future is not able to complete, usually due to resources it is waiting on not being ready, then Poll::Pending is returned. Receiving Poll::Pending indicates to the caller that the future will complete at a later time and the caller should invoke poll again later.

We also see that futures are composed of other futures. Calling poll on the outer future results in calling the inner future's poll function.

Executors

Asynchronous Rust functions return futures. Futures must have poll called on them to advance their state. Futures are composed of other futures. So, the question is, what calls poll on the very most outer future?

Recall from earlier, to run asynchronous functions, they must either be passed to tokio::spawn or be the main function annotated with #[tokio::main]. This results in submitting the generated outer future to the Tokio executor. The executor is responsible for calling Future::poll on the outer future, driving the asynchronous computation to completion.

Mini Tokio

To better understand how this all fits together, let's implement our own minimal version of Tokio! The full code can be found here.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;

fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
        let when = Instant::now() + Duration::from_millis(10);
        let future = Delay { when };

        let out = future.await;
        assert_eq!(out, "done");
    });

    mini_tokio.run();
}

struct MiniTokio {
    tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
    fn new() -> MiniTokio {
        MiniTokio {
            tasks: VecDeque::new(),
        }
    }
    
    /// Spawn a future onto the mini-tokio instance.
    fn spawn<F>(&mut self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tasks.push_back(Box::pin(future));
    }
    
    fn run(&mut self) {
        let waker = task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        
        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

This runs the async block. A Delay instance is created with the requested delay and is awaited on. However, our implementation so far has a major flaw. Our executor never goes to sleep. The executor continuously loops all spawned futures and polls them. Most of the time, the futures will not be ready to perform more work and will return Poll::Pending again. The process will burn CPU and generally not be very efficient.

Ideally, we want mini-tokio to only poll futures when the future is able to make progress. This happens when a resource that the task is blocked on becomes ready to perform the requested operation. If the task wants to read data from a TCP socket, then we only want to poll the task when the TCP socket has received data. In our case, the task is blocked on the given Instant being reached. Ideally, mini-tokio would only poll the task once that instant in time has passed.

To achieve this, when a resource is polled, and the resource is not ready, the resource will send a notification once it transitions into a ready state.

Wakers

Wakers are the missing piece. This is the system by which a resource is able to notify the waiting task that the resource has become ready to continue some operation.

Let's look at the Future::poll definition again:

fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;

The Context argument to poll has a waker() method. This method returns a Waker bound to the current task. The Waker has a wake() method. Calling this method signals to the executor that the associated task should be scheduled for execution. Resources call wake() when they transition to a ready state to notify the executor that polling the task will be able to make progress.

Updating Delay

We can update Delay to use wakers:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Get a handle to the waker for the current task
            let waker = cx.waker().clone();
            let when = self.when;

            // Spawn a timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

Now, once the requested duration has elapsed, the calling task is notified and the executor can ensure the task is scheduled again. The next step is to update mini-tokio to listen for wake notifications.

There are still a few remaining issues with our Delay implementation. We will fix them later.

When a future returns Poll::Pending, it must ensure that the waker is signalled at some point. Forgetting to do this results in the task hanging indefinitely.

Forgetting to wake a task after returning Poll::Pending is a common source of bugs.

Recall the first iteration of Delay. Here was the future implementation:

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Before returning Poll::Pending, we called cx.waker().wake_by_ref(). This is to satisfy the future contract. By returning Poll::Pending, we are responsible for signalling the waker. Because we didn't implement the timer thread yet, we signalled the waker inline. Doing so will result in the future being immediately re-scheduled, executed again, and probably not be ready to complete.

Notice that you are allowed to signal the waker more often than necessary. In this particular case, we signal the waker even though we are not ready to continue the operation at all. There is nothing wrong with this besides some wasted CPU cycles. However, this particular implementation will result in a busy loop.

Updating Mini Tokio

The next step is updating Mini Tokio to receive waker notifications. We want the executor to only run tasks when they are woken, and to do this, Mini Tokio will provide its own waker. When the waker is invoked, its associated task is queued to be executed. Mini-Tokio passes this waker to the future when it polls the future.

The updated Mini Tokio will use a channel to store scheduled tasks. Channels allow tasks to be queued for execution from any thread. Wakers must be Send and Sync, so we use the channel from the crossbeam crate, as the standard library channel is not Sync.

The Send and Sync traits are marker traits related to concurrency provided by Rust. Types that can be sent to a different thread are Send. Most types are Send, but something like Rc is not. Types that can be concurrently accessed through immutable references are Sync. A type can be Send but not Sync — a good example is Cell, which can be modified through an immutable reference, and is thus not safe to access concurrently.

For more details, see the related chapter in the Rust book.

Add the following dependency to your Cargo.toml to pull in channels.

crossbeam = "0.7"

Then, update the MiniTokio struct.

use crossbeam::channel;
use std::sync::Arc;

struct MiniTokio {
    scheduled: channel::Receiver<Arc<Task>>,
    sender: channel::Sender<Arc<Task>>,
}

struct Task {
    // This will be filled in soon.
}

Wakers are Sync and can be cloned. When wake is called, the task must be scheduled for execution. To implement this, we have a channel. When the wake() is called on the waker, the task is pushed into the send half of the channel. Our Task structure will implement the wake logic. To do this, it needs to contain both the spawned future and the channel send half.

use std::sync::{Arc, Mutex};

struct Task {
    // The `Mutex` is to make `Task` implement `Sync`. Only
    // one thread accesses `future` at any given time. The
    // `Mutex` is not required for correctness. Real Tokio
    // does not use a mutex here, but real Tokio has
    // more lines of code than can fit in a single tutorial
    // page.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    executor: channel::Sender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        self.executor.send(self.clone());
    }
}

To schedule the task, the Arc is cloned and sent through the channel. Now, we need to hook our schedule function with std::task::Waker. The standard library provides a low-level API to do this using manual vtable construction. This strategy provides maximum flexibility to implementors, but requires a bunch of unsafe boilerplate code. Instead of using RawWakerVTable directly, we will use the ArcWake utility provided by the futures crate. This allows us to implement a simple trait to expose our Task struct as a waker.

Add the following dependency to your Cargo.toml to pull in futures.

futures = "0.3"

Then implement futures::task::ArcWake.

use futures::task::ArcWake;
use std::sync::Arc;
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.schedule();
    }
}

When the timer thread above calls waker.wake(), the task is pushed into the channel. Next, we implement receiving and executing the tasks in the MiniTokio::run() function.

impl MiniTokio {
    fn run(&self) {
        while let Ok(task) = self.scheduled.recv() {
            task.poll();
        }
    }

    /// Initialize a new mini-tokio instance.
    fn new() -> MiniTokio {
        let (sender, scheduled) = channel::unbounded();

        MiniTokio { scheduled, sender }
    }

    /// Spawn a future onto the mini-tokio instance.
    ///
    /// The given future is wrapped with the `Task` harness and pushed into the
    /// `scheduled` queue. The future will be executed when `run` is called.
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        Task::spawn(future, &self.sender);
    }
}

impl Task {
    fn poll(self: Arc<Self>) {
        // Create a waker from the `Task` instance. This
        // uses the `ArcWake` impl from above.
        let waker = task::waker(self.clone());
        let mut cx = Context::from_waker(&waker);

        // No other thread ever tries to lock the future
        let mut future = self.future.try_lock().unwrap();

        // Poll the future
        let _ = future.as_mut().poll(&mut cx);
    }

    // Spawns a new taks with the given future.
    //
    // Initializes a new Task harness containing the given future and pushes it
    // onto `sender`. The receiver half of the channel will get the task and
    // execute it.
    fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            executor: sender.clone(),
        });

        let _ = sender.send(task);
    }

}

Multiple things are happening here. First, MiniTokio::run() is implemented. The function runs in a loop receiving scheduled tasks from the channel. As tasks are pushed into the channel when they are woken, these tasks are able to make progress when executed.

Additionally, the MiniTokio::new() and MiniTokio::spawn() functions are adjusted to use a channel rather than a VecDeque. When new tasks are spawned, they are given a clone of the sender-part of the channel, which the task can use to schedule itself on the runtime.

The Task::poll() function creates the waker using the ArcWake utility from the futures crate. The waker is used to create a task::Context. That task::Context is passed to poll.

Summary

We have now seen an end-to-end example of how asynchronous Rust works. Rust's async/await feature is backed by traits. This allows third-party crates, like Tokio, to provide the execution details.

  • Asynchronous Rust operation are lazy and require a caller to poll them.
  • Wakers are passed to futures to link a future to the task calling it.
  • When a resource is not ready to complete an operation, Poll::Pending is returned and the task's waker is recorded.
  • When the resource becomes ready, the task's waker is notified.
  • The executor receives the notification and schedules the task to execute.
  • The task is polled again, this time the resource is ready and the task makes progress.

A few loose ends

Recall when we were implementing the Delay future, we said there were a few more things to fix. Rust's asynchronous model allows a single future to migrate across tasks while it executes. Consider the following:

use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
        let mut delay = delay.take().unwrap();
        let res = Pin::new(&mut delay).poll(cx);
        assert!(res.is_pending());
        tokio::spawn(async move {
            delay.await;
        });

        Poll::Ready(())
    }).await;
}

The poll_fn function creates a Future instance using a closure. The snippet above creates a Delay instance, polls it once, then sends the Delay instance to a new task where it is awaited. In this example, Delay::poll is called more than once with different Waker instances. Our earlier implementation did not handle this case and the spawned task would sleep forever, as the wrong task is notified.

When implementing a future, it is critical to assume that each call to poll could supply a different Waker instance. The poll function must update any previously recorded waker with the new one.

To fix our earlier implementation, we could do something like this:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
    waker: Option<Arc<Mutex<Waker>>>,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // First, if this is the first time the future is called, spawn the
        // timer thread. If the timer thread is already running, ensure the
        // stored `Waker` matches the current task's waker.
        if let Some(waker) = &self.waker {
            let mut waker = waker.lock().unwrap();

            // Check if the stored waker matches the current task's waker.
            // This is necessary as the `Delay` future instance may move to
            // a different task between calls to `poll`. If this happens, the
            // waker contained by the given `Context` will differ and we
            // must update our stored waker to reflect this change.
            if !waker.will_wake(cx.waker()) {
                *waker = cx.waker().clone();
            }
        } else {
            let when = self.when;
            let waker = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(waker.clone());

            // This is the first time `poll` is called, spawn the timer thread.
            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                // The duration has elapsed. Notify the caller by invoking
                // the waker.
                let waker = waker.lock().unwrap();
                waker.wake_by_ref();
            });
        }

        // Once the waker is stored and the timer thread is started, it is
        // time to check if the delay has completed. This is done by
        // checking the current instant. If the duration has elapsed, then
        // the future has completed and `Poll::Ready` is returned.
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // The duration has not elapsed, the future has not completed so
            // return `Poll::Pending`.
            //
            // The `Future` trait contract requires that when `Pending` is
            // returned, the future ensures that the given waker is signalled
            // once the future should be polled again. In our case, by
            // returning `Pending` here, we are promising that we will
            // invoke the given waker included in the `Context` argument
            // once the requested duration has elapsed. We ensure this by
            // spawning the timer thread above.
            //
            // If we forget to invoke the waker, the task will hang
            // indefinitely.
            Poll::Pending
        }
    }
}

It is a bit involved, but the idea is, on each call to poll, the future checks if the supplied waker matches the previously recorded waker. If the two wakers match, then there is nothing else to do. If they do not match, then the recorded waker must be updated.

Notify utility

We demonstrated how a Delay future could be implemented by hand using wakers. Wakers are the foundation of how asynchronous Rust works. Usually, it is not necessary to drop down to that level. For example, in the case of Delay, we could implement it entirely with async/await by using the tokio::sync::Notify utility. This utility provides a basic task notification mechanism. It handles the details of wakers, including making sure that the recorded waker matches the current task.

Using Notify, we can implement a delay function using async/await like this:

use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;

async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    thread::spawn(move || {
        let now = Instant::now();

        if now < when {
            thread::sleep(when - now);
        }

        notify2.notify_one();
    });


    notify.notified().await;
}