# Shared state
So far, we have a key-value server working. However, there is a major flaw: state is not shared across connections. We will fix that in this article.
## Strategies
There are a couple of different ways to share state in Tokio.
- Guard the shared state with a Mutex.
- Spawn a task to manage the state and use message passing to operate on it.
Generally you want to use the first approach for simple data, and the second approach for things that require asynchronous work such as I/O primitives. In this chapter, the shared state is a `HashMap` and the operations are `insert` and `get`. Neither of these operations is asynchronous, so we will use a `Mutex`.
The latter approach is covered in the next chapter.
## Add `bytes` dependency
Instead of using `Vec<u8>`, the Mini-Redis crate uses `Bytes` from the `bytes` crate. The goal of `Bytes` is to provide a robust byte array structure for network programming. The biggest feature it adds over `Vec<u8>` is shallow cloning. In other words, calling `clone()` on a `Bytes` instance does not copy the underlying data. Instead, a `Bytes` instance is a reference-counted handle to some underlying data. The `Bytes` type is roughly an `Arc<Vec<u8>>` but with some added capabilities.
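To see what shallow cloning means in practice, here is a minimal sketch (assuming the `bytes` dependency added below); both handles end up viewing the same underlying buffer:

```rust
use bytes::Bytes;

fn main() {
    let a = Bytes::from("hello world");

    // Cloning bumps a reference count; no bytes are copied.
    let b = a.clone();

    // Both handles point at the same underlying storage.
    assert_eq!(a.as_ptr(), b.as_ptr());
}
```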
To depend on `bytes`, add the following to your `Cargo.toml` in the `[dependencies]` section:
```toml
bytes = "1"
```
## Initialize the `HashMap`
The `HashMap` will be shared across many tasks and potentially many threads. To support this, it is wrapped in `Arc<Mutex<_>>`.
First, for convenience, add the following type alias after the `use` statements.
```rust
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;
```
Then, update the `main` function to initialize the `HashMap` and pass an `Arc` handle to the `process` function. Using `Arc` allows the `HashMap` to be referenced concurrently from many tasks, potentially running on many threads. Throughout Tokio, the term **handle** is used to reference a value that provides access to some shared state.
```rust
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}
```
### On using `std::sync::Mutex` and `tokio::sync::Mutex`
Note that `std::sync::Mutex` and **not** `tokio::sync::Mutex` is used to guard the `HashMap`. A common error is to unconditionally use `tokio::sync::Mutex` from within async code. An async mutex is a mutex that is locked across calls to `.await`.
A synchronous mutex will block the current thread when waiting to acquire the lock. This, in turn, will block other tasks from processing. However, switching to `tokio::sync::Mutex` usually does not help, as the asynchronous mutex uses a synchronous mutex internally.
As a rule of thumb, using a synchronous mutex from within asynchronous code is fine as long as contention remains low and the lock is not held across calls to `.await`.
## Update `process()`
The `process` function no longer initializes a `HashMap`. Instead, it takes the shared handle to the `HashMap` as an argument. It also needs to lock the `HashMap` before using it. Remember that the value's type for the `HashMap` is now `Bytes` (which we can cheaply clone), so this needs to be changed as well.
```rust
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}
```
## Holding a `MutexGuard` across an `.await`
You might write code that looks like this:
```rust
use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here
```
When you try to spawn something that calls this function, you will encounter the following error message:
```text
error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    |     - `mut lock` is later dropped here
```
This happens because the `std::sync::MutexGuard` type is **not** `Send`. This means that you can't send a mutex lock to another thread, and the error happens because the Tokio runtime can move a task between threads at every `.await`.
To avoid this, you should restructure your code such that the mutex lock's destructor runs before the `.await`.
```rust
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}
```
Note that this does not work:
```rust
use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}
```
This is because the compiler currently calculates whether a future is `Send` based on scope information only. The compiler will hopefully be updated to support explicitly dropping it in the future, but for now, you must explicitly use a scope.
Note that the error discussed here is also discussed in the `Send` bound section from the spawning chapter.
You should not try to circumvent this issue by spawning the task in a way that does not require it to be `Send`. If Tokio suspends your task at an `.await` while the task is holding the lock, some other task may be scheduled to run on the same thread, and this other task may also try to lock that mutex. This would result in a deadlock, as the task waiting to lock the mutex would prevent the task holding the mutex from releasing it.
Keep in mind that some mutex crates implement `Send` for their `MutexGuard`s. In this case, there is no compiler error, even if you hold a `MutexGuard` across an `.await`. The code compiles, but it deadlocks!
We will discuss some approaches to avoid these issues below:
## Restructure your code to not hold the lock across an `.await`
The safest way to handle a mutex is to wrap it in a struct, and lock the mutex only inside non-async methods on that struct.
```rust
use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}

impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}
```
This pattern guarantees that you won't run into the `Send` error, because the mutex guard does not appear anywhere in an async function. It also protects you from deadlocks when using crates whose `MutexGuard` implements `Send`.
You can find a more detailed example in this blog post.
## Spawn a task to manage the state and use message passing to operate on it
This is the second approach mentioned at the start of this chapter, and it is often used when the shared resource is an I/O resource. See the next chapter for more details; a rough sketch of the pattern follows.
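As a sketch only (the next chapter develops this approach fully), a dedicated task owns the `HashMap` and other tasks operate on it by sending messages over channels. The `Command` enum and `run_manager` function here are illustrative, not part of Mini-Redis:

```rust
use bytes::Bytes;
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// Illustrative message type: one variant per operation on the state.
enum Command {
    Get {
        key: String,
        resp: oneshot::Sender<Option<Bytes>>,
    },
    Set {
        key: String,
        value: Bytes,
    },
}

async fn run_manager(mut rx: mpsc::Receiver<Command>) {
    // The manager task exclusively owns the state, so no Mutex is needed.
    let mut db: HashMap<String, Bytes> = HashMap::new();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, resp } => {
                // Send the value back over a oneshot channel.
                let _ = resp.send(db.get(&key).cloned());
            }
            Command::Set { key, value } => {
                db.insert(key, value);
            }
        }
    }
}
```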
## Use Tokio's asynchronous mutex
The `tokio::sync::Mutex` type provided by Tokio can also be used. The primary feature of the Tokio mutex is that it can be held across an `.await` without any issues. That said, an asynchronous mutex is more expensive than an ordinary mutex, and it is typically better to use one of the two other approaches.
```rust
use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here
```
## Tasks, threads, and contention
Using a blocking mutex to guard short critical sections is an acceptable strategy when contention is minimal. When a lock is contended, the thread executing the task must block and wait on the mutex. This will not only block the current task but it will also block all other tasks scheduled on the current thread.
By default, the Tokio runtime uses a multi-threaded scheduler. Tasks are scheduled on any number of threads managed by the runtime. If a large number of tasks are scheduled to execute and they all require access to the mutex, then there will be contention. On the other hand, if the `current_thread` runtime flavor is used, then the mutex will never be contended.
The `current_thread` runtime flavor is a lightweight, single-threaded runtime. It is a good choice when only spawning a few tasks and opening a handful of sockets. For example, this option works well when providing a synchronous API bridge on top of an asynchronous client library, as sketched below.
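For illustration, such a bridge might look like the following sketch. The `SyncClient` type and its `fetch` method are hypothetical stand-ins for a real async client library; `Builder::new_current_thread` and `block_on` are the actual Tokio APIs:

```rust
use tokio::runtime::{Builder, Runtime};

// Hypothetical blocking facade over an asynchronous client library.
struct SyncClient {
    rt: Runtime,
}

impl SyncClient {
    fn new() -> SyncClient {
        // A lightweight single-threaded runtime; tasks run on the
        // calling thread inside `block_on`.
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        SyncClient { rt }
    }

    fn fetch(&self, url: &str) -> String {
        // Drive the async operation to completion synchronously.
        self.rt.block_on(async {
            // ... call into the async client library here ...
            format!("response from {}", url)
        })
    }
}
```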
If contention on a synchronous mutex becomes a problem, the best fix is rarely to switch to the Tokio mutex. Instead, options to consider are to:
- Let a dedicated task manage state and use message passing.
- Shard the mutex.
- Restructure the code to avoid the mutex.
## Mutex sharding
In our case, as each key is independent, mutex sharding will work well. To do this, instead of having a single `Mutex<HashMap<_, _>>` instance, we would introduce `N` distinct instances.
```rust
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}
```
Then, finding the cell for any given key becomes a two-step process. First, the key is used to identify which shard it is part of. Then, the key is looked up in the `HashMap`.
```rust
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
```
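The `hash` function is left undefined above; a minimal sketch using the standard library's `DefaultHasher` could look like this:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Maps a key to a shard index; any stable hash function would do.
fn hash(key: &str) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() as usize
}
```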
The simple implementation outlined above requires using a fixed number of shards, and the number of shards cannot be changed once the sharded map is created.
The `dashmap` crate provides an implementation of a more sophisticated sharded hash map. You may also want to have a look at concurrent hash table implementations such as `leapfrog` and `flurry`, the latter being a port of Java's `ConcurrentHashMap` data structure.
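As a quick illustration, `dashmap` exposes a `HashMap`-like API and handles the sharding internally; this sketch assumes `dashmap` has been added to `Cargo.toml`:

```rust
use dashmap::DashMap;

fn main() {
    // DashMap shards internally; no explicit Mutex is needed.
    let db: DashMap<String, Vec<u8>> = DashMap::new();

    db.insert("hello".to_string(), b"world".to_vec());

    // `get` returns a guard that dereferences to the value. Like a
    // MutexGuard, it should not be held across an `.await`.
    if let Some(value) = db.get("hello") {
        println!("{:?}", *value);
    }
}
```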
Before you start using any of these crates, be sure to structure your code so that you cannot hold a `MutexGuard` across an `.await`. If you don't, you will either get compiler errors (in the case of non-`Send` guards) or your code will deadlock (in the case of `Send` guards). See a full example and more context in this blog post.