Runtime model

Applications written using Tokio are organized across a large number of small, non-blocking tasks. A Tokio task is similar to a goroutine or an Erlang process, but is non-blocking. They are designed to be lightweight, can be spawned fast, and maintain low scheduling overhead. They are also non-blocking, as such operations that are not able to finish immediately must still return immediately. Instead of returning the result of the operation, they return a value indicating that the operation is in progress.

Non-blocking execution

A Tokio task is implemented using the Future trait:

# extern crate futures;
# use futures::*;
# type MyResource = future::FutureResult<(), ()>;
struct MyTask {
    my_resource: MyResource,
}
# impl MyTask {
#     fn process(&self, _: ()) {}
#     fn process_err(&self, _: ()) {}
# }

impl Future for MyTask {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.my_resource.poll() {
            Ok(Async::Ready(value)) => {
                self.process(value);
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => {
                self.process_err(err);
                Ok(Async::Ready(()))
            }
        }
    }
}

Tasks are submitted to an executor using tokio::spawn or by calling a spawn method on an executor object. The poll function drives the task. No work is done without calling poll. It is the executor’s job to call poll on the task until Ready(()) is returned.

MyTask will receive a value from my_resource and process it. Once the value has been processed, the task has completed its logic and is done. This is represented by returning Ok(Async::Ready(())).

However, in order to complete processing, the task depends on my_resource providing a value. Given that my_resource is a non-blocking task, it may or may not be ready to provide the value when my_resource.poll() is called. If it is ready, it returns Ok(Async::Ready(value)). If it is not ready, it returns Ok(Async::NotReady).

When the resource is not ready to provide a value, this implies that the task itself is not ready to complete and the task’s poll function returns NotReady as well.

At some point in the future, the resource will become ready to provide the value. The resource uses the task system to signal to the executor that it is ready. The executor schedules the task, which leads to MyTask::poll being called again. This time, given that my_resource is ready, the value will be returned from my_resource.poll() and the task is able to complete.

Cooperative scheduling

Cooperative scheduling is used to schedule tasks on executors. A single executor is expected to manage many tasks across a small set of threads. There will be a far greater number of tasks than threads. There also is no pre-emption. This means that when a task is scheduled to execute, it blocks the current thread until the poll function returns.

Because of this, it is important for implementations of poll to only execute for very short periods of time. For I/O bound applications, this usually happens automatically. However, if a task must run a longer computation, it should defer work to a blocking pool or break up the computation into smaller chunks and yield back to the executor after each chunk.

Task system

The task system is the system by which resources notify executors of readiness changes. A task is composed of non-blocking logic that consume resources. In the example above, MyTask uses a single resource, my_resource, but there is no limit to the number of resources that a task can consume.

When a task is executing and attempts to use a resource that is not ready, it becomes logically blocked on that resource, i.e., the task is not able to make further progress until that resource becomes ready. Tokio tracks which resources a task is currently blocked on to make forward progress. When a dependent resource becomes ready, the executor schedules the task. This is done by tracking when a task expresses interest in a resource.

When MyTask executes, attempts to consume my_resource, and my_resource returns NotReady, MyTask has implicitly expressed interest in the my_resource resource. At this point the task and the resource are linked. When the resource becomes ready, the task is scheduled again.

task::current and Task::notify

Tracking interest and notifying readiness changes is done with two APIs:

When my_resource.poll() is called, if the resource is ready, it immediately returns the value without using the task system. If the resource is not ready, it gets a handle to the current task by calling task::current() -> Task. This handle is obtained by reading a thread-local variable set by the executor.

Some external event (data received on the network, background thread completing a computation, etc…) will result in my_resource becoming ready to produce its value. At that point, the logic that readies my_resource will call notify on the task handle obtained from task::current. This signals the readiness change to the executor, which then schedules the task for execution.

If multiple tasks have expressed interest in a resource, only the last task to have done so will be notified. Resources are intended to be used from a single task only.

Async::NotReady

Any function that returns Async must adhere to the contract. When NotReady is returned, the current task must have been registered for notification on readiness change. The implication for resources is discussed in the above section. For task logic, this means that NotReady cannot be returned unless a resource has returned NotReady. By doing this, the contract transitively upheld. The current task is registered for notification because NotReady has been received from the resource.

Great care must be taken to avoiding returning NotReady without having received NotReady from a resource. For example, the following task implementation results in the task never completing.

# #![deny(deprecated)]
# #[macro_use]
# extern crate futures;
use futures::{Future, Poll, Async};

# type Resource1 = futures::future::FutureResult<(), ()>;
# struct Resource2;
# impl Resource2 {
#     fn new(_: ()) -> Self { Resource2 }
# }
# impl Future for Resource2 {
#    type Item = ();
#    type Error = ();
#    fn poll(&mut self) -> Poll<(), ()> { unimplemented!(); }
# }
enum BadTask {
    First(Resource1),
    Second(Resource2),
}

impl Future for BadTask {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        use self::BadTask::*;
        let value = match *self {
            First(ref mut resource) => {
                try_ready!(resource.poll())
            }
            Second(ref mut resource) => {
                try_ready!(resource.poll());
                return Ok(Async::Ready(()));
            }
        };

        *self = Second(Resource2::new(value));
        Ok(Async::NotReady)
    }
}
# fn main() {}

The problem with the above implementation is that Ok(Async::NotReady) is returned right after transitioning the state to Second. During this transition, no resource has returned NotReady. When the task itself returns NotReady, it has violated the contract as the task will not be notified in the future.

This situation is generally resolved by adding a loop:

