I've been working on a new side project recently called Rivelin. It's a cloud deployment service inspired by Binder, Railway and Qovery. These deploy and run application code on the cloud, while abstracting away the complexity of managing container images and infrastructure. I want to build something that takes inspiration from these, but with a few twists and a focus on data science and machine learning projects.

I'm planning to write a couple of blog posts about some of the components of Rivelin. For the moment, I've decided not to open-source the codebase. Instead, I've moved some of the components into their own repo at rivelin_extra.

Rivelin is built in Rust using axum and htmx. It receives HTTP requests and uses background services for long-running tasks.

These posts are going to focus on how Rivelin implements background services using a custom actor framework. There are a lot of Rust actor frameworks out there, but part of the goal of Rivelin is my learning, so we're going to build our own.

We'll cover:

  1. Building an actor framework and using it to implement a background task manager.
  2. Building a pub-sub event bus.

This post is 1 of 2.

If you want to find out more about Rivelin or would like a demo, drop me a message on LinkedIn.

Rivelin's Architecture

Before we dive into some code, let's take a quick look at a subsection of Rivelin's architecture to give some context for what we want to achieve (see diagram below).

On the left we have three HTTP handlers. The first triggers the build of an OCI image, while the second deletes or cancels a build. The last handler streams events back to the client (using server-sent events) which provides information about the progress of the build. In reality there are quite a few other handlers involved here (particularly because we're using htmx to update parts of the UI), but these illustrate the most important parts of the system.

On the right we have background services, which in this case are all actors. These are:

  • The Build Task Manager. Responsible for managing container builds, including starting and stopping them.
  • The EventBus. An event bus receives events from producers and delivers them to consumers.
  • The Database sink. This listens for events and persists them to storage.

Solid lines show messages being sent to an actor, while dashed lines show responses to the messages.

In this post we're only going to implement the Build Task Manager actor.

Why Actors

The Actor model is a mathematical model of concurrent computation which treats actors as the base unit of computation. Actors are entities that:

  • Only communicate by sending asynchronous messages.
  • Only have local state.
  • Can create new actors, send messages to other actors, and respond to messages.

The combination of communicating only via well-defined messages and only having local state makes actors naturally modular and loosely coupled. These characteristics can help give our code structure and reduce its complexity, making it easier to understand, test, and change.

We can exchange one actor for another, as long as they support the same message types, and can make changes to the internal workings of the actor without needing to alter other components in the system.

Actors are designed for concurrent computation, although how they achieve concurrency is an implementation detail. Actors could be running on distributed machines, on a single machine with multiple threads, or even in a single-threaded event loop. This is great for Rivelin because we want to implement background services that run concurrently alongside our HTTP handlers. In Rivelin's case we are using tokio to run our actors on a multi-threaded, work-stealing async runtime.

Our first actor - Hello World!

To build an actor framework in Rust we're going to use two language features that are often considered advanced - async/await and traits. If you don't have a basic understanding of these you might find the following resources helpful:

It's worth flagging this fantastic blog post on Actors with tokio by Alice Ryhl, which was the inspiration for my implementation.

Please note: Rivelin's actor framework is not production ready, is feature poor and has not been thoroughly tested.

  • Update: I came across ractor while scouring Reddit; it has a similar API to the one I built for Rivelin. I've only glanced at it, but it has a lot of cool features, including distributed actors. It could be worth looking into.

We're going to start by implementing an actor, HelloWorldActor, before diving into how the actor framework is implemented.

The HelloWorldActor can receive a single kind of message, which carries a String. When we send it a message, it sends back a response (also a String). The actor holds some internal state (a u8) that keeps track of the number of messages it has received.

Let's look at the HelloWorldActor in action (it's implemented as a test in rivelin_extra if you want to run the code):

let actor = HelloWorldActor; // Instantiate the actor.
let state = 0; // Instantiate the actor's state.

// Spawn the actor on tokio's multi-threaded runtime.
let (addr, _handle): (HelloWorldAddr, _) = Actor::spawn(actor, state);

// Send a message to the actor via its address and await its response.
let response = addr.message("World").await?;
assert_eq!(response, "Hello World! I have received 1 messages");

We spawn an actor on the Tokio runtime by providing an instance of the actor and its initial state to the Actor::spawn associated function. This returns an address (addr) for the actor and a handle. The addr can be cheaply cloned, so we can send messages to the actor from multiple places in our code. The handle allows us to stop the actor, either by asking it to shut down gracefully or by aborting it (see abort).

The actor's addr is a custom type called HelloWorldAddr. This has a single method, message, which provides a convenient way to send messages to the actor and await the response.

Implementing HelloWorldActor

Let's see how this actor is implemented using our actor framework. We'll start by defining the message type that the actor will receive:

#[derive(Debug)]
struct HelloMessage {
    content: String,
    channel: tokio::sync::oneshot::Sender<String>,
}

This struct has two fields: the first holds the content we want to send to the actor, and the second holds the sending end of a oneshot channel, which the HelloWorldActor will use to send a response back to the caller (who keeps the receiving end).

We can now implement our actor. We do this by defining a HelloWorldActor struct and then implementing the Actor trait for it:

struct HelloWorldActor;

impl Actor for HelloWorldActor {
    type Message = HelloMessage;
    type State = u8;

    async fn handle(&self, message: Self::Message, n_messages: &mut Self::State) {
        *n_messages += 1; // Increment count of messages received.

        // Prepare a response
        let response = format!(
            "Hello {}! I have received {} messages",
            message.content, n_messages
        );

        // Send the response back to the caller using the oneshot channel provided in the message.
        message
            .channel
            .send(response)
            .expect("Channel must be open");
    }
}

The Actor trait has two associated types. The first sets the type of Message the actor can receive, while the second sets the type of the actor's internal State (here, a count of the number of messages received).

Next we implement the trait's handle method. This is called when the actor receives a message. Here it simply increments the message counter by one, creates a response and sends it back to the caller using the oneshot channel provided in the message.

The Actor trait defines other methods but these all have default implementations so we don't need to worry about them here. We'll use these in the next example.
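For comparison, here is a std-only sketch of the same HelloWorld actor, using an OS thread and std::sync::mpsc channels in place of a Tokio task and a oneshot channel. It is not the framework from the post (spawn_hello_actor and ask_hello are hypothetical names); it just shows the three ingredients working together: a message that carries its own reply channel, state owned by a single loop, and a handler that updates the state and replies.

```rust
use std::sync::mpsc;
use std::thread;

// Mirrors HelloMessage: the payload plus a channel for the reply.
struct HelloMessage {
    content: String,
    reply: mpsc::Sender<String>,
}

// Spawns the actor on its own OS thread (a stand-in for a Tokio task).
fn spawn_hello_actor() -> (mpsc::Sender<HelloMessage>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<HelloMessage>();
    let handle = thread::spawn(move || {
        // Local state: only this loop can see or change it.
        let mut n_messages: u8 = 0;
        // recv() returns Err once every Sender has been dropped, ending the loop.
        while let Ok(message) = rx.recv() {
            n_messages += 1;
            let response = format!(
                "Hello {}! I have received {} messages",
                message.content, n_messages
            );
            // Ignore the error if the caller stopped waiting for the reply.
            let _ = message.reply.send(response);
        }
    });
    (tx, handle)
}

// The caller's side: create a reply channel, send the message, wait for the answer.
fn ask_hello(addr: &mpsc::Sender<HelloMessage>, content: &str) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    addr.send(HelloMessage {
        content: content.to_string(),
        reply: reply_tx,
    })
    .ok()?;
    reply_rx.recv().ok()
}
```

Because only the actor's thread ever touches n_messages, no Mutex is needed; callers can only influence the count by sending messages.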

This is all we need to implement our first actor! However, constructing messages to send to the actor is somewhat verbose:

// Spawn the actor
let (addr, _handle): (Addr<HelloWorldActor>, _) = Actor::spawn(HelloWorldActor, 0);

// Create a message
let (tx, rx) = oneshot::channel();
addr.send(HelloMessage {
    content: "Alice".to_string(),
    channel: tx,
}).await?;

// Listen for a response on the oneshot channel receiver
let response = rx.await?;

To create the API we saw in our first example we can create a wrapper type around the raw address (Addr<HelloWorldActor>):

// Wrapper around `Addr<HelloWorldActor>`
struct HelloWorldAddr(Addr<HelloWorldActor>);

impl HelloWorldAddr {
    async fn message(&self, message: impl Into<String>) -> anyhow::Result<String> {
        let (tx, rx) = oneshot::channel();

        self.0
            .send(HelloMessage {
                content: message.into(),
                channel: tx,
            })
            .await?;

        Ok(rx.await?)
    }
}

impl From<Addr<HelloWorldActor>> for HelloWorldAddr {
    fn from(addr: Addr<HelloWorldActor>) -> Self {
        Self(addr)
    }
}

and now the caller can use:

let response = addr.message("World").await?;

This abstracts away the oneshot channel, which could make it easier to change the actor's implementation in the future (for example, if we wanted to use a different channel implementation or even communicate with actors over the network). It also means our users don't need to understand channels to use our API.

The implementation of the From trait for our HelloWorldAddr allows the Actor::spawn function to return the wrapper type instead of Addr<HelloWorldActor>.
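To see how a From bound lets spawn hand back whichever address type the caller asks for, here is a std-only sketch that swaps Tokio tasks for OS threads. The post doesn't show Addr's definition, so the Addr below (a thin wrapper around a channel Sender) is an assumption, as are the Counter and CounterAddr names.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical, simplified Actor trait (synchronous handle, threads instead of tasks).
trait Actor: Sized + Send + 'static {
    type Message: Send + 'static;
    type State: Send + 'static;

    fn handle(&self, message: Self::Message, state: &mut Self::State);

    // As in the post, the caller chooses the returned address type K.
    fn spawn<K: From<Addr<Self>>>(actor: Self, mut state: Self::State) -> (K, thread::JoinHandle<()>) {
        let (sender, receiver) = mpsc::channel::<Self::Message>();
        let handle = thread::spawn(move || {
            while let Ok(message) = receiver.recv() {
                actor.handle(message, &mut state);
            }
        });
        let addr = Addr::<Self> { sender };
        (addr.into(), handle)
    }
}

// Hypothetical raw address: just the sending half of the actor's inbox.
struct Addr<A: Actor> {
    sender: mpsc::Sender<A::Message>,
}

// Hand-written Clone: #[derive(Clone)] would add an unwanted `A: Clone` bound.
impl<A: Actor> Clone for Addr<A> {
    fn clone(&self) -> Self {
        Addr { sender: self.sender.clone() }
    }
}

impl<A: Actor> Addr<A> {
    fn send(&self, message: A::Message) -> Result<(), mpsc::SendError<A::Message>> {
        self.sender.send(message)
    }
}

// Example actor: each message is a reply channel; the state is a counter.
struct Counter;

impl Actor for Counter {
    type Message = mpsc::Sender<u32>;
    type State = u32;

    fn handle(&self, reply: Self::Message, count: &mut Self::State) {
        *count += 1;
        let _ = reply.send(*count);
    }
}

// Wrapper address, in the style of HelloWorldAddr.
struct CounterAddr(Addr<Counter>);

impl From<Addr<Counter>> for CounterAddr {
    fn from(addr: Addr<Counter>) -> Self {
        CounterAddr(addr)
    }
}

impl CounterAddr {
    fn increment(&self) -> Option<u32> {
        let (tx, rx) = mpsc::channel();
        self.0.send(tx).ok()?;
        rx.recv().ok()
    }
}
```

With `let (addr, handle): (CounterAddr, _) = Actor::spawn(Counter, 0);` the type annotation picks K = CounterAddr, and the compiler uses the From impl to convert the raw address.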

I don't love that we need to implement a wrapper type for each actor, just to get a response back. This seems like a common task so I might rethink this soon.
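One possible way to avoid a wrapper per actor, sketched here with std threads and channels rather than Tokio, is a generic ask helper on the address that creates the reply channel itself and lets the caller build the message around it with a closure. Addr, ask and Greet are hypothetical names for this sketch, not part of Rivelin's framework.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical generic address over any message type M.
struct Addr<M> {
    sender: Sender<M>,
}

impl<M> Addr<M> {
    // Creates the reply channel, lets `make_msg` wrap the reply Sender into a
    // message, sends it, and blocks until the actor answers.
    fn ask<R>(&self, make_msg: impl FnOnce(Sender<R>) -> M) -> Result<R, String> {
        let (reply_tx, reply_rx) = mpsc::channel::<R>();
        self.sender
            .send(make_msg(reply_tx))
            .map_err(|_| "actor has stopped".to_string())?;
        reply_rx
            .recv()
            .map_err(|_| "actor dropped the reply channel".to_string())
    }
}

// Example message: the reply channel is a field, just like in HelloMessage.
struct Greet {
    name: String,
    reply: Sender<String>,
}

fn spawn_greeter() -> Addr<Greet> {
    let (tx, rx) = mpsc::channel::<Greet>();
    thread::spawn(move || {
        while let Ok(Greet { name, reply }) = rx.recv() {
            let _ = reply.send(format!("Hello {name}!"));
        }
    });
    Addr { sender: tx }
}
```

The caller writes `addr.ask(|reply| Greet { name: "World".into(), reply })` and never touches the channel directly. The message type still needs a reply field, so this trims boilerplate rather than removing it.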

The Actor trait

Now we'll look at how our actor framework is implemented. At its core is a single Actor trait. This trait is a little complex, but we'll break it down to understand what is going on:

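use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
// `Addr<A>` is the framework's address type; its definition isn't shown here.

pub trait Actor
where
    Self: 'static + Sized + Send,
{
    type Message: Send;
    type State: Send;

    fn handle(
        &self,
        message: Self::Message,
        state: &mut Self::State,
    ) -> impl std::future::Future<Output = ()> + Send;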

    fn run(
        self,
        mut message_stream: impl Stream<Item = Self::Message> + Send + 'static + std::marker::Unpin,
        mut state: Self::State,
    ) -> impl std::future::Future<Output = ()> + Send {
        async move {
            while let Some(message) = message_stream.next().await {
                self.handle(message, &mut state).await;
            }
        }
    }

    fn spawn<K>(actor: Self, state: Self::State) -> (K, tokio::task::JoinHandle<()>)
    where
        K: From<Addr<Self>>,
    {
        let (sender, receiver) = mpsc::channel::<Self::Message>(1000);

        let handle = tokio::spawn(async move {
            let mut state = state;
            actor.on_start(&mut state).await;
            actor.run(ReceiverStream::new(receiver), state).await;
        });

        let addr = Addr::<Self>::new(sender);

        (addr.into(), handle)
    }

    fn on_start(&self, _state: &mut Self::State) -> impl std::future::Future<Output = ()> + Send {
        async {}
    }

    fn on_stop(self, _state: &mut Self::State) -> impl std::future::Future<Output = ()> + Send {
        async {}
    }
}

First, let's take a look at the where clause in the trait definition:

pub trait Actor
where
    Self: 'static + Sized + Send,
{

These are trait bounds, which "stipulate what functionality a type implements". In this case, the Actor trait requires that any type that implements it must be 'static and implement the Sized and Send traits.

Here, a 'static bound means that the type cannot contain any non-static references (i.e. we can hold onto a value of the type for as long as we like, because any references it holds are guaranteed to stay valid). For example, the following type contains a non-static reference, so attempting to implement the Actor trait for it results in a compilation error:

struct MyActor<'a> {
    data: &'a str,
}

This is required because actors are moved into a Tokio task (essentially an asynchronous green thread), and tokio::spawn only accepts futures that are 'static. Without the bound, the actor could outlive the data pointed to by a reference it holds, which would be a use-after-free bug; with it, the compiler rejects such actors up front.
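The same 'static requirement appears on std::thread::spawn, so a std-only example shows the difference without needing Tokio: moving owned data into the closure works, and sharing data between workers goes through an Arc rather than a borrow. The function names are illustrative only.

```rust
use std::sync::Arc;
use std::thread;

// `name` is owned, so the closure borrows nothing and satisfies 'static.
fn spawn_owned_worker(name: String) -> thread::JoinHandle<usize> {
    thread::spawn(move || name.len())
}

// Shared data: each worker gets its own Arc clone, so the data lives as long
// as any worker still needs it.
fn spawn_shared_workers(shared: Arc<str>, n: usize) -> Vec<thread::JoinHandle<usize>> {
    (0..n)
        .map(|_| {
            let data = Arc::clone(&shared);
            thread::spawn(move || data.len())
        })
        .collect()
}

// This would NOT compile: the closure captures a borrowed `&str` that may not
// live as long as the thread (the same problem as MyActor<'a>).
// fn spawn_borrowed(text: &str) -> thread::JoinHandle<usize> {
//     thread::spawn(move || text.len())
// }
```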

Types implementing Actor must be Sized, meaning they have a known size at compile time. This is required because methods such as spawn and run take the actor by value, and Rust needs to know how much memory to set aside for a value it moves around (for example, on the stack).

Finally it must implement the Send trait. Send is a marker trait that means the type is safe to send between threads. This is required because Tokio uses a multi-threaded runtime, and the actor could be moved between threads.

The Send trait bound is required because we're using the Tokio multithreaded runtime, not because we are using async rust. If we used a single threaded runtime we could relax this requirement.

The next part defines two associated types for the trait:

type Message: Send;
type State: Send;

These are the Message and State types that we saw in the HelloWorldActor example, and both have a Send bound. State needs it because, like the actor itself, it is moved into the spawned Tokio task. Message needs it because messages are sent from the actor's address to the actor, which may be running on a different thread.
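A compile-time helper makes the Send requirement on State easy to check. In this std-only sketch (the state types are hypothetical), a state holding an Arc is Send and can be moved to another thread, while one holding an Rc is not, because Rc updates its reference count non-atomically.

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

// Compiles only if T is Send; does nothing at runtime.
fn assert_send<T: Send>() {}

// Hypothetical actor state built on Arc: Send.
struct SharedConfigState {
    config: Arc<String>,
}

// Hypothetical actor state built on Rc: NOT Send.
#[allow(dead_code)]
struct LocalConfigState {
    config: Rc<String>,
}

// Moving the state into another thread is allowed only because it is Send.
fn run_in_thread(state: SharedConfigState) -> usize {
    thread::spawn(move || state.config.len()).join().unwrap()
}

// Uncommenting this line fails to compile:
// `Rc<String>` cannot be sent between threads safely.
// fn check_local() { assert_send::<LocalConfigState>(); }
```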

The most important method is handle:

fn handle(
    &self,
    message: Self::Message,
    state: &mut Self::State,
) -> impl std::future::Future<Output = ()> + Send;

You will remember from our HelloWorldActor example that we implemented this method using async/await syntax:

async fn handle(&self, message: Self::Message, n_messages: &mut Self::State) {
}

This async fn is roughly syntactic sugar for:

fn handle(
    &self,
    message: Self::Message,
    state: &mut Self::State,
) -> impl std::future::Future<Output = ()> {
    async move {
        // ...the body of the async fn...
    }
}

This is quite new in Rust: return-position impl Trait in traits (along with async fn in traits) has only been stable since Rust 1.75. The additional Send bound is required because Tokio's multi-threaded runtime is work-stealing: whenever the future is suspended at an await point, it may be resumed on a different worker thread.

  • It's recommended that Rust libraries provide two versions of traits that return futures: one with a Send bound and one without. As Rivelin's actor framework will only work with Tokio, I have only implemented a version with Send bounds.
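The sketch below declares a trait method the same way Actor::handle is declared, implements it with async fn, and checks that the resulting future is Send. To stay runnable without Tokio, it drives the future with a tiny thread-parking executor adapted from the std::task::Wake documentation. Greeter, Polite and block_on are illustrative names, and the code needs Rust 1.75 or later.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Declared like Actor::handle: return-position impl Trait with a Send bound.
trait Greeter {
    fn greet(&self, name: String) -> impl Future<Output = String> + Send;
}

struct Polite;

impl Greeter for Polite {
    // `async fn` satisfies the declaration as long as its future is Send
    // (here it only holds a String and a &Polite).
    async fn greet(&self, name: String) -> String {
        format!("Hello {name}!")
    }
}

// Compile-time check that a value's type is Send.
fn assert_send<T: Send>(_: &T) {}

// Minimal executor: wakes by unparking the thread that is polling.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = std::pin::pin!(fut);
    let waker: Waker = Arc::new(ThreadWaker(thread::current())).into();
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until a waker unparks us, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

If greet held something like an Rc across an .await, the impl would fail to compile, because the trait promises a Send future.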

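The last method to look at in detail is run. The version in the trait listing above was simplified; the full default implementation also takes a CancellationToken: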

fn run(
    self,
    mut message_stream: impl Stream<Item = Self::Message> + Send + 'static + std::marker::Unpin,
    mut state: Self::State,
    cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = ()> + Send {
    async move {
        loop {
            tokio::select! {
                _ = cancellation_token.cancelled() => {
                    self.on_stop(&mut state).await;
                    break;
                },
                message = message_stream.next() => {
                    match message {
                        Some(message) => self.handle(message, &mut state).await,
                        None => {
                            self.on_stop(&mut state).await;
                            break
                        },
                    };
                }
            }
        }
    }
}

This method is the actor's event loop, and it starts when the actor is spawned (right after on_start). It has a default implementation, so actors generally do not need to implement it.

It uses the tokio::select! macro to wait on multiple branches concurrently; whichever branch completes first has its handler run, and the loop then goes round again. The second branch is the key one: it takes the next message from the stream and passes it to the actor's .handle method, and if the stream has ended (because every address has been dropped) it calls on_stop and exits. The first branch listens for a cancellation signal, which can be used to gracefully shut down the actor.

The CancellationToken used in the first branch is a type that can be used to cancel the actor. In the default .run implementation, when the cancellation token is cancelled the actor will call its on_stop method and then break from the loop (shutting the actor down). You can implement any shutdown logic you like in the on_stop method:

fn on_stop(self, _state: &mut Self::State) -> impl std::future::Future<Output = ()> + Send {
    async {}
}

You'll notice this takes self as an argument, which means this function takes ownership of self. This has the useful effect of ensuring we get a compilation error if we don't immediately break from the loop after calling on_stop. This ensures that if .run is overridden, the event loop will still exit after on_stop is called.
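Here is a std-only illustration of why an on_stop that takes self keeps the loop honest. Worker and run are hypothetical stand-ins for an actor and its event loop, and a None in the inbox plays the role of the cancellation signal.

```rust
// Hypothetical stand-in for an actor whose on_stop consumes it.
struct Worker {
    processed: Vec<u32>,
}

impl Worker {
    fn handle(&mut self, msg: u32) {
        self.processed.push(msg);
    }

    // Takes `self` by value: the worker can't be used after this call.
    fn on_stop(self) -> Vec<u32> {
        self.processed
    }
}

fn run(mut worker: Worker, inbox: Vec<Option<u32>>) -> Vec<u32> {
    let mut stopped_with = Vec::new();
    for item in inbox {
        match item {
            Some(msg) => worker.handle(msg),
            None => {
                // `worker` is moved here. This compiles only because `break`
                // follows immediately; without it, rustc reports E0382 because
                // `worker` was moved in a previous iteration of the loop.
                stopped_with = worker.on_stop();
                break;
            }
        }
    }
    stopped_with
}
```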

message_stream type (advanced)

  • You'll notice the message_stream argument has a complex type. It is a type which implements the Stream trait, which is the async equivalent of the Iterator trait.
  • We've seen the Send and 'static bounds before. The last bound std::marker::Unpin is another marker trait. Rust has a concept called Pinning, which prevents a value from being moved in memory (which is required if moving the value would invalidate references that it holds). Pinning is an advanced topic and notoriously tricky to understand 🤯. For a great deep dive into Pin, you could read this post from without.boats.

We've come a long way. We've built our first actor, looked at its implementation and got to grips with the Actor trait. Now we'll try something more complex and implement our first background service, which we could use in a web application like Rivelin.

Implementing a "Build Task Manager" Actor

We'll create the "Builder Actor" from the architecture diagram above. We won't build exactly the actor used in Rivelin, but we'll create one that follows the same pattern. Let's remind ourselves how this fits into Rivelin's architecture:

We have two HTTP handlers that interact with a background service. That service is an actor called the BuildTaskManager. In Rivelin this component builds OCI images, but for our purposes we can assume it's doing some arbitrary work that takes longer than we'd want to wait before our HTTP handler responds to the client. This work happens inside a tokio::task (an asynchronous green thread).

If you want to run the code, the actor is implemented in a test here.

BuildTaskManager Message type

Our BuildTaskManager will receive two kinds of messages:

  • A message telling the actor to start a "build". This should respond with an id for the build, which can be used to cancel the build if necessary.
  • A message telling the actor to cancel a "build". This should respond with whether the build was cancelled.

We can implement that as an enum in Rust, with the variants representing the different message types:

#[derive(Debug)]
pub enum Message {
    Build {
        resp_chan: oneshot::Sender<tokio::task::Id>,
    },
    Cancel {
        build_id: tokio::task::Id,
        resp_chan: oneshot::Sender<bool>,
    },
}

The Build variant has a oneshot::Sender which will be used to send a unique id for the build back to the caller. The Cancel variant uses that same id to cancel a build task, and a oneshot::Sender which will be used to send back whether the build was cancelled.

This uses tokio::task::Id as the build id. Initially I used uuid::Uuid as the build id, but swapped it out to solve a problem with tracking builds that had panicked. Be aware that tokio::task::Id requires tokio_unstable.

BuildTaskManager State

Next we define the State type for our actor:

pub struct TaskManagerState {
    abort_handles: HashMap<tokio::task::Id, AbortHandle>,
    tasks: JoinSet<()>,
}

This has two interesting fields. The tasks field is a JoinSet, a collection of spawned tasks. Its join_next and join_next_with_id methods wait for the next task in the set to complete. Our actor works by spawning tasks, adding them to the JoinSet and then waiting for any of them to finish.

The abort_handles field is a HashMap that maps each build's tokio::task::Id to its AbortHandle, which can be used to abort the task. We store the AbortHandle so that when the actor receives a Message::Cancel it can look up the handle and abort the task.

An aborted task will stop executing at its next await point. An alternative would be to use CancellationTokens to implement graceful shutdowns.

BuildTaskManager handle method

Let's look at the implementation of the Actor::handle method:

async fn handle(&self, message: Self::Message, state: &mut Self::State) {
    match message {
        Message::Build { resp_chan } => {
            let handle = state.tasks.spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            });

            let task_id = handle.id();
            println!("Building: {}", task_id);
            // Store the abort handle in abort_handles so the build can be cancelled later
            state.abort_handles.insert(task_id, handle);
            // Respond with the task id to allow cancellation
            resp_chan.send(task_id).unwrap();
        }
        Message::Cancel {
            build_id,
            resp_chan: msg,
        } => {
            println!("Cancelling: build: {}", build_id);
            if let Some(handle) = state.abort_handles.remove(&build_id) {
                handle.abort();
                msg.send(true).unwrap();
            } else {
                eprintln!("Task not found for cancellation: {:?}", build_id);
                msg.send(false).unwrap();
            }
        }
    }
}

When a Message::Build is received we spawn a new task, which is stored in the JoinSet. Here that task just sleeps for 1 second, but in Rivelin it builds an OCI image. The handler then gets the task's id, stores the task's AbortHandle in the HashMap and finally sends the id back to the caller.

When a Message::Cancel is received we take the AbortHandle out of the HashMap, attempt to abort the task and then send a message to the caller indicating whether the task was cancelled.
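The bookkeeping in handle can be mimicked without Tokio, as a rough std-only sketch. OS threads can't be aborted the way Tokio tasks can, so this version swaps the AbortHandle for a cooperative cancel flag (closer in spirit to the CancellationToken alternative mentioned above) and uses a simple counter for ids instead of tokio::task::Id. BuildManager and its methods are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Per-build bookkeeping: how to cancel it, and its thread handle.
struct Build {
    cancel: Arc<AtomicBool>,
    handle: JoinHandle<bool>, // true if the build ran to completion
}

#[derive(Default)]
struct BuildManager {
    next_id: u64,
    builds: HashMap<u64, Build>,
}

impl BuildManager {
    // Like Message::Build: start work, remember how to cancel it, return an id.
    fn build(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let cancel = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&cancel);
        let handle = thread::spawn(move || {
            // Simulated ~1s build that checks the flag between steps.
            for _ in 0..100 {
                if flag.load(Ordering::Relaxed) {
                    return false;
                }
                thread::sleep(Duration::from_millis(10));
            }
            true
        });
        self.builds.insert(id, Build { cancel, handle });
        id
    }

    // Like Message::Cancel: true if the id was known, false otherwise.
    fn cancel(&mut self, id: u64) -> bool {
        match self.builds.remove(&id) {
            Some(build) => {
                build.cancel.store(true, Ordering::Relaxed);
                // Wait for the thread to notice the flag and exit.
                let _ = build.handle.join();
                true
            }
            None => false,
        }
    }
}
```

Unlike AbortHandle::abort, the cancel path here waits for the thread to observe the flag; the id-to-handle map and the true/false response work the same way as in the actor.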

BuildTaskManager run method

We have one final problem to solve. If you recall the default run method above, it simply iterates over a stream of messages, passing each one to the handle method. However, our actor also needs to notice when tasks in the JoinSet finish, so that it can remove their AbortHandles from the HashMap.

To do this we can override the actor's .run method:

async fn run(
    self,
    mut message_stream: impl Stream<Item = Self::Message> + Send + 'static + std::marker::Unpin,
    mut state: Self::State,
    cancellation_token: CancellationToken,
) {
    loop {
        tokio::select! {
            _ = cancellation_token.cancelled() => {
                self.on_stop(&mut state).await;
                break;
            },
            message = message_stream.next() => {
                match message {
                    Some(message) => self.handle(message, &mut state).await,
                    None => {
                        self.on_stop(&mut state).await;
                        break
                    },
                };
            },
            res = state.tasks.join_next_with_id(), if !state.tasks.is_empty() => {
                match res {
                    // The task ran to completion: drop its AbortHandle.
                    Some(Ok((build_id, _))) => {
                        if state.abort_handles.remove(&build_id).is_some() {
                            println!("Task completed: {:?}", build_id);
                        } else {
                            eprintln!("Completed task not found in abort_handles: {:?}", build_id);
                        }
                    }
                    // The task was cancelled or panicked; the JoinError still carries its id.
                    Some(Err(e)) => {
                        let build_id = e.id();
                        state.abort_handles.remove(&build_id);
                        if e.is_cancelled() {
                            eprintln!("A task was cancelled.");
                        } else {
                            eprintln!("Task failed. But the actor is still alive, honest");
                        }
                    }
                    // Not reached while the guard holds, but `match` must be exhaustive.
                    None => {}
                }
            }
        }
    }
}

It's identical to the default, but with an additional branch that checks whether any tasks have completed. If a task has completed, it removes the task's AbortHandle from the HashMap. You'll notice it uses a guard,

state.tasks.join_next_with_id(), if !state.tasks.is_empty() 

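to ensure the branch is only polled while the JoinSet is not empty. Without the guard, join_next_with_id would resolve to None immediately on an empty set, so the branch would keep completing and the loop would spin instead of waiting for messages.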

Finally we also need to implement the on_stop method to handle graceful shutdown. It simply waits for all existing tasks to complete:

async fn on_stop(self, state: &mut Self::State) {
    println!("Waiting for all running tasks to complete.");
    while !state.tasks.is_empty() {
        state.tasks.join_next().await;
    }
}

In this implementation, callers can keep sending messages through the actor's address after graceful shutdown has started, but the actor will never process them. It might be better for callers to receive an error when sending new messages to an actor that is shutting down.
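As a std-only sketch of that alternative: if the actor drops its receiving end when shutdown begins, later sends fail immediately and the caller gets an error back. The function name is hypothetical. For Tokio, mpsc::Receiver::close looks like the relevant tool, since it stops new sends while still letting buffered messages be drained, though wiring it into the framework would need some care because run receives the channel wrapped in a stream.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical actor that shuts down when it sees `stop_word`. Leaving the
// loop ends the thread, which drops `rx`, so further sends return Err.
fn spawn_actor_that_stops_on(
    stop_word: &'static str,
) -> (mpsc::Sender<String>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        for msg in rx.iter() {
            if msg == stop_word {
                break; // shutdown starts: stop reading the inbox
            }
            handled += 1;
        }
        handled // `rx` is dropped when the closure returns
    });
    (tx, handle)
}
```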

Fault-tolerance: Implementing background services can be a little tricky, because the service going down can be catastrophic. One option is to ensure that the service is restarted if it panics: we could implement a supervisor for our actors which restarts them when a panic occurs. However, because Rust treats errors as values, many failures that would be runtime exceptions elsewhere surface as Results we are forced to handle. Nonetheless, Rust code can still panic, and when prototyping I sometimes catch myself out by overusing unwrap and expect. Luckily our BuildTaskManager already handles panics gracefully as long as they occur inside a task in the JoinSet: Tokio catches the panic at the task boundary and join_next_with_id returns it as a JoinError, rather than the panic taking down the actor.
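The panic isolation described above has a direct std analogue: a panic inside a spawned thread stops at the thread boundary and comes back from JoinHandle::join as an Err, much as Tokio surfaces it as a JoinError. This hypothetical reap_finished function also mirrors the join_next_with_id branch by removing finished tasks from the bookkeeping map.

```rust
use std::collections::HashMap;
use std::thread::{self, JoinHandle};

// Removes every finished task from `tasks` and reports how each one ended.
fn reap_finished(tasks: &mut HashMap<u64, JoinHandle<()>>) -> Vec<(u64, Result<(), String>)> {
    let finished: Vec<u64> = tasks
        .iter()
        .filter(|(_, handle)| handle.is_finished())
        .map(|(id, _)| *id)
        .collect();

    let mut outcomes = Vec::new();
    for id in finished {
        let handle = tasks.remove(&id).expect("id came from the map");
        // join() returns Err if the thread panicked; the caller keeps running.
        let outcome = handle.join().map_err(|_| "task panicked".to_string());
        outcomes.push((id, outcome));
    }
    outcomes.sort_by_key(|(id, _)| *id);
    outcomes
}
```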

Last step: Implement an address wrapper type

Just like in the first example, we can implement a wrapper type to create a simpler API for our actor's address:

pub struct BackgroundActorAddr(pub Addr<BuildTaskManager>);

impl From<Addr<BuildTaskManager>> for BackgroundActorAddr {
    fn from(addr: Addr<BuildTaskManager>) -> Self {
        Self(addr)
    }
}
impl BackgroundActorAddr {
    pub async fn build(&self) -> tokio::task::Id {
        let (tx, rx) = oneshot::channel();
        self.0.send(Message::Build { resp_chan: tx }).await.unwrap();
        rx.await.unwrap()
    }
    pub async fn cancel(&self, build_id: tokio::task::Id) -> bool {
        let (tx, rx) = oneshot::channel();
        self.0
            .send(Message::Cancel {
                build_id,
                resp_chan: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }
}

Start using the BuildTaskManager:

With our API completed, we can now use the BuildTaskManager like this:

let (addr, handle): (BackgroundActorAddr, _) =
    Actor::spawn(BuildTaskManager, TaskManagerState::new());

// Start two builds
let first_build_id = addr.build().await;
let _second_build_id = addr.build().await;

// Cancel one of them
println!("Cancelling: {:?}", first_build_id);
let cancel_success = addr.cancel(first_build_id).await;
assert!(cancel_success);

// Drop addr so that the actor can shut down. It will process any remaining tasks before shutting down.
drop(addr);

// Wait for the actor to shutdown
handle.await.unwrap();
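The shutdown-on-drop behaviour relies on the channel noticing that every sender is gone. A std-only sketch shows the same mechanism with a cloned address: the actor's loop keeps running while any clone is alive, and it exits, returning its final state, only once the last one is dropped. spawn_summing_actor is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

// The loop ends when recv() reports that every Sender (every clone of the
// address) has been dropped, like the `None` branch of the post's run method.
fn spawn_summing_actor() -> (mpsc::Sender<u64>, thread::JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel::<u64>();
    let handle = thread::spawn(move || {
        let mut total = 0;
        while let Ok(n) = rx.recv() {
            total += n;
        }
        // Reached only after the last Sender is gone: the "on_stop" point.
        total
    });
    (tx, handle)
}
```

Dropping one clone is not enough; the actor only stops once the last address is gone.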

Aborting the actor

We can call abort on the actor's JoinHandle like so:

let (addr, handle): (BackgroundActorAddr, _) =
    Actor::spawn(BuildTaskManager, TaskManagerState::new());

handle.abort();

causing the actor to stop at its next await point and shut down without running on_stop. In the case of the BuildTaskManager this causes the JoinSet held in its state to be dropped. Dropping a JoinSet immediately aborts all tasks in the set. Be aware that any tasks we started that were not in the JoinSet would be orphaned and continue running.

Graceful shutdown

The actor will gracefully shut down when all of its addresses have been dropped. However, we can also trigger a graceful shutdown using the actor's handle:

let (addr, handle): (BackgroundActorAddr, _) =
    Actor::spawn(BuildTaskManager, TaskManagerState::new());

addr.build().await;

handle.graceful_shutdown().await.unwrap();

This uses the CancellationToken we saw earlier to signal to the actor that it should shut down.

Originally the handle was a tokio JoinHandle, but I changed this to a custom type to implement the graceful shutdown.

Let's wrap this up 😴

Well that was a longer post than I anticipated. I hope you enjoyed it! Let me know if you like this architecture or if you would implement it differently.

Come back soon for the next post on implementing a Pub/Sub Eventbus using the Actor model 🚀

Finally, if your team is hiring and you think I might be a good fit, feel free to message me on LinkedIn.