Omar Abid

A primer to Rust Async

This article is not a comprehensive treatment of Rust Async, but it can serve as an easy overview if you have no idea about Async programming in Rust or in general. If you are wondering about the new async/await keywords and Futures, or are curious what Tokio is useful for, you should feel less clueless by the end.

Rust Async is the new hot thing in Rust land. It has been hailed as a big milestone for Rust, especially for people developing highly performant networking applications. However, the long development time, the different incompatible versions, and the various libraries might have made it less than straightforward to grasp. There is a lot going on, and it's not obvious where to start.

Let's start from the beginning.

What is Async?

There are several articles, books and videos that go in depth on Async, but I'll give you the short version: if you have a single processor and want to execute two tasks simultaneously (kind of), how would you do it? The solution is to run a little bit of the first task, then switch and run a little bit of the second task, then switch back, and so on until both tasks are complete.

This is useful if you want to give the impression that the computer is running both tasks simultaneously and the computer is fast enough to trick the human eye (i.e.: multitasking). Another useful use-case is IO operations. When your program is waiting for a network response, your CPU is sitting idle. This is an ideal time to switch to another task.
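
As a rough illustration of that switching, here is a minimal sketch in plain Rust. The Task struct, its step method, and run_interleaved are made-up names for this example, and each task is assumed to be already split into small steps; no async machinery is involved yet.

```rust
/// A hypothetical unit of work that has been split into small steps.
struct Task {
    name: &'static str,
    remaining: u32,
}

impl Task {
    /// Runs one small slice of the task and records what happened.
    fn step(&mut self, log: &mut Vec<String>) {
        self.remaining -= 1;
        log.push(format!("{}: {} step(s) left", self.name, self.remaining));
    }
}

/// Runs a little bit of each unfinished task in turn until all are complete.
fn run_interleaved(mut tasks: Vec<Task>) -> Vec<String> {
    let mut log = Vec::new();
    while tasks.iter().any(|t| t.remaining > 0) {
        for task in tasks.iter_mut().filter(|t| t.remaining > 0) {
            task.step(&mut log);
        }
    }
    log
}

fn main() {
    let tasks = vec![
        Task { name: "task 1", remaining: 2 },
        Task { name: "task 2", remaining: 3 },
    ];
    // The log alternates between the two tasks while both still have work.
    for line in run_interleaved(tasks) {
        println!("{}", line);
    }
}
```

A single thread makes progress on both tasks by switching between them; the rest of the article is about letting Rust decide where those switch points are.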

So how do we write Async code?

First, let's start with some synchronous code.

Synchronous code

Let's make a simple program that reads two files: file1.txt and file2.txt. We start with file1.txt and then move to file2.txt, in that order.

We split the program into two files: main.rs and file.rs. file.rs has a single function: read_file, and in main.rs we call this function with the path of each file. You can find the code below.

file.rs

use std::fs::File;
use std::io::{self, Read};

pub fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer)?;
    Ok(buffer)
}

main.rs

use std::io;

mod file;

fn main() -> io::Result<()> {
    println!("program started");

    let file1 = file::read_file("src/file1.txt")?;
    println!("processed file 1");

    let file2 = file::read_file("src/file2.txt")?;
    println!("processed file 2");

    dbg!(&file1);
    dbg!(&file2);

    Ok(())
}

Use cargo run to compile and run the program. The program should run with no surprises but make sure you have placed two files (file1.txt and file2.txt) in your src folder.

program started
processed file 1
processed file 2
[src/main.rs:14] &file1 = "file 1\n"
[src/main.rs:15] &file2 = "file 2\n"

So far, all is good. If you need file1.txt processed before file2.txt, then this is the only way to go. But there are other times when you don't care about the order in which the files are processed. Ideally, you would want the files processed as fast as possible.

In that case, we can make use of multi-threading.

The multi-threading approach

For this, we run a separate thread for each function call. Since the code is now multi-threaded and we want to access the file contents outside the threads, we have to use one of the sync primitives that Rust provides.

Here is how it will affect the code: file.rs will stay the same, so that's one thing already taken care of. In main.rs, we need to initialize two RwLocks; these will be used later in the threads to store the file contents.

Then, we run an infinite loop where we try to read the content of these two variables. If a variable is not empty, then we know that the file processing (or reading) has completed. (This means that the files shouldn't be empty; otherwise our program will just keep waiting, mistakenly. An alternative is to use Option<String> and check whether the Option is None.)

If you want more background on RwLock and multi-threading, you can check my previous article: A brief introduction to Rust.

This code requires the lazy_static crate.

main.rs

use std::io;
use std::sync::RwLock;
use std::thread;

use lazy_static::lazy_static;

mod file;

// A sync primitive that lets threads share read/write access to a variable.
// We declare the shared variables here; this requires the lazy_static crate.
lazy_static! {
    static ref FILE1: RwLock<String> = RwLock::new(String::from(""));
    static ref FILE2: RwLock<String> = RwLock::new(String::from(""));
}

fn main() -> io::Result<()> {
    println!("program started");

    let thread_1 = thread::spawn(|| {
        let mut w1 = FILE1.write().unwrap();
        *w1 = file::read_file("src/file1.txt").unwrap();
        println!("read file 1");
    });

    println!("Launched Thread 1");

    let thread_2 = thread::spawn(|| {
        let mut w2 = FILE2.write().unwrap();
        *w2 = file::read_file("src/file2.txt").unwrap();
        println!("read file 2");
    });

    println!("Launched Thread 2");

    let mut rf1: bool = false;
    let mut rf2: bool = false;

    loop {
        let r1 = FILE1.read().unwrap();
        let r2 = FILE2.read().unwrap();

        if *r1 != String::from("") && rf1 == false {
            println!("completed file 1");
            rf1 = true;
        }

        if *r2 != String::from("") && rf2 == false {
            println!("completed file 2");
            rf2 = true;
        }
    }

    Ok(())
}

Interestingly, if we have a very large file1.txt, we get a strange output. The second file is processed first (read file 2); but inside our loop, the program seems to block and wait for the first file.

program started
Launched Thread 1
Launched Thread 2
read file 2
read file 1
completed file 1
completed file 2

Multi-threading can be a bit tricky because we have to account for locking operations, which can block. We used the read function to lock our variable for reading, and the documentation warns about this behavior.

Locks this rwlock with shared read access, blocking the current thread until it can be acquired.

Luckily, there is a try_read function which returns an Err if the lock can't be acquired.

Attempts to acquire this rwlock with shared read access.

If the access could not be granted at this time, then Err is returned. Otherwise, an RAII guard is returned which will release the shared access when it is dropped.
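
To see the difference in isolation, here is a minimal sketch that uses only the standard library. The peek helper is a made-up name, and the "busy" writer is simulated by holding a write guard on the same thread rather than by a real worker thread.

```rust
use std::sync::{RwLock, TryLockError};

/// Describes what a non-blocking read attempt observed.
fn peek(lock: &RwLock<String>) -> String {
    match lock.try_read() {
        Ok(guard) => format!("read: {}", *guard),
        // A writer holds the lock right now; try_read returned without blocking.
        Err(TryLockError::WouldBlock) => String::from("busy"),
        // A writer panicked while holding the lock.
        Err(TryLockError::Poisoned(_)) => String::from("poisoned"),
    }
}

fn main() {
    let lock = RwLock::new(String::from("file 1"));

    {
        // Simulate a thread that is still writing by keeping the write guard alive.
        let _writer = lock.write().unwrap();
        println!("{}", peek(&lock)); // prints "busy"
    }

    // The write guard was dropped at the end of the block above.
    println!("{}", peek(&lock)); // prints "read: file 1"
}
```

Calling read instead of try_read inside peek would wait for the writer to finish, which is exactly the stall seen in the first attempt.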

In this second attempt, we use try_read and ignore the Errs returned, since they most likely mean the lock is busy. This lets the program move on to the next variable and handle whichever one becomes ready first.

main.rs

use std::io;
use std::sync::RwLock;
use std::thread;

use lazy_static::lazy_static;

mod file;

lazy_static! {
    static ref FILE1: RwLock<String> = RwLock::new(String::from(""));
    static ref FILE2: RwLock<String> = RwLock::new(String::from(""));
}

fn main() -> io::Result<()> {
    println!("program started");

    let thread_1 = thread::spawn(|| {
        let mut w1 = FILE1.write().unwrap();
        *w1 = file::read_file("src/file1.txt").unwrap();
        println!("read file 1");
    });

    println!("Launched Thread 1");

    let thread_2 = thread::spawn(|| {
        let mut w2 = FILE2.write().unwrap();
        *w2 = file::read_file("src/file2.txt").unwrap();
        println!("read file 2");
    });

    println!("Launched Thread 2");

    let mut rf1: bool = false;
    let mut rf2: bool = false;

    loop {
        let r1 = FILE1.try_read();
        let r2 = FILE2.try_read();

        match r1 {
            Ok(v) => {
                if *v != String::from("") && rf1 == false {
                    println!("completed file 1");
                    rf1 = true;
                }
            }
            // If rwlock can't be acquired, ignore the error
            Err(_) => {}
        }

        match r2 {
            Ok(v) => {
                if *v != String::from("") && rf2 == false {
                    println!("completed file 2");
                    rf2 = true;
                }
            }
            // If rwlock can't be acquired, ignore the error
            Err(_) => {}
        }
    }

    Ok(())
}

Now the execution is different. If file1.txt is much bigger than file2.txt, then the second file should be processed first.

program started
Launched Thread 1
Launched Thread 2
read file 2
completed file 2
read file 1
completed file 1
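
The Option<String> alternative mentioned earlier can be sketched as follows. This is a standalone variant rather than a drop-in replacement for the code above: spawn_loader, is_ready, and wait_for_both are made-up helper names, the files are replaced by closures that return a String, and Arc is used instead of lazy_static. It also leaves the loop once both values are available.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Spawns a worker that stores its result in a shared slot.
/// `None` means "not finished yet", so an empty result is still detectable.
fn spawn_loader<F>(load: F) -> Arc<RwLock<Option<String>>>
where
    F: FnOnce() -> String + Send + 'static,
{
    let slot = Arc::new(RwLock::new(None));
    let writer = Arc::clone(&slot);
    thread::spawn(move || {
        // Do the slow work first, then hold the write lock only briefly.
        let contents = load();
        *writer.write().unwrap() = Some(contents);
    });
    slot
}

/// Non-blocking check: a busy lock is treated as "not ready yet".
fn is_ready(slot: &RwLock<Option<String>>) -> bool {
    slot.try_read().map(|guard| guard.is_some()).unwrap_or(false)
}

/// Waits until both slots are filled, then takes the values out.
fn wait_for_both(
    slot1: &RwLock<Option<String>>,
    slot2: &RwLock<Option<String>>,
) -> (String, String) {
    while !(is_ready(slot1) && is_ready(slot2)) {
        thread::yield_now();
    }
    let first = slot1.write().unwrap().take().unwrap();
    let second = slot2.write().unwrap().take().unwrap();
    (first, second)
}

fn main() {
    let file1 = spawn_loader(|| String::from("file 1\n"));
    // An empty "file" no longer stalls the loop, because Some("") is not None.
    let file2 = spawn_loader(String::new);

    let (contents1, contents2) = wait_for_both(&file1, &file2);
    println!("{:?} {:?}", contents1, contents2);
}
```

Doing the slow work before taking the write lock also keeps the lock free most of the time, so readers rarely find it busy.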

The limits of multi-threading

Why do we need Async if we already have multi-threading? There are two main advantages: performance and simplicity. Spawning threads is expensive; and as you can conclude from the above, writing multi-threaded code can get quite complicated.

This answer on Stack Overflow touches on these points.

Why is async considered better performing than multi-threading?

When your code CPU-bound, there will be a little difference between between Tasks + async/await and pure multi-threaded code. In IO bound code, multi threading is the worst throughput you can have. Tasks + async/await will blow away any IO-bound-threadpool you can write your own. threads don't scale. usually (especially on Server side) you have both. you read some data from a connection (IO), then continue processing it on the CPU (json parsing, calculations etc.) and write the result back to the connection (IO again). Tasks + async/await are faster in this case than a pure multi threaded code.

It's the simplicity which makes async/await so appealing. writing a synchronous code which is actually asynchronous. if this is not "high level programing", what is?

And that's where Rust Async comes in. Here is a quote from Aaron Turon in Zero-cost futures in Rust.

We’ve wanted something higher level, with better ergonomics, but also better composability, supporting an ecosystem of asynchronous abstractions that all work together. This story might sound familiar: it’s the same goal that’s led to the introduction of futures (aka promises) in many languages, with some supporting async/await sugar on top.

Async, the keywords

Rust's focus is to make writing Async code as simple as it can get. You only need to add the async/await keywords to make your code Async: async before the function declaration and await to resolve your async functions.

That sounds great. Let's give it a try.

file.rs

use std::fs::File;
use std::io::{self, Read};

pub async fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer)?;
    Ok(buffer)
}

main.rs

use std::io;

mod file;

fn main() -> io::Result<()> {
    let r1 = file::read_file("src/file1.txt");
    let r2 = file::read_file("src/file2.txt");

    let f1 = r1.await;
    let f2 = r2.await;

    dbg!(f1);
    dbg!(f2);

    Ok(())
}

This doesn't work, however. await is available only inside an async block or function. If we try to run this code, the compiler will throw this error.

error[E0728]: `await` is only allowed inside `async` functions and blocks
 --> src/main.rs:9:14
  |
5 | fn main() -> io::Result<()> {
  |    ---- this is not `async`
...
9 |     let f1 = r1.await;
  |              ^^^^^^^^ only allowed inside `async` functions and blocks

Can we make the main function async? Unfortunately, it's not that simple; we get another error.

error[E0277]: `main` has invalid return type `impl std::future::Future`
 --> src/main.rs:5:20
  |
5 | async fn main() -> io::Result<()> {
  |                    ^^^^^^^^^^^^^^ `main` can only return types that implement `std::process::Termination`
  |
  = help: consider using `()`, or a `Result`

The error message is a bit intriguing, though. It seems that the async keyword makes our function return a Future instead of the declared type.

Here is an excerpt from the async-await RFC.

The return type of an async function is a unique anonymous type generated by the compiler, similar to the type of a closure. You can think of this type as being like an enum, with one variant for every "yield point" of the function - the beginning of it, the await expressions, and every return. Each variant stores the state that is needed to be stored to resume control from that yield point.

The return type of an async function is a Future (precisely, an anonymous, compiler-generated type that implements the Future trait).

The anonymous return type implements Future, with the return type as its Item.
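
A small sketch can make this visible without any runtime. Everything here is an illustration under assumptions: answer, NoopWaker, poll_once, and the BODY_RAN flag are made-up names, and the future is polled by hand with a waker that does nothing, which is only good enough for a future that finishes on its first poll.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Set to true once the body of `answer` has actually executed.
static BODY_RAN: AtomicBool = AtomicBool::new(false);

/// Declared as returning u32, but calling it yields a Future whose Output is u32.
async fn answer() -> u32 {
    BODY_RAN.store(true, Ordering::SeqCst);
    42
}

/// A waker that does nothing; enough to poll a future by hand once.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future a single time, outside of any runtime.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    future.as_mut().poll(&mut cx)
}

fn main() {
    // Calling the async function only builds the future; the body has not run.
    let future = answer();
    assert!(!BODY_RAN.load(Ordering::SeqCst));

    // The first poll runs the body and, here, completes it.
    let result = poll_once(future);
    assert!(BODY_RAN.load(Ordering::SeqCst));
    assert_eq!(result, Poll::Ready(42));
    println!("{:?}", result);
}
```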

What about await? From the same RFC, which predates the final .await syntax and still uses an await! macro:

The await! builtin expands roughly to this:

let mut future = IntoFuture::into_future($expression);
let mut pin = unsafe { Pin::new_unchecked(&mut future) };
loop {
    match Future::poll(Pin::borrow(&mut pin), &mut ctx) {
          Poll::Ready(item) => break item,
          Poll::Pending     => yield,
    }
}

This is not a literal expansion, because the yield concept cannot be expressed in the surface syntax within async functions. This is why await! is a compiler builtin instead of an actual macro.

So roughly speaking: async makes our function return a Future, and await polls that future until it completes. However, there is another missing piece of the puzzle: Rust doesn't resolve the future on its own. We need an executor that is going to run this Async code.

What is an executor?

If you rewind to our multi-threading example, you'll notice that we used a loop in order to detect when our files were processed. It was simple: Loop infinitely until there is some content in the variable and then do something. We could have improved on that by breaking out of the loop if both files are read.

An Async executor is that loop. Rust, by default, doesn't have any built-in executor. There are a number of Async run-times, with async-std and Tokio being the most popular ones. The job of a run-time is to poll your async functions (Futures) until they finally return a value.

This discussion on Reddit has more details, but you can ignore the specifics for now.

Creating/Composing a future object (for example, by calling an async function which calls another async function) does not involve any progress. Only when you pass it to something that actively polls it, the future object starts its work and makes progress (poll by poll). At that point, we refer to the whole future thingy as a task. A suspension point simply makes a call to the future's poll method return immediately with a Poll::Pending value if the value is not ready yet.
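
To make "the executor is the loop" concrete, here is a minimal sketch of an executor for a single future, written with the standard library only. It is not how the futures crate, async-std, or Tokio implement theirs; ThreadWaker and this block_on are illustrative names, and real run-times add scheduling for many tasks, IO integration, and more.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A waker that unparks the thread running the executor loop.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls one future on the current thread until it returns a value.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Not ready: sleep until the future's waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    // An async block is a future too; block_on drives it to completion.
    let value = block_on(async { 1 + 2 });
    println!("value = {}", value);
}
```

Unlike the loop in the multi-threading example, this one does not spin: between polls the thread is parked until the future signals that it can make progress.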

A simple executor

The futures crate has a very basic executor, and a function to join two Futures. Let's give it a shot.

The following code uses the futures crate version 0.3.4

main.rs

use futures::executor::block_on;
use futures::join;
use std::io;

mod file;

fn main() -> io::Result<()> {

    println!("Program started");

    // Block on the final future
    block_on(load_files());

    Ok(())
}

async fn load_files() {
    // Join the two futures together
    join!(load_file_1(), load_file_2());
}

async fn load_file_1() {
    let r1 = file::read_file("src/file1.txt").await;
    println!("file 1 size: {}", r1.unwrap().len());
}

async fn load_file_2() {
    let r2 = file::read_file("src/file2.txt").await;
    println!("file 2 size: {}", r2.unwrap().len());
}

To verify the asynchronicity, I dumped a bunch of data in file1.txt. Rust is pretty fast; at the time of testing this, file1.txt had over 5 GB of text.

Program started
file 1 size: 5430447414
file 2 size: 7

Unfortunately, this looks like (and it is) blocking on the first file function, again.

So what is Async anyway?

Similar to multi-threading, there are some gotchas and things to worry about in Async programming. The truth is, the async keyword doesn't magically make your code async; it just makes your function return a Future. You still have to do the heavy lifting of scheduling your code execution.

This means that your function has to report quickly that it's not ready instead of being stuck doing work. In our case, the clogging is happening specifically at File::open and file.read_to_string. These two functions are not async and therefore block execution.
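
One way to see this is to count how many times a future has to be polled. In the sketch below, all names (YieldOnce, no_yield_points, with_yield_point, count_polls, NoopWaker) are made up for illustration, and count_polls is a deliberately naive loop that polls again right away rather than a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A hand-made yield point: reports Pending once, then Ready.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, then give control back to the caller.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Async in name only: plain synchronous work with nothing to await.
async fn no_yield_points() -> u64 {
    (1..=1_000u64).sum::<u64>()
}

/// Same result, but control is handed back once in the middle.
async fn with_yield_point() -> u64 {
    let first = (1..=500u64).sum::<u64>();
    YieldOnce { yielded: false }.await;
    let second = (501..=1_000u64).sum::<u64>();
    first + second
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future to completion and reports how many polls it took.
fn count_polls<F: Future>(future: F) -> (usize, F::Output) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return (polls, value);
        }
    }
}

fn main() {
    // One poll: the whole body ran without ever giving control back.
    println!("{:?}", count_polls(no_yield_points()));
    // Two polls: an executor could have run another task in between.
    println!("{:?}", count_polls(with_yield_point()));
}
```

Our read_file behaves like no_yield_points: once polled, it runs to the end, so join! has no opportunity to switch to the second file.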

We need an Async version of these two functions. Luckily, some handsome people over at async-std did the hard work of writing an async version of the std library.

This crate provides an async version of std. It provides all the interfaces you are used to, but in an async version and ready for Rust's async/await syntax.

File IO with async-std

The main change is to replace our std imports with async_std. Because File::open and read_to_string now return Futures, we also have to .await them.

For the following example, we are using the async-std crate version 1.5.0

file.rs

// We use async_std instead of std, it's that simple.
use async_std::io;
use async_std::fs::File;
use async_std::prelude::*;

pub async fn read_file(path: &str) -> io::Result<String> {
    let mut file = File::open(path).await?;
    let mut buffer = String::new();
    file.read_to_string(&mut buffer).await?;
    Ok(buffer)
}

The code in main.rs remains the same; the program is still using the block_on executor from the futures crate.

Compile and run the program. (make sure to have a fat file1.txt)

Program started
file 2 size: 7
file 1 size: 5430447414

Finally! file2.txt gets processed first and fast here, then the program moves to file1.txt.

Manually implementing Async with Futures

Let's recap what we learned so far:

  • async makes our function return a Future.
  • Running our Future requires a run-time.
  • The run-time checks if our Future is ready; and when ready returns its value.

One more thing to add: Rust futures are lazy. The executor doesn't constantly poll the Future but instead waits for the Future to signal that it is due for a poll.
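
That signal is the Waker passed to poll. As a minimal sketch, assuming a made-up Sleep future whose completion comes from a background thread, the code below polls by hand: once to start it, and once more only after the waker has been called. Sleep, Shared, FlagWaker, and drive are illustrative names, and the short sleep in the waiting loop stands in for what a real executor would do by parking the thread.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and the thread that completes it.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// A future that becomes ready after a background thread has slept.
struct Sleep {
    duration: Duration,
    shared: Option<Arc<Mutex<Shared>>>, // None until the first poll
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();

        if let Some(shared) = &this.shared {
            // Later polls: check whether the timer thread has finished.
            let mut state = shared.lock().unwrap();
            if state.done {
                return Poll::Ready(());
            }
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        // First poll: hand the waker to a timer thread and report "not ready".
        let shared = Arc::new(Mutex::new(Shared {
            done: false,
            waker: Some(cx.waker().clone()),
        }));
        let for_thread = Arc::clone(&shared);
        let duration = this.duration;
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = for_thread.lock().unwrap();
            state.done = true;
            // Signal that the future is due for another poll.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        this.shared = Some(shared);
        Poll::Pending
    }
}

/// A waker that only records that it was called.
struct FlagWaker(AtomicBool);

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls once, waits for the wake signal, then polls again.
/// Returns (first poll was Pending, second poll was Ready).
fn drive(duration: Duration) -> (bool, bool) {
    let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);
    let mut sleep = Box::pin(Sleep { duration, shared: None });

    let first_pending = sleep.as_mut().poll(&mut cx).is_pending();
    // Do not poll the future again until it has asked for it.
    while !flag.0.load(Ordering::SeqCst) {
        thread::sleep(Duration::from_millis(1));
    }
    let second_ready = sleep.as_mut().poll(&mut cx).is_ready();
    (first_pending, second_ready)
}

fn main() {
    let (first_pending, second_ready) = drive(Duration::from_millis(20));
    println!("first poll pending: {}, second poll ready: {}", first_pending, second_ready);
}
```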

The future we will design is rather useless, but it demonstrates how progress is made during execution. The future object holds some state (an i32 field named count). At each poll, we increment the count by one and then schedule another poll.

To demonstrate task switching, we'll create two different Future types. Their functionality is identical, though.

main.rs

use futures::executor::block_on;
use futures::join;

mod asyncop;

use asyncop::{Task1, Task2};

fn main() {
    // Wait for the joined future completion
    block_on(asyncop());
}

async fn asyncop() {
    // Join the two futures
    join!(fut1(), fut2());
}

// Future 1
fn fut1() -> Task1 {
    Task1 { count: 0 }
}

// Future 2
fn fut2() -> Task2 {
    Task2 { count: 0 }
}

asyncop.rs

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
pub struct Task1 {
    pub count: i32,
}

impl Future for Task1 {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Once the counter exceeds 100, our future is ready
        if self.count > 100 {
            Poll::Ready(())
        } else {
            println!("Task 1 Progress - {}", self.count);

            // Increment counter
            self.get_mut().count += 1;
            // Schedule future for another poll()
            cx.waker().wake_by_ref();

            Poll::Pending
        }
    }
}

#[derive(Debug)]
pub struct Task2 {
    pub count: i32,
}

impl Future for Task2 {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Once the counter exceeds 100, our future is ready
        if self.count > 100 {
            Poll::Ready(())
        } else {
            println!("Task 2 Progress - {}", self.count);

            // Increment counter
            self.get_mut().count += 1;
            // Schedule future for another poll()
            cx.waker().wake_by_ref();

            Poll::Pending
        }
    }
}

The output on execution demonstrates how the executor is switching between Future 1 and Future 2.

...
Task 1 Progress - 95
Task 2 Progress - 95
Task 1 Progress - 96
Task 2 Progress - 96
Task 1 Progress - 97
Task 2 Progress - 97
Task 1 Progress - 98
Task 2 Progress - 98
Task 1 Progress - 99
Task 2 Progress - 99
Task 1 Progress - 100
Task 2 Progress - 100

Wrap-up

In this post we covered synchronous code, multi-threaded code, some Async terminology in Rust, the async-std library and a simple Future implementation. This is, really, a lightweight introduction and many of the specifics were omitted for brevity.

I've compiled a list of resources below if you want to explore more.

Further resources on Async Rust