Actix is a Rust crate that provides a framework for developing service-style programs using the Actor Model, a popular design pattern for writing complex concurrent applications. I have found it a useful abstraction due to the way it allows you to decompose your service into discrete actors, each responsible for some subset of the service logic, and communicate between these with strongly-typed messages.
In particular, I think this kind of abstraction is useful in Rust as it allows you to avoid some of the noise that comes with managing synchronisation of shared resources by hand.
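To make the idea concrete before bringing in Actix, here is a minimal sketch of the pattern using only the standard library. None of this is Actix API: CounterMessage, spawn_counter and add_all_and_get are names I have made up for illustration. The point is simply that one thread owns the state, and everyone else talks to it through typed messages rather than a lock.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical message type: the Get variant carries its own reply channel,
// so the type of the reply is checked at compile time.
enum CounterMessage {
    Add(i64),
    Get(Sender<i64>),
}

// Spawns a thread that owns the counter state and returns the sending half
// of its mailbox. No Mutex is needed because only this thread touches `total`.
fn spawn_counter() -> Sender<CounterMessage> {
    let (mailbox, inbox) = channel();
    thread::spawn(move || {
        let mut total = 0_i64;
        // The loop ends once every Sender for the mailbox has been dropped.
        for message in inbox {
            match message {
                CounterMessage::Add(amount) => total += amount,
                CounterMessage::Get(reply_to) => {
                    // Ignore the error if the requester has already gone away.
                    let _ = reply_to.send(total);
                }
            }
        }
    });
    mailbox
}

// Sends a few Add messages, then asks the actor for its current total.
fn add_all_and_get(amounts: &[i64]) -> i64 {
    let counter = spawn_counter();
    for &amount in amounts {
        counter.send(CounterMessage::Add(amount)).expect("actor stopped");
    }
    let (reply_to, reply) = channel();
    counter.send(CounterMessage::Get(reply_to)).expect("actor stopped");
    reply.recv().expect("actor dropped the reply channel")
}
```

Calling add_all_and_get(&[1, 2, 3]) sends three Add messages followed by a Get. Actix gives you this same shape with far less plumbing, plus lifecycle hooks and an async runtime.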
The Actix actor framework should not be confused with Actix Web, a web server framework developed by the same people, which was formerly based on Actix. Actix Web now appears to have largely moved away from Actix, but I still think the actor framework is useful and interesting for general purposes.
There are many crates that provide actor model abstractions for Rust, and I don’t claim to know which is the best or most fully-featured. However, I can say that I have worked with Actix in anger and found it to be performant, flexible and relatively intuitive.
In this post, I’ll give a brief breakdown of the steps to build a simple Actix service and play with some different execution models provided by the framework.
1. Getting started
Let’s get started with a boilerplate Rust program, to which we’ll add the Actix dependency.
cargo new actix-demo
cd actix-demo
cargo add actix
Great! Our Cargo.toml now looks like:
[package]
name = "actix-demo"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix = "0.13.0"
2. Creating an Actor
Let’s create our first actor, a very simple one that will provide some arithmetic operations.
Alongside the main.rs, let’s create arithmetic_service.rs:
use actix::{Actor, Context};

pub(crate) struct ArithmeticService;

impl Actor for ArithmeticService {
    type Context = Context<Self>;

    fn started(&mut self, _context: &mut Self::Context) {
        println!("ArithmeticService is running");
    }

    fn stopped(&mut self, _context: &mut Self::Context) {
        println!("ArithmeticService stopped");
    }
}
Implementing the started and stopped functions is optional, but seems useful in this context.
To get this up and running, we need to decorate our main function with the #[actix::main] macro (which essentially just starts the async runtime); declare main as async (more on this later); and then start our service (requiring the Actor trait to be in scope):
mod arithmetic_service;

use crate::arithmetic_service::ArithmeticService;
use actix::Actor;

#[actix::main]
async fn main() {
    // start() comes from the Actor trait and returns the actor's address
    ArithmeticService.start();
}
And voila:
cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.79s
Running `target/debug/actix-demo`
ArithmeticService is running
ArithmeticService stopped
3. Defining a message
Actix uses the concept of “messages” for communicating with actors. While more complex to set up and use than, say, a function call, messages are a big part of how the framework provides a separation of concerns and manages concurrency.
To get us started, let’s give our service functionality for returning the square of an input (*yawn*). We define a message as a struct that derives Message, with an rtype attribute to declare its return type. We’ll put these in arithmetic_service.rs, although message definitions might also be defined elsewhere and shared between services/actors. That the result type must be declared as a string rather than as an actual type is not something I’m wildly keen on, but rest assured the compiler complains if this string does not resolve to a valid type.
use actix::Message;

#[derive(Message)]
#[rtype(result = "i64")]
pub(crate) struct Square {
    pub input: i64,
}
Now we make our service able to “handle” this type of message, by implementing the Handler trait (which also needs importing from actix):
impl Handler<Square> for ArithmeticService {
    type Result = i64;

    fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
        message.input * message.input
    }
}
4. Sending a message to the service
We can now send a Square message to the service from main. To do so, when we create the service actor, we need to keep an “address” for the service, which allows us to speak to it.
We send the message using the send function, which returns a future, so it must be awaited. The awaited value is a Result because the send might fail if, for example, the service is not running, or its message queue is full (topics for another day).
Finally, to make things a little cleaner, I’ll add the derive-new crate and derive new for the Square struct, which gives us a Square::new constructor for creating the message concisely.
#[actix::main]
async fn main() {
    let service_address = ArithmeticService.start();
    let result = service_address.send(Square::new(5_i64)).await;

    match result {
        Ok(square) => println!("Square was: {}", square),
        _ => println!("Communication with the actor has failed"),
    }
}
And when we run we get:
ArithmeticService is running
Square was: 25
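For anyone unfamiliar with derive-new, the constructor it derives is roughly equivalent to writing the following by hand. This is a sketch of the equivalent code, not the macro's literal output, and I have left off the Actix derives so that it stands alone:

```rust
// Hand-written equivalent of a derived constructor for a struct with a
// single field. The Message derive and rtype attribute are omitted here.
pub struct Square {
    pub input: i64,
}

impl Square {
    // Takes one argument per field, in declaration order.
    pub fn new(input: i64) -> Self {
        Square { input }
    }
}
```

For a one-field struct the saving is small, but it keeps message definitions tidy as they grow.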
Nice! A couple of points about this:
- Using the send function is only one option; there are also do_send and try_send, which have different semantics around synchronicity and error handling.
- I was a little surprised not to see the ArithmeticService stopped message in the output; I see the stopped function invoked in other applications I’ve written. For this post I’m not too interested in investigating whether this is a bug in Actix, or whether there is something I’m missing in the disposal of the resources. The documentation indicates that the service should be torn down when there are no addresses left active, but dropping the address used in main does not seem to help.
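The two failure modes mentioned earlier (a full message queue, and a service that is no longer running) are easy to reproduce with a bounded channel from the standard library. To be clear, this is an analogy rather than Actix code: the Enqueue enum and both functions are hypothetical names, and Actix's own mailbox and error types differ.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Describes what happened when we tried to enqueue a message without blocking.
#[derive(Debug, PartialEq)]
enum Enqueue {
    Accepted,
    QueueFull,
    ReceiverGone,
}

// Maps the standard library's try_send outcome onto the enum above.
fn classify<T>(outcome: Result<(), TrySendError<T>>) -> Enqueue {
    match outcome {
        Ok(()) => Enqueue::Accepted,
        Err(TrySendError::Full(_)) => Enqueue::QueueFull,
        Err(TrySendError::Disconnected(_)) => Enqueue::ReceiverGone,
    }
}

// Fills a queue of capacity 1, then drops the receiver, recording each outcome.
fn demonstrate_failures() -> Vec<Enqueue> {
    let (sender, receiver) = sync_channel::<i64>(1);
    let mut outcomes = Vec::new();
    outcomes.push(classify(sender.try_send(5))); // the queue has room
    outcomes.push(classify(sender.try_send(2))); // the queue is now full
    drop(receiver);
    outcomes.push(classify(sender.try_send(3))); // nobody is listening any more
    outcomes
}
```

The choice between the Actix send variants comes down to the same question this sketch poses: when the message cannot be delivered, do you want to wait, be told immediately, or not be told at all?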
5. Concurrency
To play with the concurrency of Actix, let’s first modify our service to simulate a more interesting application than just calculating squares. If we add a delay and some logging into the Square message handler:
fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
    println!("started processing Square({})", message.input);

    // Simulate some work being done by waiting for n seconds, where n is the input value.
    // sleep here is the blocking std::thread::sleep.
    let delay_in_seconds = message.input.try_into().unwrap();
    sleep(Duration::from_secs(delay_in_seconds));

    println!("finished processing Square({})", message.input);
    message.input * message.input
}
Note that I’m using a call to unwrap in this code for brevity, which will panic on a negative input and which I would certainly not do in a production application.
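As a sketch of what the non-panicking version might look like, the conversion can be pulled out into a small helper that returns a Result. The name delay_for is my own invention for this example; how the handler should then report the error to the sender is a separate design decision that I’m not covering here.

```rust
use std::convert::TryFrom;
use std::num::TryFromIntError;
use std::time::Duration;

// Converts the message input into a delay. A negative input produces an
// error for the caller to deal with, instead of a panic from unwrap.
fn delay_for(input: i64) -> Result<Duration, TryFromIntError> {
    let seconds = u64::try_from(input)?;
    Ok(Duration::from_secs(seconds))
}
```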
Now let’s send multiple messages to the service in main. I’m going to do this by collecting the futures returned from calls to send and awaiting them out-of-order. The FuturesUnordered struct is a really nice tool for this purpose, essentially providing a mechanism for polling futures and responding to them as they complete:
// next() requires futures::StreamExt to be in scope
let mut futures = FuturesUnordered::new();

futures.push(service_address.send(Square::new(5_i64)));
futures.push(service_address.send(Square::new(2_i64)));
futures.push(service_address.send(Square::new(3_i64)));

while let Some(send_result) = futures.next().await {
    match send_result {
        Ok(square) => println!("Got square: {}", square),
        Err(err) => println!("Error communicating with service: {}", err),
    };
}
Now when we run we get:
ArithmeticService is running
started processing Square(5)
finished processing Square(5)
started processing Square(2)
finished processing Square(2)
started processing Square(3)
finished processing Square(3)
Got square: 25
Got square: 4
Got square: 9
What’s happening here? By default, Actix runs all of its processing on a single-threaded asynchronous event loop. This means that the invocations of handle and the main function are all running on the same thread. In particular, the blocking calls to sleep in the message handler are a problem for this processing model, as they block the whole event loop.
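The effect is easy to reproduce outside Actix. The sketch below is standard-library Rust with a made-up function name, standing in for the event loop with a single worker thread: every job runs on that one thread, so the short jobs have to queue behind the long one.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

// Runs every job on ONE worker thread, so a slow job delays all the jobs
// queued behind it. Returns the squares in the order they were finished.
fn process_on_single_thread(inputs: &[u64]) -> Vec<u64> {
    let (job_sender, job_receiver) = channel::<u64>();
    let (result_sender, result_receiver) = channel::<u64>();

    let worker = thread::spawn(move || {
        for input in job_receiver {
            // Blocking sleep, scaled down to milliseconds to keep the demo quick.
            thread::sleep(Duration::from_millis(input * 10));
            result_sender.send(input * input).expect("receiver dropped");
        }
    });

    for &input in inputs {
        job_sender.send(input).expect("worker stopped");
    }
    drop(job_sender); // closes the queue so the worker's loop can end
    worker.join().expect("worker panicked");
    result_receiver.iter().collect()
}
```

With inputs 5, 2 and 3, the results come back in exactly the order the jobs were sent, however short the later jobs are.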
To help us see what’s going on, let’s capture some thread IDs in our logging by adding to main:
println!("Running main on thread: {:?}", thread::current().id());
And in handle:
println!(
    "started processing Square({}), on {:?}",
    message.input,
    thread::current().id(),
);
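If you haven’t used thread IDs before, this small standalone sketch (the function name is hypothetical) shows what we are relying on: each thread reports its own ID, so identical IDs in the log mean the code ran on the same thread.

```rust
use std::thread::{self, ThreadId};

// Returns the ID of the calling thread and the ID of a freshly spawned one.
fn calling_and_spawned_ids() -> (ThreadId, ThreadId) {
    let calling = thread::current().id();
    let spawned = thread::spawn(|| thread::current().id())
        .join()
        .expect("spawned thread panicked");
    (calling, spawned)
}
```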
To prevent this blocking behaviour we should perform our message handling asynchronously, so as not to block the event loop. If we change our handle function to use an asynchronous sleep from tokio, we can send this to the event loop and wait for it to complete without blocking:
// Requires actix::{AsyncContext, WrapFuture} in scope; sleep is now tokio::time::sleep.
fn handle(&mut self, message: Square, context: &mut Self::Context) -> Self::Result {
    println!(
        "started processing Square({}), on {:?}",
        message.input,
        thread::current().id()
    );

    let processing_task = async move {
        let delay_in_seconds = message.input.try_into().unwrap();
        sleep(Duration::from_secs(delay_in_seconds)).await;
    };

    // Hand the task to the actor's context rather than blocking the thread
    context.wait(processing_task.into_actor(self));

    println!("finished processing Square({})", message.input);
    message.input * message.input
}
The magic incantation here is context.wait(processing_task.into_actor(self)); which executes processing_task on whatever execution environment is available to the actor context. This is nice as we don’t need to know about the internals; we just tell Actix to run it. Note that wait also pauses the actor’s processing of further messages until the task completes, while handle itself returns straight away. The execution environment in this case comes from what Actix terms an Arbiter. Here we are using the default System Arbiter, as we have not told Actix to do anything else.
So how does our program behave now?
Running main on thread: ThreadId(1)
ArithmeticService is running
started processing Square(5) on ThreadId(1)
finished processing Square(5)
Got square: 25
started processing Square(2) on ThreadId(1)
finished processing Square(2)
Got square: 4
started processing Square(3) on ThreadId(1)
finished processing Square(3)
Got square: 9
ArithmeticService stopped
Great! We get our message responses as soon as they are ready, and they are still guaranteed to be in-order, which may be a requirement of whatever system we are building. Notice also that everything is running on the same thread.
6. Synchronous processing with multiple threads
As a final modification to this example, let’s switch from using this single-threaded asynchronous execution environment to a multi-threaded synchronous one. This will change the behaviour such that messages are no longer responded to in FIFO order.
To do this we can use the Actix SyncArbiter in place of the implicit system arbiter. When we create our actor using this, we can specify the number of threads to create for processing, each of which will have its own ArithmeticService to process messages.
In main, we swap out the way we create our service:
let processing_threads = 3;
let service_address = SyncArbiter::start(processing_threads, || ArithmeticService);
And we change the type of Context used by our service:
type Context = SyncContext<Self>;
SyncContext, being a context designed for synchronous processing, does not allow us to spawn or wait for futures, so we replace our tokio sleep with a synchronous equivalent:
fn handle(&mut self, message: Square, _context: &mut Self::Context) -> Self::Result {
    println!(
        "started processing Square({}), on {:?}",
        message.input,
        thread::current().id(),
    );

    let delay_in_seconds = message.input.try_into().unwrap();
    thread::sleep(Duration::from_secs(delay_in_seconds));

    println!("finished processing Square({})", message.input);
    message.input * message.input
}
And we’re done. Let’s see how this behaves:
Running main on thread: ThreadId(1)
ArithmeticService is running
ArithmeticService is running
ArithmeticService is running
started processing Square(5), on ThreadId(4)
started processing Square(2), on ThreadId(3)
started processing Square(3), on ThreadId(2)
finished processing Square(2)
Got square: 4
finished processing Square(3)
Got square: 9
finished processing Square(5)
Got square: 25
ArithmeticService stopped
ArithmeticService stopped
ArithmeticService stopped
Presto. We see that three ArithmeticService instances are started on separate threads, and each one processes one of the messages we send. All three messages are processed concurrently, and their responses are returned when ready, completing out-of-order.
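For comparison, here is roughly the same shape built by hand from standard-library parts: a fixed number of worker threads pulling jobs from one shared queue. This is my own sketch with a hypothetical function name, and I am not claiming it mirrors how SyncArbiter is implemented internally. It does show why the responses stop arriving in FIFO order once more than one thread is involved.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Processes the inputs on `worker_count` threads that share one job queue.
// Returns the squares in completion order, which with several workers
// depends on how long each job takes rather than on the order sent.
fn process_on_pool(worker_count: usize, inputs: &[u64]) -> Vec<u64> {
    let (job_sender, job_receiver) = channel::<u64>();
    let (result_sender, result_receiver) = channel::<u64>();
    let job_receiver: Arc<Mutex<Receiver<u64>>> = Arc::new(Mutex::new(job_receiver));

    let mut workers = Vec::new();
    for _ in 0..worker_count {
        let job_receiver = Arc::clone(&job_receiver);
        let result_sender = result_sender.clone();
        workers.push(thread::spawn(move || loop {
            // The lock is held only while taking a job, not while working on it.
            let next_job = job_receiver.lock().expect("lock poisoned").recv();
            match next_job {
                Ok(input) => {
                    // Blocking sleep, scaled down to milliseconds for the demo.
                    thread::sleep(Duration::from_millis(input * 10));
                    result_sender.send(input * input).expect("receiver dropped");
                }
                Err(_) => break, // the queue is closed and fully drained
            }
        }));
    }
    drop(result_sender); // only the workers' clones remain

    for &input in inputs {
        job_sender.send(input).expect("workers stopped");
    }
    drop(job_sender); // closes the queue so the workers can finish
    for worker in workers {
        worker.join().expect("worker panicked");
    }
    result_receiver.iter().collect()
}
```

With a single worker this degenerates to the in-order behaviour we saw earlier; with three workers the three jobs overlap, and the shortest would normally finish first.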
Conclusions
For those who have made it this far, hopefully this post has been a useful exploration of Actix, how it works, and the different ways in which it can operate.
The final state of the source code for this project is on GitHub here: https://github.com/benp44/actix-demo.