Unrecognized Escape Sequence

An increasingly grumpy blog about software engineering

Building services in Rust with the Actix framework

Actix is a Rust crate that provides a framework for developing service-style programs using the Actor Model, a popular design pattern for writing complex concurrent applications. I have found it a useful abstraction due to the way it allows you to decompose your service into discrete actors, each responsible for some subset of the service logic, and communicate between these with strongly-typed messages.

In particular I think this kind of abstraction is useful in Rust as it allows you to avoid some of the noise that comes with managing synchronisation of shared resource by hand.

The Actix actor framework should not be confused with Actix Web, a web server framework developed by the same people, which was formerly based on Actix. Now it appears Actix-web has largely moved away from Actix, but I still think the actor framework part is useful and interesting for general purposes.

There are many crates that provide actor model abstractions for rust, and I don’t claim to know which is the best or most fully-featured. However, I can say that I have worked with Actix in anger and found it to be performant, flexible and relatively intuitive.

In this post, I’ll give a brief breakdown of the steps to build a simple Actix service and play with some different execution models provided by the framework.

1. Getting started

Let’s get started with a boilerplate Rust program, to which we’ll add the Actix dependency.

cargo new actix-demo
cd actix-demo
cargo add actix

Great! Our Cargo.toml now looks like:

[package]
name = "actix-demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix = "0.13.0"

2. Creating an Actor

Let’s create our first actor, a very simple one that will provide some arithmetic operations.

Alongside the main.rs, let’s create arithmetic_service.rs:

use actix::{Actor, Context};

pub(crate) struct ArithmeticService;

impl Actor for ArithmeticService {
    type Context = Context<Self>;

    fn started(&mut self, _context: &mut Self::Context) {
        println!("ArithmeticService is running");
    }

    fn stopped(&mut self, _context: &mut Self::Context) {
        println!("ArithmeticService stopped");
    }
}

Implementing the started and stopped functions is optional, but seems useful in this context.

To get this up and running, we need to decorate our main function with the #[actix::main] macro (which essentially just starts the async runtime); declare main as async (more on this later); and then start our service (requiring the Actor trait to be in scope):

mod arithmetic_service;

use crate::arithmetic_service::{GreetingService};
use actix::{Actor};

#[actix::main]
async fn main() {
    ArithmeticService.start();
}

And voila:

cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.79s
     Running `target/debug/actix-demo`
ArithmeticService is running
ArithmeticService stopped

3. Defining a message

Actix uses the concept of “messages” for communicating with actors. While more complex to setup and use than, say, a function call, messages are a big part of how the framework provides a separation of concerns and manages concurrency.

To get us started, let’s give our service functionality for returning the square of an input (*yawn*). We define messages as a struct with the Message derivation, and rtype macro to define a return type. We’ll put these in arithmetic_service.rs, although message definitions might also be defined elsewhere and shared between services/actors. That the result type must be declared by name rather than as an actual type declaration is not something I’m wildly keen on, but rest-assured the compiler complains if this string does not resolve as a valid type.

#[derive(Message)]
#[rtype(result = "i64")]
pub(crate) struct Square{
    pub input: i64,
}

Now we make our service able to “handle” this type of message, by implementing the Handler trait:

impl Handler<Square> for ArithmeticService {
    type Result = i64;

    fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
        message.input * message.input
    }
}

4. Sending a message to the service

We can now send a Square message to the service from main. To do so, when we create the service actor, we need to keep a “address” for the service, which allows us to speak to the service.

We send the message using the send function, which returns a promise so must be awaited. Send returns a Result because it might fail if, for example, the service is not running, or its message queue is full (topics for another day).

Finally, to make things a little cleaner, I’ll add the derive-new crate and derive this for the Square struct to allow creating the message concisely.

#[actix::main]
async fn main() {
    let service_address = ArithmeticService.start();
    let result = service_address.send(Square::new(5_i64)).await;

    match result {
        Ok(square) => println!("Square was: {}", square),
        _ => println!("Communication with the actor has failed"),
    }
}

And when we run we get:

ArithmeticService is running
Square was: 25

Nice! A couple of points about this:

  • Using the send function is only one option, there are also other options do_send and try_send which have different semantics around synchronicity and error handling.
  • I was a little surprised to not see the ArithmeticService stopped message in the stderr; I see the stopped function invoked in other applications I’ve written. For this post I’m not too interested in investigating whether this is a bug in Actix, or whether there is something I’m missing in the disposal of the resources. The documentation indicates that the service should be torn down when there are no addresses left active, but dropping the address used in main does not seem to help.

5. Concurrency

To play with the concurrency of Actix, let’s first modify our service to simulate a more interesting application than just calculating squares. If we add a delay and some logging into the Square message handler:

    fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
        println!("started processing Square({})", message.input);
   
        // Simulate some work being done by waiting for n seconds, where n is the input value
        let delay_in_seconds = message.input.try_into().unwrap();
        sleep(Duration::from_secs(delay_in_seconds));

        println!("finished processing Square({})", message.input);

        message.input * message.input
    }

