Example: A Chat Server

We’re going to use what has been covered so far to build a chat server. This is a non-trivial Tokio server application.

The server is going to use a line-based protocol. Lines are terminated by \r\n. This is compatible with telnet, so we will just use telnet for the client. When a client connects, it must identify itself by sending a line containing its “nick” (i.e., some name used to identify the client amongst its peers).

Once a client is identified, each line it sends is prefixed with its nick followed by a colon and a space, then broadcast to all other connected clients.
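
To make the wire format concrete, here is a minimal std-only sketch of how a broadcast line could be assembled. The function name format_broadcast is hypothetical, and it uses a plain Vec<u8> instead of the BytesMut type that the server uses later in this section.

```rust
/// Builds the line that is broadcast to the other clients.
///
/// `nick` and `message` are the raw bytes received from the sender,
/// without the trailing `\r\n`. (Hypothetical helper, not part of the
/// chat server code.)
fn format_broadcast(nick: &[u8], message: &[u8]) -> Vec<u8> {
    let mut line = Vec::with_capacity(nick.len() + message.len() + 4);
    line.extend_from_slice(nick);
    line.extend_from_slice(b": ");
    line.extend_from_slice(message);
    // Every line on the wire is terminated by `\r\n`.
    line.extend_from_slice(b"\r\n");
    line
}
```

For example, a client identified as alice that sends hello would cause the bytes alice: hello\r\n to be delivered to every other client.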

The full code can be found here. Note that Tokio provides some additional abstractions that have not yet been covered that would enable the chat server to be written with less code.

Setup

First, generate a new crate.

$ cargo new --bin line-chat
$ cd line-chat

Next, add the necessary dependencies:

[dependencies]
tokio = "0.1"
tokio-io = "0.1"
futures = "0.1"
bytes = "0.4"

Then bring the crates and types into scope in main.rs:

# #![deny(deprecated)]
extern crate tokio;
#[macro_use]
extern crate futures;
extern crate bytes;

use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};

/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<Bytes>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<Bytes>;
# fn main() {}

Now, we set up the necessary structure for a server. These are the same steps that were used as part of the Hello World! example:

  • Bind a TcpListener to a local port.
  • Define a task that accepts inbound connections and processes them.
  • Start the Tokio runtime.
  • Spawn the server task.

Again, no work actually happens until the server task is spawned on the executor.

# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
#
# use tokio::prelude::*;
# use tokio::net::TcpListener;
fn main() {
    let addr = "127.0.0.1:6142".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener.incoming().for_each(move |socket| {
        // TODO: Process socket
        Ok(())
    })
    .map_err(|err| {
        // Handle error by printing to STDOUT.
        println!("accept error = {:?}", err);
    });

    println!("server running on localhost:6142");
# let server = server.select(futures::future::ok(())).then(|_| Ok(()));

    // Start the server
    //
    // This does a few things:
    //
    // * Start the Tokio runtime (reactor, threadpool, etc...)
    // * Spawns the `server` task onto the runtime.
    // * Blocks the current thread until the runtime becomes idle, i.e. all
    //   spawned tasks have completed.
    tokio::run(server);
}

Chat State

A chat server requires that messages received from one client be broadcast to all other connected clients. This will be done using message passing over mpsc channels.

Each client socket will be managed by a task. Each task will have an associated mpsc channel that is used to receive messages from other clients. The send halves of all these channels are stored in shared state, wrapped in an Arc<Mutex<...>>, so that every task can access them.

In this example, we are going to be using unbounded channels. Ideally, channels should never be unbounded, but handling backpressure in this kind of situation is a bit tricky. We will leave bounding the channels to a later section dedicated to handling backpressure.

Here is how the shared state is defined (the Tx type alias was defined above):

# #![deny(deprecated)]
# use std::collections::HashMap;
# use std::net::SocketAddr;
# struct Tx;
struct Shared {
    peers: HashMap<SocketAddr, Tx>,
}

Then, at the very top of the main function, the state instance is created. This state instance will be moved into the task that accepts incoming connections.

# #![deny(deprecated)]
# use std::sync::{Arc, Mutex};
# type Shared = String;
let state = Arc::new(Mutex::new(Shared::new()));
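
The line above calls Shared::new(), which is not shown in this section. A minimal sketch of what such a constructor might look like follows. To keep the sketch free of external crates, Tx is a stand-in built on std::sync::mpsc rather than the futures channel used by the server, and new_state is a hypothetical helper name.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};

/// Stand-in for the `Tx` alias. The server itself uses
/// `futures::sync::mpsc::UnboundedSender<Bytes>`.
type Tx = Sender<Vec<u8>>;

struct Shared {
    peers: HashMap<SocketAddr, Tx>,
}

impl Shared {
    /// Create an empty shared state with no connected peers.
    fn new() -> Self {
        Shared {
            peers: HashMap::new(),
        }
    }
}

/// Hypothetical helper: builds the handle that is cloned once per
/// accepted connection.
fn new_state() -> Arc<Mutex<Shared>> {
    Arc::new(Mutex::new(Shared::new()))
}
```

Cloning the Arc only bumps a reference count, so every connection task ends up pointing at the same peers map.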

Now we can handle processing incoming connections. The server task is updated to this:

# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
# use tokio::net::{TcpListener, TcpStream};
# use futures::prelude::*;
# fn dox() {
# let addr = "127.0.0.1:6142".parse().unwrap();
# let listener = TcpListener::bind(&addr).unwrap();
# fn process(_: TcpStream, _: String) {}
# let state = String::new();
listener.incoming().for_each(move |socket| {
    process(socket, state.clone());
    Ok(())
})
# ;
# }
# fn main() {}

The server task passes all sockets along with a clone of the server state to a process function. Let’s define that function. It will have a structure like this:

# #![deny(deprecated)]
# extern crate tokio;
# extern crate futures;
# use futures::future;
# use tokio::net::TcpStream;
# use std::sync::{Arc, Mutex};
# type Shared = String;
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
    // Define the task that processes the connection.
# /*
    let task = unimplemented!();
# */ let task = future::ok(());

    // Spawn the task
    tokio::spawn(task);
}
# fn main() {}

The call to tokio::spawn will spawn a new task onto the current Tokio runtime. All the worker threads keep a reference to the current runtime stored in a thread-local variable. Note that attempting to call tokio::spawn from outside of the Tokio runtime will result in a panic.

The connection processing logic only needs to understand the protocol. The protocol is line-based, with lines terminated by \r\n. Instead of working at the byte stream level, it is much easier to work at the frame level, i.e., with values that represent atomic messages.

We implement a codec that holds the socket and exposes an API that produces and consumes lines.

Line Codec

A codec is a loose term for a type that takes a byte stream type (AsyncRead + AsyncWrite) and exposes a read and write API at the frame level. The tokio-io crate provides additional helpers for writing codecs, but in this example, we are going to do it by hand.

The Lines codec is defined as such:

# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# use tokio::net::TcpStream;
# use bytes::BytesMut;
struct Lines {
    socket: TcpStream,
    rd: BytesMut,
    wr: BytesMut,
}

impl Lines {
    /// Create a new `Lines` codec backed by the socket
    fn new(socket: TcpStream) -> Self {
        Lines {
            socket,
            rd: BytesMut::new(),
            wr: BytesMut::new(),
        }
    }
}
# fn main() {}

Data read from the socket is buffered into rd. When a full line is read, it is returned to the caller. Lines submitted by the caller to write to the socket are buffered into wr, then flushed.

This is how the read half is implemented:

# #![deny(deprecated)]
# extern crate bytes;
# extern crate tokio;
# #[macro_use]
# extern crate futures;
# use bytes::BytesMut;
# use tokio::io;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# struct Lines {
#     socket: TcpStream,
#     rd: BytesMut,
#     wr: BytesMut,
# }
impl Stream for Lines {
    type Item = BytesMut;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
        // First, read any new data that might have been received
        // off the socket
        //
        // We track whether the socket is closed here; this is used
        // to inform the return value below.
        let sock_closed = self.fill_read_buf()?.is_ready();

        // Now, try finding lines
        let pos = self.rd.windows(2)
            .position(|bytes| bytes == b"\r\n");

        if let Some(pos) = pos {
            // Remove the line from the read buffer and set it
            // to `line`.
            let mut line = self.rd.split_to(pos + 2);

            // Drop the trailing \r\n
            line.split_off(pos);

            // Return the line
            return Ok(Async::Ready(Some(line)));
        }

        if sock_closed {
            Ok(Async::Ready(None))
        } else {
            Ok(Async::NotReady)
        }
    }
}

impl Lines {
    fn fill_read_buf(&mut self) -> Result<Async<()>, io::Error> {
        loop {
            // Ensure the read buffer has capacity.
            //
            // This might result in an internal allocation.
            self.rd.reserve(1024);

            // Read data into the buffer.
            //
            // The `read_buf` fn is provided by `AsyncRead`.
            let n = try_ready!(self.socket.read_buf(&mut self.rd));

            if n == 0 {
                return Ok(Async::Ready(()));
            }
        }
    }
}
# fn main() {}

The example uses BytesMut from the bytes crate. This provides some nice utilities for working with byte sequences in a networking context. The Stream implementation yields BytesMut values which contain exactly one line.
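
The framing step can be tried out in isolation. The following is a std-only sketch of the same idea, using a Vec<u8> in place of BytesMut. The name take_line is hypothetical. Unlike split_to on BytesMut, draining a Vec copies the remaining bytes, so this is meant only to illustrate the logic.

```rust
/// Removes the first complete line from `buf` and returns it without
/// the trailing `\r\n`. Returns `None` when no full line has been
/// buffered yet, leaving `buf` untouched.
fn take_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // Find the first `\r\n` pair, as the `windows(2)` search does.
    let pos = buf.windows(2).position(|bytes| bytes == b"\r\n")?;

    // Take everything up to and including the terminator out of the
    // buffer...
    let mut line: Vec<u8> = buf.drain(..pos + 2).collect();

    // ...then drop the trailing `\r\n`.
    line.truncate(pos);
    Some(line)
}
```

A partially received line stays in the buffer until the rest of it arrives, which is why the codec keeps rd around between calls to poll.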

As always, the key to implementing a function that returns Async is to never return Async::NotReady unless the function implementation received an Async::NotReady itself. In this example, NotReady is only returned if fill_read_buf returns NotReady, and fill_read_buf only returns NotReady if TcpStream::read_buf returns NotReady.
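
The rule exists because the inner resource is what arranges for the task to be woken up again when it returns NotReady. The following std-only sketch models that propagation. The Async enum here is a local stand-in for the one in the futures crate, and Scripted is a hypothetical fake socket that replays a fixed list of results.

```rust
use std::collections::VecDeque;

/// Local stand-in for `futures::Async`; not the real type.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Hypothetical fake socket: each `poll_read` pops the next scripted
/// result. Once the script runs out it reports an empty read (EOF).
struct Scripted {
    results: VecDeque<Async<Vec<u8>>>,
}

impl Scripted {
    fn poll_read(&mut self) -> Async<Vec<u8>> {
        self.results.pop_front().unwrap_or(Async::Ready(Vec::new()))
    }
}

/// Reads until the inner resource reports `NotReady` or EOF.
///
/// `NotReady` is returned only in the arm where `poll_read` itself
/// returned `NotReady`.
fn fill(buf: &mut Vec<u8>, inner: &mut Scripted) -> Async<()> {
    loop {
        match inner.poll_read() {
            Async::NotReady => return Async::NotReady,
            Async::Ready(data) => {
                if data.is_empty() {
                    // An empty read means the peer closed the socket.
                    return Async::Ready(());
                }
                buf.extend_from_slice(&data);
            }
        }
    }
}
```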

Now, for the write half.

# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# #[macro_use]
# extern crate futures;
# use tokio::io;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use bytes::{BytesMut, BufMut};
struct Lines {
    socket: TcpStream,
    rd: BytesMut,
    wr: BytesMut,
}
impl Lines {
    fn buffer(&mut self, line: &[u8]) {
        // Push the line onto the end of the write buffer.
        //
        // The `put` function is from the `BufMut` trait.
        self.wr.put(line);
    }

    fn poll_flush(&mut self) -> Poll<(), io::Error> {
        // As long as there is buffered data to write, try to write it.
        while !self.wr.is_empty() {
            // Try to write some bytes to the socket
            let n = try_ready!(self.socket.poll_write(&self.wr));

            // As long as the wr is not empty, a successful write should
            // never write 0 bytes.
            assert!(n > 0);

            // This discards the first `n` bytes of the buffer.
            let _ = self.wr.split_to(n);
        }

        Ok(Async::Ready(()))
    }
}
# fn main() {}

The caller queues up all lines by calling buffer. This appends the line to the internal wr buffer. Then, once all data is queued up, the caller calls poll_flush, which does the actual writing to the socket. poll_flush only returns Ready once all the queued data has been successfully written to the socket.
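
The loop in poll_flush has to cope with the socket accepting only part of the buffer on each write. Here is a blocking, std-only sketch of the same drain loop. Trickle is a hypothetical writer that accepts a few bytes at a time, and flush_all is a hypothetical name. Where the server asserts that a write never returns 0, this sketch returns an error instead.

```rust
use std::io::{self, Write};

/// Hypothetical sink that accepts at most `max` bytes per `write`
/// call, to imitate a socket that performs partial writes.
struct Trickle {
    max: usize,
    written: Vec<u8>,
}

impl Write for Trickle {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let n = data.len().min(self.max);
        self.written.extend_from_slice(&data[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes all of `wr` to `sink`, discarding each chunk once written.
fn flush_all<W: Write>(wr: &mut Vec<u8>, sink: &mut W) -> io::Result<()> {
    while !wr.is_empty() {
        let n = sink.write(&wr[..])?;
        if n == 0 {
            return Err(io::Error::new(io::ErrorKind::WriteZero, "wrote 0 bytes"));
        }
        // Discard the first `n` bytes, like `split_to(n)` on `BytesMut`.
        drop(wr.drain(..n));
    }
    Ok(())
}
```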

Similar to the read half, NotReady is only returned when the function implementation received NotReady itself.

And the Lines codec is used in the process function as such:

# #![deny(deprecated)]
# extern crate tokio;
# extern crate bytes;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use bytes::BytesMut;
# use std::io;
# use std::sync::{Arc, Mutex};
# type Shared = String;
# struct Lines;
# impl Lines {
# fn new(_: TcpStream) -> Self { unimplemented!() }
# }
# impl Stream for Lines {
#     type Item = BytesMut;
#     type Error = io::Error;
#     fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { unimplemented!() }
# }
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
    // Wrap the socket with the `Lines` codec that we wrote above.
    let lines = Lines::new(socket);

    // The first line is treated as the client's name. The client
    // is not added to the set of connected peers until this line
    // is received.
    //
    // We use the `into_future` combinator to extract the first
    // item from the lines stream. `into_future` takes a `Stream`
    // and converts it to a future of `(first, rest)` where `rest`
    // is the original stream instance.
    let connection = lines.into_future()
        // `into_future` doesn't have the right error type, so map
        // the error to make it work.
        .map_err(|(e, _)| e)
        // Process the first received line as the client's name.
        .and_then(|(name, lines)| {
            let name = match name {
                Some(name) => name,
                None => {
                    // TODO: Handle a client that disconnects
                    // early.
                    unimplemented!();
                }
            };

            // TODO: Rest of the process function
# Ok(())
        });
}
# fn main() {}
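
If the (first, rest) shape returned by into_future is unfamiliar, a synchronous analogy may help. The sketch below does the same thing for an ordinary Iterator. The function first_and_rest is hypothetical and only mirrors the idea; it is not part of the futures API.

```rust
/// Pulls the first item off `items` and hands back the remaining
/// iterator alongside it, mirroring the `(first, rest)` pair that
/// `into_future` resolves to.
fn first_and_rest<I: Iterator>(mut items: I) -> (Option<I::Item>, I) {
    let first = items.next();
    (first, items)
}
```

As with the stream version, the first element is None when the source ends before yielding anything, which corresponds to a client disconnecting before sending its name.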

Broadcasting Messages

The next step is to implement the connection processing logic that handles the actual chat functionality, i.e. broadcasting messages from one client to all the others.

To implement this, we will explicitly implement a Future that takes the Lines codec instance and handles the broadcasting logic. This future has two responsibilities:

  1. Receive messages on its message channel and write them to the socket.
  2. Receive messages from the socket and broadcast them to all peers.

Implementing this logic entirely with combinators is also possible, but requires using split, which hasn’t been covered yet. Also, this provides an opportunity to see how to implement a non-trivial Future by hand.

Here is the definition of the future that processes the broadcast logic for a connection:

# use std::net::SocketAddr;
# use std::sync::{Arc, Mutex};
# type BytesMut = ();
# type Lines = ();
# type Shared = ();
# type Rx = ();
struct Peer {
    /// Name of the peer. This is the first line received from the client.
    name: BytesMut,

    /// The TCP socket wrapped with the `Lines` codec.
    lines: Lines,

    /// Handle to the shared chat state.
    state: Arc<Mutex<Shared>>,

    /// Receive half of the message channel.
    ///
    /// This is used to receive messages from peers. When a message is received
    /// off of this `Rx`, it will be written to the socket.
    rx: Rx,

    /// Client socket address.
    ///
    /// The socket address is used as the key in the `peers` HashMap. The
    /// address is saved so that the `Peer` drop implementation can clean up its
    /// entry.
    addr: SocketAddr,
}
# fn main() {}

And a Peer instance is created as such:

# extern crate bytes;
# extern crate futures;
# extern crate tokio;
# use bytes::{BytesMut, Bytes};
# use futures::sync::mpsc;
# use tokio::net::TcpStream;
# use tokio::prelude::*;
# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
#     name: BytesMut,
#     lines: Lines,
#     state: Arc<Mutex<Shared>>,
#     rx: Rx,
#     addr: SocketAddr,
# }
# struct Shared {
#     peers: HashMap<SocketAddr, Tx>,
# }
# struct Lines {
#     socket: TcpStream,
# }
# type Tx = mpsc::UnboundedSender<Bytes>;
# type Rx = mpsc::UnboundedReceiver<Bytes>;
impl Peer {
    fn new(name: BytesMut,
           state: Arc<Mutex<Shared>>,
           lines: Lines) -> Peer
    {
        // Get the client socket address
        let addr = lines.socket.peer_addr().unwrap();

        // Create a channel for this peer
        let (tx, rx) = mpsc::unbounded();

        // Add an entry for this `Peer` in the shared state map.
        state.lock().unwrap()
            .peers.insert(addr, tx);

        Peer {
            name,
            lines,
            state,
            rx,
            addr,
        }
    }
}
# fn main() {}

An mpsc channel is created for other peers to send their messages to this newly created peer. After creating the channel, the transmit half is inserted into the peers map. This entry is removed in the drop implementation for Peer.

# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
#     state: Arc<Mutex<Shared>>,
#     addr: SocketAddr,
# }
# struct Shared {
#     peers: HashMap<SocketAddr, ()>,
# }
impl Drop for Peer {
    fn drop(&mut self) {
        self.state.lock().unwrap().peers
            .remove(&self.addr);
    }
}
# fn main() {}
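
The register-on-create, remove-on-drop pattern can be exercised on its own. The following std-only sketch uses simplified, hypothetical types: Registry stands in for the shared state, keys are plain u32 ids rather than socket addresses, and Registered plays the role of Peer.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for the shared state: keys are plain `u32` ids
/// and values are unit instead of channel senders.
type Registry = Arc<Mutex<HashMap<u32, ()>>>;

/// Hypothetical guard that adds an entry when created and removes it
/// when dropped, as `Peer` does with the `peers` map.
struct Registered {
    id: u32,
    registry: Registry,
}

impl Registered {
    fn new(id: u32, registry: Registry) -> Self {
        registry.lock().unwrap().insert(id, ());
        Registered { id, registry }
    }
}

impl Drop for Registered {
    fn drop(&mut self) {
        self.registry.lock().unwrap().remove(&self.id);
    }
}
```

Tying the cleanup to Drop means the entry goes away however the connection ends, whether the future completes, fails with an error, or is dropped by the runtime.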

And here is the Future implementation for Peer.

# extern crate tokio;
# extern crate futures;
# extern crate bytes;
# use tokio::io;
# use tokio::prelude::*;
# use futures::sync::mpsc;
# use bytes::{Bytes, BytesMut, BufMut};
# use std::net::SocketAddr;
# use std::collections::HashMap;
# use std::sync::{Arc, Mutex};
# struct Peer {
#     name: BytesMut,
#     lines: Lines,
#     state: Arc<Mutex<Shared>>,
#     rx: Rx,
#     addr: SocketAddr,
# }
# struct Shared {
#     peers: HashMap<SocketAddr, Tx>,
# }
# struct Lines;
# type Tx = mpsc::UnboundedSender<Bytes>;
# type Rx = mpsc::UnboundedReceiver<Bytes>;
# impl Lines {
#     fn buffer(&mut self, _: &[u8]) { unimplemented!() }
#     fn poll_flush(&mut self) -> Poll<(), io::Error> { unimplemented!() }
# }
# impl Stream for Lines {
#     type Item = BytesMut;
#     type Error = io::Error;
#     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
#         unimplemented!();
#     }
# }
impl Future for Peer {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        // Receive all messages from peers.
        loop {
            // Polling an `UnboundedReceiver` cannot fail, so `unwrap`
            // here is safe.
            match self.rx.poll().unwrap() {
                Async::Ready(Some(v)) => {
                    // Buffer the line. Once all lines are buffered,
                    // they will be flushed to the socket (right
                    // below).
                    self.lines.buffer(&v);
                }
                _ => break,
            }
        }

        // Flush the write buffer to the socket
        let _ = self.lines.poll_flush()?;

        // Read new lines from the socket
        while let Async::Ready(line) = self.lines.poll()? {
            println!("Received line ({:?}) : {:?}", self.name, line);

            if let Some(message) = line {
                // Prefix the line with the peer's name:
                let mut line = self.name.clone();
                line.put(": ");
                line.put(&message);
                line.put("\r\n");

                // We're using `Bytes`, which allows zero-copy clones
                // (by storing the data in an Arc internally).
                //
                // However, before cloning, we must freeze the data.
                // This converts it from mutable -> immutable,
                // allowing zero copy cloning.
                let line = line.freeze();

                // Now, send the line to all other peers
                for (addr, tx) in &self.state.lock().unwrap().peers {
                    // Don't send the message to ourselves
                    if *addr != self.addr {
                        // The send only fails if the rx half has been
                        // dropped, however this is impossible as the
                        // `tx` half will be removed from the map
                        // before the `rx` is dropped.
                        tx.unbounded_send(line.clone()).unwrap();
                    }
                }
            } else {
                // EOF was reached. The remote client has disconnected.
                // There is nothing more to do.
                return Ok(Async::Ready(()));
            }
        }

        // As always, it is important to not just return `NotReady`
        // without ensuring an inner future also returned `NotReady`.
        //
        // We know we got a `NotReady` from either `self.rx` or
        // `self.lines`, so the contract is respected.
        Ok(Async::NotReady)
    }
}
# fn main() {}
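
The fan-out step in the middle of poll can be looked at separately from the rest of the future. This is a std-only sketch under a few stated assumptions: it uses std::sync::mpsc channels instead of the futures ones, u32 ids instead of socket addresses, and it copies the line for each peer where the server clones a Bytes handle. The function name broadcast is hypothetical. It also ignores send errors rather than unwrapping them.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Sends `line` to every peer except `from`.
fn broadcast(peers: &HashMap<u32, Sender<Vec<u8>>>, from: u32, line: &[u8]) {
    for (id, tx) in peers {
        // Don't send the message back to the sender.
        if *id != from {
            // A send fails only if the receiving half is gone; this
            // sketch skips such peers instead of panicking.
            let _ = tx.send(line.to_vec());
        }
    }
}

/// Hypothetical helper that creates a channel and registers its
/// transmit half under `id`, returning the receive half.
fn join(
    peers: &mut HashMap<u32, Sender<Vec<u8>>>,
    id: u32,
) -> std::sync::mpsc::Receiver<Vec<u8>> {
    let (tx, rx) = channel();
    peers.insert(id, tx);
    rx
}
```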

Final Touches

All that remains is wiring up the Peer future that was just implemented. To do this, the client connection task (defined in the process function) is extended to use Peer.

# extern crate tokio;
# extern crate futures;
# use tokio::io;
# use tokio::prelude::*;
# use futures::future::{self, Either, empty};
# type Lines = Box<Stream<Item = (), Error = io::Error>>;
# struct Peer;
# impl Peer {
#     fn new(_: (), state: (), lines: Lines) -> impl Future<Item = (), Error = io::Error> {
#         empty()
#     }
# }
# fn dox(lines: Lines) {
# let state = ();
let connection = lines.into_future()
    .map_err(|(e, _)| e)
    .and_then(|(name, lines)| {
        // If `name` is `None`, then the client disconnected without
        // actually sending a line of data.
        //
        // Since the connection is closed, there is no further work
        // that we need to do. So, we just terminate processing by
        // returning `future::ok()`.
        //
        // The problem is that only a single future type can be
        // returned from a combinator closure, but we want to
        // return both `future::ok()` and `Peer` (below).
        //
        // This is a common problem, so the `futures` crate solves
        // this by providing the `Either` helper enum that allows
        // creating a single return type that covers two concrete
        // future types.
        let name = match name {
            Some(name) => name,
            None => {
                // The remote client closed the connection without
                // sending any data.
                return Either::A(future::ok(()));
            }
        };

        println!("`{:?}` is joining the chat", name);

        // Create the peer.
        //
        // This is also a future that processes the connection, only
        // completing when the socket closes.
        let peer = Peer::new(
            name,
            state,
            lines);

        // Wrap `peer` with `Either::B` to make the return type fit.
        Either::B(peer)
    })
    // Task futures have an error of type `()`. Mapping the error here
    // ensures we handle it, which we do by printing it to STDOUT.
    .map_err(|e| {
        println!("connection error = {:?}", e);
    });
# }
# fn main() {}

Besides adding Peer, the name == None case is now handled as well. In this case, the remote client disconnected before identifying itself.

Returning different future types from the same closure (the name == None handler and Peer) is made possible by wrapping them in Either. Either is an enum that accepts a different future type for each variant. This allows returning multiple future types without reaching for trait objects.
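
The same trick works for any trait, not just Future. As a std-only illustration, here is a minimal Either written for Iterator. This Either is a local stand-in, not the type from the futures crate, and numbers is a hypothetical function that needs to return one of two different iterator types.

```rust
/// Local stand-in for `futures::future::Either`, implemented for
/// `Iterator` instead of `Future`.
enum Either<A, B> {
    A(A),
    B(B),
}

impl<A, B> Iterator for Either<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Delegate to whichever variant is present.
        match self {
            Either::A(a) => a.next(),
            Either::B(b) => b.next(),
        }
    }
}

/// Returns one of two different iterator types behind a single,
/// statically known return type.
fn numbers(empty: bool) -> Either<std::iter::Empty<u32>, std::ops::Range<u32>> {
    if empty {
        Either::A(std::iter::empty())
    } else {
        Either::B(0..3)
    }
}
```

Because the return type is a concrete enum, no boxing or dynamic dispatch is involved. The cost is that both variants must agree on their associated types, just as both futures passed to Either must share the same Item and Error.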

The full code can be found here.
