Overview

When developing applications with tokio, one of the larger challenges is managing shared access to resources that either need to be mutated or have internal state that needs to be protected. There are a few options for synchronizing this access. To walk through the problem and the available solutions, we are going to use a somewhat contrived example: a rudimentary in-memory database. Its basic structure looks something like the following.

use std::collections::BTreeMap;
use serde_json::Value;
use serde::Serialize;

/// A key-value store where all keys are Strings
/// and all associated values are JSON values
#[derive(Debug, Default)]
pub struct Database {
    /// The mapping of keys to values
    map: BTreeMap<String, Value>,
}

impl Database {
    /// Get a reference to the value paired with the key, if one exists
    pub fn get(&self, key: impl AsRef<str>) -> Option<&Value> {
        self.map.get(key.as_ref())
    }

    /// Insert a new value into the map, overwriting the previous value if one was present
    pub fn insert(&mut self, key: impl ToString, value: &impl Serialize) {
        // Panics if `value` cannot be represented as JSON
        let value = serde_json::to_value(value).unwrap();
        self.map.insert(key.to_string(), value);
    }
}

Our Database is a simple wrapper around BTreeMap that isn't all that interesting yet, but it poses a problem as soon as we try to use it as a shared resource across tasks.

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let mut db = Database::default();
    tokio::task::spawn(async {
        loop {
            dbg!(db.get("my-key"));
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    tokio::task::spawn(async {
        let start = Instant::now();
        loop {
            db.insert("my-key", &start.elapsed().as_millis())
        }
    });
}

The above fails to compile with the following error:

error[E0502]: cannot borrow `db` as mutable because it is also borrowed as immutable
  --> src/main.rs:38:24
   |
31 |        tokio::task::spawn(async {
   |  ______-__________________-
   | | _____|
   | ||
32 | ||         loop {
33 | ||             dbg!(db.get("my-key"));
   | ||                  -- first borrow occurs due to use of `db` in coroutine
34 | ||             tokio::time::sleep(Duration::from_secs(1)).await;
35 | ||         }
36 | ||     });
   | ||_____-- argument requires that `db` is borrowed for `'static`
   | |______|
   |        immutable borrow occurs here
37 |
38 |        tokio::task::spawn(async {
   |   ________________________^
39 |  |         let start = Instant::now();
40 |  |         loop {
41 |  |             db.insert("my-key", &start.elapsed().as_millis())
   |  |             -- second borrow occurs due to use of `db` in coroutine
42 |  |         }
43 |  |     });
   |  |_____^ mutable borrow

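Both the task that calls get and the task that calls insert need a reference to this shared database. tokio::task::spawn requires its future to be 'static, so each async block would have to borrow db for 'static. Rust will not allow a shared borrow (for get) and a mutable borrow (for insert) to coexist. In other words, the compiler is telling us that we need something to synchronize the insert and get operations.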

In the next chapter we are going to look at how we might achieve this with the standard library's primitives.

Using std

There are two types in the standard library's sync module that we can use: Arc and Mutex.

Arc (an atomically reference-counted pointer) moves our Database onto the heap. Cloning the Arc gives each task its own owned handle to the same Database, which satisfies spawn's 'static requirement. Mutex provides the synchronization: only one task can hold the lock at any given time.
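To see Arc and Mutex doing their jobs without pulling in tokio, here is a minimal std-only sketch. It differs from the chapter's code in two ways, both assumptions made for illustration: std::thread stands in for tokio tasks, and a plain BTreeMap<String, u128> stands in for Database. The SharedDb alias and fill_from_threads function are hypothetical names.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `Arc<Mutex<Database>>`:
/// values are plain `u128`s instead of `serde_json::Value`s.
type SharedDb = Arc<Mutex<BTreeMap<String, u128>>>;

/// Spawns `writers` threads that each insert one key through their own
/// clone of the `Arc`, then returns the shared map once all have finished.
fn fill_from_threads(writers: u128) -> SharedDb {
    let db: SharedDb = Arc::new(Mutex::new(BTreeMap::new()));
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            // Each thread gets its own owned handle to the same map.
            let db = Arc::clone(&db);
            thread::spawn(move || {
                // Only one thread at a time can hold the lock; the guard is a
                // temporary and is released at the end of this statement.
                db.lock().unwrap().insert(format!("key-{i}"), i);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    db
}
```

Calling fill_from_threads(4) should leave four entries in the map. Once every thread has been joined, their clones have been dropped, so the returned Arc is the only remaining handle and its strong count is back to 1.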

use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // wrapping the database in an Arc+Mutex will make it shareable across our
    // tasks.
    let db = Arc::new(Mutex::new(Database::default()));
    tokio::task::spawn({
        // Clone the database reference to `move` into our async block
        let db = db.clone();
        async move {
            loop {
                // The guard is a temporary, so the lock is released at the end of this statement
                dbg!(db.lock().unwrap().get("my-key"));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    });

    tokio::task::spawn(async move {
        let start = Instant::now();
        loop {
            db.lock()
                .unwrap()
                .insert("my-key", &start.elapsed().as_millis());
            tokio::time::sleep(Duration::from_millis(350)).await;
        }
    })
    // We need to await here to make sure we don't exit our application early
    .await
    .unwrap();
}

This works as expected, printing the duration since start roughly once per second. However, releasing the lock immediately in both of our tasks isn't much of a real-world simulation. What happens if the get task holds on to the guard while it waits a short time before printing?

tokio::task::spawn({
    let db = db.clone();
    async move {
        loop {
            let guard = db.lock().unwrap();
            let value = guard.get("my-key");
            // `guard` (which `value` borrows from) is still alive across this `.await`
            tokio::time::sleep(Duration::from_secs(1)).await;
            dbg!(value);
        }
    }
});

This looks like it should behave just like our last version, but the compiler rejects it:

error: future cannot be sent between threads safely
   --> src/main.rs:31:5
    |
31  | /     tokio::task::spawn({
32  | |         let db = db.clone();
33  | |         async move {
34  | |             loop {
...   |
40  | |         }
41  | |     });
    | |______^ future created by async block is not `Send`

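This is because we are awaiting in our get task before dropping the MutexGuard returned by db.lock(). std's MutexGuard is not Send, so any future that holds one across an .await is not Send either. tokio::task::spawn requires a Send future because the multi-threaded runtime may resume the task on a different thread. To see why holding the lock across an .await is a problem in the first place, let's unroll the order of operations from the moment we first call lock: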

  • db.lock() reserves the Mutex for our get task
  • guard.get returns a reference to a value in our database
  • tokio::time::sleep pauses this task, allowing our insert task to run
  • db.lock() in the insert task blocks its thread until the get task's MutexGuard is dropped

And now we've reached a deadlock. The insert task cannot proceed until the get task drops its guard, but the get task has yielded control to tokio, and nothing guarantees it will be resumed while the insert task is blocking a worker thread. On a single-threaded runtime, for example, std's lock() blocks the only thread instead of yielding back to tokio. The get task can then never resume, and our application will always get stuck.
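The last step is easy to observe in a std-only sketch that plays both tasks on a single thread, the way a single-threaded runtime would. Calling the blocking lock() there would hang forever, so this hypothetical function uses Mutex::try_lock to ask whether the insert task's lock would have blocked. As before, a BTreeMap<String, u128> stands in for Database; that substitution is an assumption of the sketch.

```rust
use std::collections::BTreeMap;
use std::sync::{Mutex, TryLockError};

/// Hypothetical illustration: runs the get task's steps and the insert task's
/// lock attempt on one thread and reports whether that attempt would block.
fn insert_would_block_while_get_holds_guard() -> bool {
    let db: Mutex<BTreeMap<String, u128>> = Mutex::new(BTreeMap::new());

    // Get task: lock the mutex and borrow a value out of the guard.
    let guard = db.lock().unwrap();
    let value = guard.get("my-key");

    // The get task "sleeps" here while still holding `guard`.
    // Insert task: a blocking `db.lock()` would never return, since the only
    // thread that could drop `guard` is the one that would be blocked.
    // `try_lock` reports that situation instead of hanging.
    let would_block = matches!(db.try_lock(), Err(TryLockError::WouldBlock));

    // The get task wakes up, prints the value, and releases the lock.
    dbg!(value);
    drop(guard);
    would_block
}
```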

There are a few ways to deal with this; one is to switch from std::sync::Mutex to tokio::sync::Mutex.
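Before trying that, it's worth sketching one of the other options. We can keep std::sync::Mutex and make sure the guard is dropped before the .await by copying the value out of the map. The snapshot helper below is hypothetical, and it again uses BTreeMap<String, u128> as a stand-in for Database. With the real Database you would call .cloned() on the Option<&Value> instead of .copied().

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

/// Hypothetical helper: reads `key` while holding the lock only for the
/// duration of this call, returning an owned copy of the value.
fn snapshot(db: &Mutex<BTreeMap<String, u128>>, key: &str) -> Option<u128> {
    let guard = db.lock().unwrap();
    // `copied()` turns `Option<&u128>` into `Option<u128>`, so the result
    // no longer borrows from `guard`.
    let value = guard.get(key).copied();
    // Release the lock before returning; nothing lock-related escapes.
    drop(guard);
    value
}
```

In the get task's loop, you would call `let value = snapshot(&db, "my-key");` and then sleep and `dbg!(value)`. That sequence holds no MutexGuard across the .await, so the future stays Send (`&db` coerces from `&Arc<Mutex<_>>` to `&Mutex<_>`). The trade-off is that the get task may print a value that is already stale, rather than holding off the insert task for the whole second.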

Tokio Mutex

To start, let's just swap out the Mutex and see what happens.

use serde::Serialize;
use serde_json::Value;
// We've dropped `sync::Mutex` here
use std::{
    collections::BTreeMap,
    sync::Arc,
    time::{Duration, Instant},
};
// This is our new import
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let db = Arc::new(Mutex::new(Database::default()));
    tokio::task::spawn({
        let db = db.clone();
        async move {
            loop {
                // `lock().await` yields to tokio instead of blocking the thread while waiting
                let guard = db.lock().await;
                let value = guard.get("my-key");
                tokio::time::sleep(Duration::from_secs(1)).await;
                dbg!(value);
            }
        }
    });

    tokio::task::spawn(async move {
        let start = Instant::now();
        loop {
            db.lock()
                .await
                .insert("my-key", &start.elapsed().as_millis());
            tokio::time::sleep(Duration::from_millis(350)).await;
        }
    })
    .await
    .unwrap();
}

That worked! But what exactly is happening? Let's add some print statements to see if we can figure out the ordering of operations.

#[tokio::main]
async fn main() {
    let db = Arc::new(Mutex::new(Database::default()));
    tokio::task::spawn({
        let db = db.clone();
        async move {
            loop {
                println!("->read-guard");
                let guard = db.lock().await;
                let value = guard.get("my-key");
                println!("->read-sleep");
                tokio::time::sleep(Duration::from_secs(1)).await;
                println!("<-read-sleep");
                dbg!(value);
                println!("<-read-guard");
            }
        }
    });

    tokio::task::spawn(async move {
        let start = Instant::now();
        loop {
            println!("->write-guard");
            db.lock()
                .await
                .insert("my-key", &start.elapsed().as_millis());
            println!("<-write-guard");
            tokio::time::sleep(Duration::from_millis(350)).await;
        }
    })
    .await
    .unwrap();
}

When we run this, we get the following output.

->read-guard
->read-sleep
->write-guard
<-read-sleep
[src/main.rs:55:17] value = None
<-read-guard
->read-guard
<-write-guard
->read-sleep
->write-guard
<-read-sleep
[src/main.rs:55:17] value = Some(
    Number(1002),
)
<-read-guard
->read-guard
<-write-guard

Looking over our logs, it seems our get task locks the Mutex and then immediately sleeps. Tokio then selects our insert task, which attempts to lock the Mutex. Since the get task already holds the lock, lock().await yields back to tokio instead of blocking the thread. Once the sleep finishes, the get task prints the value, drops the guard, and immediately tries to lock the map again. At this point the insert task called lock before the get task's second call. Tokio's Mutex hands out the lock in the order lock was called, so it lets the insert task acquire the lock first, and the insert task stores our first value.

Notice how the insert is delayed by the access in our get task. In this case that is desirable: the get task still holds a reference into the map, and we don't want to be able to mutate a value when we don't have exclusive access to it.