Note that I’m using a call to unwrap in this code for brevity which I would certainly not do in a production application.

Now let’s send multiple messages to the service in main. I’m going to do this by collecting the futures returned from calls to send and awaiting them out-of-order. The FuturesUnordered struct is a really nice tool for this purpose, essentially providing a mechanism for polling futures and responding to them as they complete:

    let mut futures = FuturesUnordered::new();
    futures.push(service_address.send(Square::new(5_i64)));
    futures.push(service_address.send(Square::new(2_i64)));
    futures.push(service_address.send(Square::new(3_i64)));

    while let Some(send_result) = futures.next().await {
        match send_result {
            Ok(square) => println!("Got square: {}", square),
            Err(err) => println!("Error communicating with service: {}", err),
        };
    }

Now when we run we get:

ArithmeticService is running
started processing Square(5)
finished processing Square(5)
started processing Square(2)
finished processing Square(2)
started processing Square(3)
finished processing Square(3)
Got square: 25
Got square: 4
Got square: 9

What’s happening here? Actix uses a single-threaded asynchronous event-loop for all it’s processing. This means that both the invocations of handle and the main function are all running on the same single thread. In particular, the blocking calls to sleep in the message handler are a problem for this processing model as they block the whole event loop.

To help us see what’s going on, let’s capture some thread IDs in our logging by adding to main:

println!("Running main on thread: {:?}", thread::current().id());

And in handle:

    println!(
        "started processing Square({}), on {:?}",
        message.input,
        thread::current().id(),
    );

To prevent this blocking behaviour we should perform our message handling asynchronously, so as not to block the event loop. If we change our handle function to use an asynchronous sleep from tokio, we can send this to the event loop and wait for it to complete without blocking:

    fn handle(&mut self, message: Square, context: &mut Self::Context) -> Self::Result {
        println!(
            "started processing Square({}), on {:?}",
            message.input,
            thread::current().id()
        );

        let processing_task = async move {
            let delay_in_seconds = message.input.try_into().unwrap();
            sleep(Duration::from_secs(delay_in_seconds)).await;
        };

        context.wait(processing_task.into_actor(self));

        println!("finished processing Square({})", message.input);
        message.input * message.input
    }

The magic incantation here is context.wait(processing_task.into_actor(self)); which executes processing_task on whatever execution environment is available to the actor context. This is nice as we don’t need to know about the internals of this, we just tell Actix to run it. In fact, the execution environment in this case comes from what Actix terms an Arbiter. In this case, we are using the default System Arbiter, as we have not told Actix to do anything else.

So how does our program behave now?

Running main on thread: ThreadId(1)
ArithmeticService is running
started processing Square(5) on ThreadId(1)
finished processing Square(5)
Got square: 25
started processing Square(2) on ThreadId(1)
finished processing Square(2)
Got square: 4
started processing Square(3) on ThreadId(1)
finished processing Square(3)
Got square: 9
ArithmeticService stopped

Great! We get our message responses as soon as they are ready, and they are still guaranteed to be in-order, which may be a requirement of whatever system we are building. Notice also that everything is running on the same thread.

6. Synchronous processing with multiple threads

As a final modification to this example, let’s switch from using this single-threaded asynchronous execution environment to a multi-threaded synchronous one. This will change the behaviour such that messages are no longer responded to in FIFO order.

To do this we can use the Actix SyncArbiter in place of the implicit system arbiter. When we create our actor using this, we can specify the number of threads to create for processing, each of which will have it’s own ArithmeticService to process messages.

In main, we swap out the way we create our service:

    let processing_threads = 3;
    let service_address = SyncArbiter::start(processing_threads, || ArithmeticService);

And we change the type of Context used by our service:

    type Context = SyncContext<Self>;

SyncContext, being a context designed for synchronous processing, does not allow us to spawn or wait for futures, so we replace our tokio sleep with a synchronous equivalent:

    fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
        println!(
            "started processing Square({}), on {:?}",
            message.input,
            thread::current().id(),
        );

        let delay_in_seconds = message.input.try_into().unwrap();
        thread::sleep(Duration::from_secs(delay_in_seconds));

        println!("finished processing Square({})", message.input);
        message.input * message.input
    }

And we’re done. Let’s see how this behaves:

Running main on thread: ThreadId(1)
ArithmeticService is running
ArithmeticService is running
ArithmeticService is running
started processing Square(5), on ThreadId(4)
started processing Square(2), on ThreadId(3)
started processing Square(3), on ThreadId(2)
finished processing Square(2)
Got square: 4
finished processing Square(3)
Got square: 9
finished processing Square(5)
Got square: 25
ArithmeticService stopped
ArithmeticService stopped
ArithmeticService stopped

Presto. We see that three ArithmeticServices are started on separate threads, and each one processes one of the messages we send. All three messages are processed concurrently, and their responses are returned when ready, completing out-of-order.

Conclusions

For those that have made it this far, hopefully this post has been a useful exploration of Actix and how it works, and the different ways in which it can operate.

The final state of the source code for this project is on Github here: https://github.com/benp44/actix-demo.