Announcing Tokio-Compat
December 18, 2019
The release of Tokio 0.2 was the culmination of a great deal of hard work from
numerous contributors, and has brought several significant improvements to
Tokio. Using std::future and async/await makes writing async code using Tokio
much more ergonomic, and a new scheduler implementation makes Tokio 0.2's
thread pool as much as 10x faster. However, updating existing Tokio 0.1
projects to use 0.2 and std::future poses some new challenges. Therefore, we're
very excited to announce the release of the tokio-compat crate to help ease
this transition, by providing a runtime compatible with both Tokio 0.1 and
Tokio 0.2 futures.
You can find tokio-compat on crates.io and on GitHub.
Motivation
Both the transition from futures 0.1 to std::future and the breaking changes
to Tokio's APIs in 0.2 have made it challenging to update existing projects to
benefit from the variety of improvements Tokio 0.2 offers. Additionally, these
breaking changes make it difficult for projects to migrate incrementally:
instead, they must do so all at once, requiring a lot of effort and sometimes
putting other work on hold until the migration is complete. In order to enable
incremental migration, we need a compatibility layer that allows us to use code
written against both the legacy Tokio 0.1/futures 0.1 APIs and the new Tokio
0.2/std::future APIs in the same project.
The futures crate's compat module provides interoperability between futures 0.1
and std::future future types, for example by implementing std::future::Future
for a type that implements the futures 0.1 Future trait.
This is a foundational part of the compatibility story, but, on its
own, it is insufficient to allow most projects to be incrementally updated. Most
code using Tokio relies on the runtime services that Tokio provides. These
runtime services include the ability to spawn other tasks; the I/O driver, which
allows tasks to be notified by the operating system's async I/O APIs; and the
timer. Futures which rely on Tokio 0.1's runtime services will panic when they
try to access those runtime services (such as by spawning a task or creating a
timer) on the Tokio 0.2 runtime, even if they are converted to the
std::future::Future
trait. This is because the new runtime does not provide
these services in a way compatible with Tokio 0.1's APIs.
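To make the trait-level half of this concrete, here is a minimal, std-only sketch of what such an adapter does. The `LegacyFuture` trait, `Async` enum, `Compat` wrapper, `Countdown` future, and `block_on` helper are all hypothetical stand-ins written for this illustration; they mimic the shape of the futures 0.1 API but are not the real futures or tokio-compat types. Note that the adapter only changes the trait being implemented: nothing in it supplies a timer, an I/O driver, or a spawner, which is the gap the compatibility runtime is meant to fill.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the futures 0.1 `Async` enum.
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical stand-in for the futures 0.1 `Future` trait: `poll` takes
// `&mut self` and reports failure through `Result`.
pub trait LegacyFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

// Adapter that exposes a legacy future as a `std::future::Future`.
pub struct Compat<F>(pub F);

impl<F: LegacyFuture + Unpin> Future for Compat<F> {
    type Output = Result<F::Item, F::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.get_mut().0.poll() {
            Ok(Async::Ready(item)) => Poll::Ready(Ok(item)),
            Ok(Async::NotReady) => {
                // Simplification for this sketch: request another poll right
                // away instead of bridging the two wake-up mechanisms.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}

// A legacy future that is not ready for `remaining` polls, then yields a value.
pub struct Countdown {
    pub remaining: u32,
}

impl LegacyFuture for Countdown {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        if self.remaining == 0 {
            Ok(Async::Ready("done"))
        } else {
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polling executor, for demonstration only.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With these definitions, `block_on(Compat(Countdown { remaining: 3 }))` drives the legacy future to completion and returns its `Result`.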
The tokio-compat crate helps to bridge this gap by providing a compatibility
runtime, which provides runtime services compatible with both Tokio 0.1 and
Tokio 0.2. For example, using tokio-compat, we can write code like this:
use futures_01::future::lazy;

tokio_compat::run(lazy(|| {
    // spawn a `futures` 0.1 future using the `spawn` function from the
    // `tokio` 0.1 crate:
    tokio_01::spawn(lazy(|| {
        println!("hello from tokio 0.1!");
        Ok(())
    }));

    // spawn an `async` block future on the same runtime using `tokio`
    // 0.2's `spawn`:
    tokio_02::spawn(async {
        println!("hello from tokio 0.2!");
    });

    Ok(())
}))
Similarly, we can run tasks that rely on both the 0.1 and 0.2 versions of runtime services like the timer and I/O driver:
use std::time::{Duration, Instant};
use tokio_compat::prelude::*;

tokio_compat::run_std(async {
    // Wait for a `tokio` 0.1 `Delay`...
    let when = Instant::now() + Duration::from_millis(10);
    tokio_01::timer::Delay::new(when)
        // convert the delay future into a `std::future` that we can `await`.
        .compat()
        .await
        .expect("tokio 0.1 timer should work!");
    println!("10 ms have elapsed");

    // Wait for a `tokio` 0.2 `Delay`...
    tokio_02::time::delay_for(Duration::from_millis(20)).await;
    println!("20 ms have elapsed");
});
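The snippets above refer to crates named futures_01, tokio_01, and tokio_02, but do not show how those names are set up. One plausible way, assuming Cargo's dependency renaming through the `package` key, is a manifest fragment along the following lines. The renamed keys are chosen only to match the snippets, and the feature selections are assumptions that may need adjusting for a given project.

```toml
[dependencies]
# Renamed so that both major versions can coexist in one crate
# (assumed names, chosen to match the snippets above).
futures_01 = { package = "futures", version = "0.1" }
tokio_01 = { package = "tokio", version = "0.1" }
# The "full" feature set is an assumption; a narrower set may be enough.
tokio_02 = { package = "tokio", version = "0.2", features = ["full"] }
tokio-compat = { version = "0.1", features = ["rt-full"] }
```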
Using tokio-compat
Primary use-cases for tokio-compat include:
- Incrementally migrating applications: Updating a large project to use new
APIs is challenging. This kind of change is often much easier when it can be
made gradually module-by-module, or by requiring that new code added to the
project use new APIs and slowly rewriting existing code as it's changed for
other reasons. Due to the incompatibility between 0.1 and 0.2 runtimes,
however, this isn't really possible for most projects. Instead, it's necessary
to update everything all at once, requiring a single large change that can
often hold up other work.
tokio-compat can allow projects to migrate incrementally instead.
- Using legacy libraries in new code: If a new project that uses Tokio 0.2
needs functionality from a library written against 0.1, its authors are faced
with a variety of choices, none of which are particularly good. They can block
the progress of their features on the dependency being rewritten to use 0.2,
which can take a long time; they can rewrite their project to use 0.1,
giving up all the advantages of async/await and likely requiring another
rewrite when the dependency is updated; or they can take on the responsibility
of updating the dependency themselves. With tokio-compat, however, it is
possible to use futures from libraries that expect Tokio 0.1 on the same
runtime as Tokio 0.2 futures.
In all these cases, tokio-compat is hopefully a temporary necessity: ideally,
most code should transition to using async/await, std::future, and Tokio 0.2.
Although we've worked hard to make the compatibility layer as lightweight as
possible, it is, by definition, an additional source of complexity in the
system. Furthermore, async/await offers significant ergonomics advantages, and
code using it is easier to understand and modify, so most projects will benefit
greatly from moving to it. The role of tokio-compat
is to make this transition
easier and more incremental.
Getting Started
The APIs provided by tokio-compat
are intended as a drop-in replacement for
tokio
0.1's APIs. The runtimes provided by tokio-compat
expose functions
with the same names and signatures as Tokio 0.1's runtimes. Therefore, in many
cases, getting started with tokio-compat
is as simple as adding
tokio-compat = { version = "0.1", features = ["rt-full"] }
to your Cargo.toml, and changing imports and paths that reference tokio 0.1's
runtime module to tokio_compat's. So, for example,
tokio::runtime::run(future);
becomes
tokio_compat::runtime::run(future);
Similarly,
use tokio::runtime::Runtime;

let mut rt = Runtime::new().unwrap();
rt.spawn(future);
rt.shutdown_on_idle()
    .wait()
    .unwrap();
becomes
use tokio_compat::runtime::Runtime;

let mut rt = Runtime::new().unwrap();
rt.spawn(future);
rt.shutdown_on_idle()
    .wait()
    .unwrap();
Only the tokio::runtime module and tokio::run function need to be replaced
with tokio-compat's versions. Other APIs exposed by Tokio 0.1, such as
tokio::net, will work properly on the compatibility runtime (with some
exceptions we'll discuss shortly). When running on the compatibility runtime,
code which spawns futures 0.1 tasks via tokio 0.1's spawn function will work,
as will code that spawns std::future tasks via tokio 0.2's spawn.
Additionally, the tokio-compat runtimes, TaskExecutors, and other types also
provide std::future-compatible methods. For example, the Runtime has both a
spawn method, which spawns 0.1 futures, and a spawn_std method, which spawns a
std::future future (or an async function/block). See this section in the
tokio-compat API docs for details on spawning.
Once a project is running on the compatibility runtime, it's easy to gradually
migrate to std::future and async/await. One option is to simply perform a "one
to one" translation of the existing codebase: for example, types implementing
futures::future::Future are rewritten to implement std::future::Future, code
using future combinators is changed to use the futures 0.3 versions of those
combinators, and so on. In many cases, the required changes are fairly
mechanical, e.g. changing imports, renaming Async::NotReady to Poll::Pending,
et cetera. However, the Pin<&mut Self> receiver type for std::future::Future's
poll method can make migrating manual Future implementations challenging.
Crates such as pin-project can be helpful when dealing with the pinning
requirement.
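As a rough illustration of such a "one to one" translation, the sketch below ports a small hand-written future to std::future::Future. The `YieldTimes` type and the `poll_until_ready` helper are hypothetical and exist only for this example; the futures 0.1 shape of `poll` is shown as a comment for comparison. Because every field is `Unpin`, the pin can simply be unwrapped with `get_mut`; types holding self-referential or otherwise pinned fields need more care.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future that asks to be polled again `left` times, then
// resolves to the number of times it reported "not ready".
//
// Against futures 0.1, its `poll` would have looked roughly like:
//
//     fn poll(&mut self) -> Poll<u32, ()> {
//         if self.left == 0 {
//             return Ok(Async::Ready(self.polls));
//         }
//         self.left -= 1;
//         self.polls += 1;
//         task::current().notify();
//         Ok(Async::NotReady)
//     }
pub struct YieldTimes {
    left: u32,
    polls: u32,
}

impl YieldTimes {
    pub fn new(times: u32) -> Self {
        YieldTimes { left: times, polls: 0 }
    }
}

impl Future for YieldTimes {
    // The separate `Item` and `Error` types collapse into a single `Output`.
    type Output = u32;

    // The receiver becomes `Pin<&mut Self>`, and the waker arrives via `cx`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // All fields are `Unpin`, so unwrapping the pin is safe here.
        let this = self.get_mut();
        if this.left == 0 {
            return Poll::Ready(this.polls);
        }
        this.left -= 1;
        this.polls += 1;
        cx.waker().wake_by_ref(); // replaces `task::current().notify()`
        Poll::Pending // replaces `Ok(Async::NotReady)`
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future in a loop until it resolves; for demonstration only.
pub fn poll_until_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```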
In most cases, however, it is significantly easier to rewrite the existing
code to use async/await syntax, rather than implementing Future
by hand or
using future combinators. Although such a change is larger when measured in
terms of how many lines of code were modified, the ergonomic benefits of
async/await syntax can make the migration much easier — oftentimes, a
large amount of boilerplate code can simply be removed. Furthermore, this has
the benefit of resulting in more idiomatic, readable, and maintainable code. For
most projects, switching to use async/await syntax is the recommended migration
path.
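To give a feel for the difference, the following std-only sketch shows a small two-step computation written with async/await, with a combinator-style equivalent kept as a comment. The `fetch` and `total_len` functions and the `block_on` helper are hypothetical names invented for this example, not part of Tokio or tokio-compat.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Combinator style (futures 0.1), shown only for comparison:
//
//     fn total_len(a: Key, b: Key) -> impl Future<Item = usize, Error = String> {
//         fetch(a).and_then(move |x| fetch(b).map(move |y| x.len() + y.len()))
//     }

// Hypothetical lookup that fails on an empty key.
async fn fetch(key: &str) -> Result<String, String> {
    if key.is_empty() {
        Err("empty key".to_string())
    } else {
        Ok(format!("value-of-{}", key))
    }
}

// async/await style: sequential steps read top to bottom, and `?` replaces
// the error plumbing that the combinators carried implicitly.
pub async fn total_len(a: &str, b: &str) -> Result<usize, String> {
    let x = fetch(a).await?;
    let y = fetch(b).await?;
    Ok(x.len() + y.len())
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Busy-polling executor, for demonstration only.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In a real project the future would be handed to the compatibility runtime rather than to a hand-rolled `block_on`; the helper is here only so the sketch runs without external crates.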
Notes
There are a small number of "gotchas" to keep in mind in the current
tokio-compat v0.1. In particular, it is important to note that the
compatibility thread pool runtime does not currently support the tokio 0.1
tokio_threadpool::blocking API. Calls to the legacy version of blocking made on
the compatibility runtime will currently fail. In the future, tokio-compat will
allow transparently replacing legacy blocking with the tokio 0.2 blocking APIs,
but in the meantime, it will be necessary to convert this code to call into the
tokio 0.2 task::block_in_place and task::spawn_blocking APIs instead. Since
tokio::fs relies on the blocking APIs, the Tokio 0.1 version of tokio::fs also
will not currently work on the compatibility runtime. See here for details.
Additionally, it's important to keep in mind that Tokio 0.1 and Tokio 0.2
provide subtly different behavior around spawning tasks and shutting down
runtimes. In particular, Tokio 0.1 tracks whether a runtime is idle (i.e., it
has no futures running on it), and provides a Runtime::shutdown_on_idle method
which shuts down the runtime when it becomes idle. On the other hand, Tokio
0.2's spawn functions return JoinHandles, which can be used to await the
completion of spawned tasks, and users are instead expected to await these
JoinHandles in order to determine when the runtime should shut down.
Therefore, tokio-compat provides both APIs, but it is important to note that
only tasks spawned without JoinHandles "count" against the runtime being
considered idle. See this section in the documentation for details.
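The distinction can be sketched with a std-only analogy that uses OS threads in place of Tokio tasks. The `ToyRuntime` type and its methods are hypothetical and only borrow names from the discussion above; they are not the tokio-compat implementation. Detached work bumps a counter that `shutdown_on_idle` waits on, while handle-returning work is left out of the count and must be joined by the caller.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Toy "runtime" that counts detached workers so it can wait until idle.
pub struct ToyRuntime {
    detached: Arc<(Mutex<usize>, Condvar)>,
}

impl ToyRuntime {
    pub fn new() -> Self {
        ToyRuntime {
            detached: Arc::new((Mutex::new(0), Condvar::new())),
        }
    }

    // Detached spawn: no handle is returned, so the worker is counted and
    // keeps the runtime from being considered idle until it finishes.
    // (A panicking worker would never decrement the count in this sketch.)
    pub fn spawn<F>(&self, work: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let detached = Arc::clone(&self.detached);
        *detached.0.lock().unwrap() += 1;
        thread::spawn(move || {
            work();
            let (count, idle) = &*detached;
            *count.lock().unwrap() -= 1;
            idle.notify_all();
        });
    }

    // Handle-returning spawn: not counted toward idleness; the caller is
    // expected to wait on the returned handle instead.
    pub fn spawn_handle<T, F>(&self, work: F) -> thread::JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        thread::spawn(work)
    }

    // Blocks until every detached worker has finished.
    pub fn shutdown_on_idle(self) {
        let (count, idle) = &*self.detached;
        let mut running = count.lock().unwrap();
        while *running > 0 {
            running = idle.wait(running).unwrap();
        }
    }
}
```

In this analogy, calling `shutdown_on_idle` while a handle-returning worker is still running would return without waiting for it, which mirrors why work spawned with a JoinHandle should be awaited explicitly.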
Case Study: Vector
Vector is a high-performance observability data router for collecting and transforming logs, metrics, and events. Vector is a production application that relies on the Tokio runtime, and currently depends on Tokio 0.1.
After switching Vector from the Tokio 0.1 runtime to the tokio-compat runtime, which is backed by Tokio 0.2, its maintainers observed significant performance improvements in benchmarks:
| benchmark group             | time (0.1)    | throughput (0.1) | time (0.2)    | throughput (0.2) | speedup |
| --------------------------- | ------------: | ---------------: | ------------: | ---------------: | ------: |
| batch 10mb with 2mb batches | 5.5±0.14ms    | 1732.8 MB/s      | 5.1±0.15ms    | 1864.0 MB/s      | x 1.08  |
| buffers/in-memory           | 199.3±2.35ms  | 47.8 MB/s        | 99.6±4.40ms   | 95.7 MB/s        | x 2.00  |
| http/http_gzip              | 191.1±3.17ms  | 49.9 MB/s        | 94.3±6.24ms   | 101.1 MB/s       | x 2.03  |
| http/http_no_compression    | 191.8±4.68ms  | 49.7 MB/s        | 91.2±2.56ms   | 104.6 MB/s       | x 2.10  |
| interconnected              | 238.7±5.66ms  | 79.9 MB/s        | 209.7±5.32ms  | 91.0 MB/s        | x 1.14  |
| pipe                        | 198.4±0.60ms  | 48.1 MB/s        | 106.0±6.52ms  | 90.0 MB/s        | x 1.87  |
| pipe_with_many_writers      | 131.7±12.56ms | 72.4 MB/s        | 150.7±10.56ms | 63.3 MB/s        | x 0.86  |
| pipe_with_tiny_lines        | 165.7±0.77ms  | 589.2 KB/s       | 84.1±0.66ms   | 1161.4 KB/s      | x 1.97  |
| transforms                  | 275.7±3.28ms  | 38.0 MB/s        | 167.5±8.07ms  | 62.6 MB/s        | x 1.65  |
Note how in many of these benchmarks, the Tokio 0.2 version (using
tokio-compat
) was as much as two times faster than the Tokio 0.1 version,
simply by changing the Tokio runtime used.
The surface area of the change necessary to update Vector to use the compatibility runtime is also quite minimal. Here's the full diff:
diff --git a/src/runtime.rs b/src/runtime.rs
index 07005a7d..4d64a106 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -1,15 +1,16 @@
 use futures::future::{ExecuteError, Executor, Future};
+use futures_util::future::{FutureExt, TryFutureExt};
 use std::io;
-use tokio::runtime::Builder;
+use tokio_compat::runtime::Builder;

 pub struct Runtime {
-    rt: tokio::runtime::Runtime,
+    rt: tokio_compat::runtime::Runtime,
 }

 impl Runtime {
     pub fn new() -> io::Result<Self> {
         Ok(Runtime {
-            rt: tokio::runtime::Runtime::new()?,
+            rt: tokio_compat::runtime::Runtime::new()?,
         })
     }

@@ -47,17 +48,17 @@ impl Runtime {
     }

     pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_on_idle()
+        self.rt.shutdown_on_idle().unit_error().boxed().compat()
     }

     pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_now()
+        self.rt.shutdown_now().unit_error().boxed().compat()
     }
 }

 #[derive(Clone, Debug)]
 pub struct TaskExecutor {
-    inner: tokio::runtime::TaskExecutor,
+    inner: tokio_compat::runtime::TaskExecutor,
 }

 impl TaskExecutor {
@@ -71,6 +72,7 @@ where
     F: Future<Item = (), Error = ()> + Send + 'static,
 {
     fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
-        self.inner.execute(future)
+        self.inner.spawn(future);
+        Ok(())
     }
 }
diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs
index 1b99328b..880374b2 100644
--- a/src/sinks/kafka.rs
+++ b/src/sinks/kafka.rs
@@ -6,7 +6,7 @@ use crate::{
     topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
 };
 use futures::{
-    future::{self, poll_fn, IntoFuture},
+    future::{self, IntoFuture},
     stream::FuturesUnordered,
     Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
 };
@@ -213,18 +213,14 @@ impl Sink for KafkaSink {
 fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
     let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();

-    let check = poll_fn(move || {
-        tokio_threadpool::blocking(|| {
-            consumer
-                .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
-                .map(|_| ())
-                .map_err(|err| err.into())
-        })
-    })
-    .map_err(|err| err.into())
-    .and_then(|result| result.into_future());
+    let task = tokio02::task::block_in_place(|| {
+        consumer
+            .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
+            .map(|_| ())
+            .map_err(|err| err.into())
+    });

-    Box::new(check)
+    Box::new(task.into_future())
 }
(Thanks to @LucioFranco for trying out tokio-compat in Vector!)
Conclusion
Many major open-source projects have been using Tokio since the tokio-core
days. Asynchronous programming in Rust has come incredibly far since then, and
we hope tokio-compat makes it easy for all Tokio users to start seeing
benefits from the new std::future/Tokio 0.2 ecosystem as painlessly as
possible. Of course, as with all 0.1 releases, there's still more to do,
including:
- Seamless support for tokio-threadpool 0.1's blocking APIs
- A compatibility layer for tokio-io 0.1's AsyncRead and AsyncWrite traits
- Reimplementations of APIs from Tokio 0.1 that were removed in 0.2
- The obligatory bug fixes and performance improvements
You can find the tokio-compat
repository on GitHub. As always, we
would love feedback and contributions from the Tokio community. If you need
help, or want to discuss tokio-compat
details, please join us on
Discord — all are welcome!