# #![deny(deprecated)]
# #[macro_use]
# extern crate futures;
use futures::{Future, Poll, Async};

# type Resource1 = futures::future::FutureResult<(), ()>;
# struct Resource2;
# impl Resource2 {
#     fn new(_: ()) -> Self { Resource2 }
# }
# impl Future for Resource2 {
#    type Item = ();
#    type Error = ();
#    fn poll(&mut self) -> Poll<(), ()> { unimplemented!(); }
# }
# enum BadTask {
#     First(Resource1),
#     Second(Resource2),
# }
# impl Future for BadTask {
# type Item = ();
# type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    use self::BadTask::*;
    loop {
        let value = match *self {
            First(ref mut resource) => {
                try_ready!(resource.poll())
            }
            Second(ref mut resource) => {
                try_ready!(resource.poll());
                return Ok(Async::Ready(()));
            }
        };

        *self = Second(Resource2::new(value));
    }
}
# }
# fn main() {}

One way to think about it is that a task’s poll function must not return until it is unable to make any further progress due to its resources not being ready or it explicitly yields (see below).

Also note that functions that return Async must only be called from a task. In other words, these functions may only be called from code that has been submitted to tokio::spawn or other task spawn function.

Yielding

Sometimes a task must return NotReady without being blocked on a resource. This usually happens when computation to run is large and the task wants to return control to the executor to allow it to execute other futures.

Yielding is done by notifying the current task and returning NotReady:

# extern crate futures;
use futures::task;
use futures::Async;

# fn poll_dox() -> Result<Async<()>, ()> {
// Yield the current task. The executor will poll this task next
// iteration through its run list.
task::current().notify();
return Ok(Async::NotReady);
# }

Yield can be used to break up a CPU expensive computation:

# extern crate futures;
# use futures::*;
struct Count {
    remaining: usize,
}

impl Future for Count {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while self.remaining > 0 {
            self.remaining -= 1;

            // Yield every 10 iterations
            if self.remaining % 10 == 0 {
                task::current().notify();
                return Ok(Async::NotReady);
            }
        }

        Ok(Async::Ready(()))
    }
}

Executors

Executors are responsible for driving many tasks to completion. A task is spawned onto an executor, at which point the executor calls its poll function when needed. The executor hooks into the task system to receive resource readiness notifications.

By decoupling the task system with the executor implementation, the specific execution and scheduling logic can be left to the executor implementation. Tokio provides two executor implementations, each with unique characteristics: current_thread and thread_pool.

When a task is first spawned onto the executor, the executor wraps it with Spawn. This binds the task logic with the task state (this is mostly required for legacy reasons). Executors will typically store the task on the heap, usually by storing it in a Box or an Arc. When the executor picks a task for execution, it calls Spawn::poll_future_notify. This function ensures that the task context is set to the thread-local variable such that task::current is able to read it.

When calling poll_future_notify, the executor also passes in a notify handle and an identifier. These arguments are included in the task handle returned by task::current and are how the task is linked to the executor.

The notify handle is an implementation of Notify and the identifier is a value that the executor uses to look up the current task. When Task::notify is called, the notify function on the notify handle is called with the supplied identifier. The implementation of this function is responsible for performing the scheduling logic.

One strategy for implementing an executor is to store each task in a Box and to use a linked list to track tasks that are scheduled for execution. When Notify::notify is called, then the task associated with the identifier is pushed at the end of the scheduled linked list. When the executor runs, it pops from the front of the linked list and executes the task as described above.

Note that this section does not describe how the executor is run. The details of this are left to the executor implementation. One option is for the executor to spawn one or more threads and dedicate these threads to draining the scheduled linked list. Another is to provide a MyExecutor::run function that blocks the current thread and drains the scheduled linked list.

Resources, drivers, and runtimes

Resources are leaf futures, i.e. futures that are not implemented in terms of other futures. They are the types that use the task system described above to interact with the executor. Resource types include TCP and UDP sockets, timers, channels, file handles, etc. Tokio applications rarely need to implement resources. Instead, they use resources provided by Tokio or third party crates.

Oftentimes, a resource cannot function by itself and requires a driver. For example, Tokio TCP sockets are backed by a Reactor. The reactor is the socket resource driver. A single driver may power large numbers of resource instances. In order to use the resource, the driver must be running somewhere in the process. Tokio provides drivers for network resources (tokio-reactor), file resources (tokio-fs), and timers (tokio-timer). Providing decoupled driver components allows users to pick and choose which components they wish to use. Each driver can be used standalone or combined with other drivers.

Because of this, in order to use Tokio and successfully execute tasks, an application must start an executor and the necessary drivers for the resources that the application’s tasks depend on. This requires significant boilerplate. To manage the boilerplate, Tokio offers a couple of runtime options. A runtime is an executor bundled with all necessary drivers to power Tokio’s resources. Instead of managing all the various Tokio components individually, a runtime is created and started in a single call.

Tokio offers a concurrent runtime and a single-threaded runtimee. The concurrent runtime is backed by a multi-threaded, work-stealing executor. The single-threaded runtime executes all tasks and drivers on thee current thread. The user may pick the runtime with characteristics best suited for the application.

Future

As mentioned above, tasks are implemented using the Future trait. This trait is not limited to implementing tasks. A Future is a value that represents a non-blocking computation that will complete sometime in the future. A task is a computation with no output. Many resources in Tokio are represented with Future implementations. For example, a timeout is a Future that completes once the deadline has been reached.

The trait includes a number of combinators that are useful for working with future values.

Applications are built by either implementing Future for application specific types or defining application logic using combinators. Often, a mix of both strategies is most successful.

Next up: Non-blocking I/O