Background job processing with Rust using Actix and Redis

Overview

This article is based on Apalis, a library I have been working on. Apalis lets you asynchronously run jobs in the background and should work in any Tokio environment.

Motivation

Imagine you are working on your awesome Rust project using your favourite web framework. Things are going well, but you notice something interesting: the request POST /account/forgot-password is slower than you would expect. Upon investigation, you realize (oh no!) that the handler does all of this work before responding:

  • Fetch email assets
  • Fetch email data
  • Process email template
  • Inline email css
  • Execute request to mail provider

A user requesting a password reset shouldn't have to wait this long to get a response. This is where Apalis comes in.

Our goal is to:

  1. Receive requests and respond instantly
  2. Move our forgot password email process to the background

We are going to use:

  • actix-web as the web server.
  • apalis as the background worker.
  • redis as the job storage.

Getting Started

This tutorial assumes that you have Rust installed. It also assumes you have a running Redis instance.
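
If you don't already have one running locally, a quick way to start one (assuming you have Docker installed) is:

docker run --name awesome-app-redis -p 6379:6379 -d redis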

Let's start our new project:

cargo new awesome-app
cd awesome-app

You can now add the dependencies below to Cargo.toml:

[dependencies]
apalis = { version = "0.3", features=["redis"] }
serde = "1"
actix-rt = "2"
actix-web = "4"
futures = "0.3"
env_logger = "0.7"

Define the job type

In our main.rs, add:

use apalis::Job;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub struct ForgottenEmail {
    pub email: String,
}

impl Job for ForgottenEmail {
    const NAME: &'static str = "awesome-app::ForgottenEmail";
}

Define the job executor

Apalis expects a tower Service<T>, and we are going to build one using JobFn, which requires an async Fn(T, JobContext) -> Result<JobResult, JobError>:

use apalis::{JobContext, JobError, JobResult};

async fn send_email(email: ForgottenEmail, ctx: JobContext) -> Result<JobResult, JobError> {
    Ok(JobResult::Success)
}

The above function does nothing yet; for now, that's fine.
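
If you want to confirm the worker is actually picking up jobs before wiring in a real mail client, a throwaway sketch (not part of the final code) could simply log the payload:

async fn send_email(email: ForgottenEmail, _ctx: JobContext) -> Result<JobResult, JobError> {
    // Placeholder body: log the job so we can see the worker consuming it.
    println!("would send a forgot-password email to {}", email.email);
    Ok(JobResult::Success)
}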

Define the job worker

Apalis has an inbuilt worker for Storage backends such as Redis and Postgres, called StorageWorker. We can use WorkerBuilder to create a factory of StorageWorker workers that will consume our jobs.

use apalis::{WorkerBuilder, WorkerFactoryFn, Monitor};
use apalis::redis::RedisStorage;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
    let storage = RedisStorage::connect(redis_url).await.expect("Could not connect to redis storage");
    let worker = Monitor::new()
        .register_with_count(2, move |_| {
            WorkerBuilder::new(storage.clone())
                .build_fn(send_email)
        })
        .run();

    worker.await
}

This connects to Redis and starts two monitored workers that listen for new email jobs.

Expose endpoint to the web

In a worker-only scenario this would be it, but let's add a web service that receives requests and pushes them to Redis as jobs. Here actix-web comes in handy.

Here is our route handler:

async fn forgotten_email_endpoint(
    email: web::Json<ForgottenEmail>,
    storage: web::Data<RedisStorage<ForgottenEmail>>,
) -> HttpResponse {
    // Clone a storage handle out of the shared web::Data so we can push to it.
    let mut storage = storage.get_ref().clone();
    match storage.push(email.into_inner()).await {
        Ok(()) => HttpResponse::Ok().body("ForgottenEmail added to queue"),
        Err(e) => HttpResponse::InternalServerError().body(format!("{}", e)),
    }
}

Here is an actix-web example.

NOTE: This example is standalone and can run without a worker in the current process. You can then run scalable, Heroku-style workers connected to the same Redis instance.

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
    let storage = RedisStorage::connect(redis_url).await.expect("Could not connect to redis storage");
    let data = web::Data::new(storage.clone());
    let http = HttpServer::new(move || {
        App::new()
            .app_data(data.clone())
            .service(web::scope("/account").route("/forgot-password", web::post().to(forgotten_email_endpoint)))
    })
    .bind("127.0.0.1:8000")?
    .run();
    http.await
}

We can now combine these two into a final main entry point using the futures crate, and set up some basic logging.

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();
    let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
    let storage = RedisStorage::connect(redis_url)
        .await
        .expect("Could not connect to redis storage");

    let data = web::Data::new(storage.clone());
    let http = HttpServer::new(move || {
        App::new().app_data(data.clone()).service(
            web::scope("/account")
                .route("/forgot-password", web::post().to(forgotten_email_endpoint)),
        )
    })
    .bind("127.0.0.1:8000")?
    .run();
    let worker = Monitor::new()
        .register_with_count(2, move |_| {
            WorkerBuilder::new(storage.clone()).build_fn(send_email)
        })
        .run();

    future::try_join(http, worker).await?;
    Ok(())
}

Now we can run

REDIS_URL=redis://127.0.0.1/ cargo run

Test our endpoint via curl

curl -X POST \
  http://localhost:8000/account/forgot-password \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/json' \
  -d '{
	"email": "test@gmail.com"
}'

You should get back a message: "ForgottenEmail added to queue".

Looking at the logs, you should see:

[INFO  actix_server::builder] Starting 2 workers
[INFO  actix_server::server] Actix runtime found; starting in Actix runtime
[DEBUG apalis_redis::storage] Received new job with id: [omitted] to list: awesome-app::ForgottenEmail:active
[DEBUG apalis_core::job] service.ready latency=1.698724ms

We can see our job was added, but we need more visibility. To get it, we are going to use layers, also called middleware.

Middleware

Since Apalis job execution is powered by tower, we can use most of the utilities in the tower ecosystem, for example to:

  • rate limit jobs
  • load shed requests
  • filter jobs
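
For instance, a minimal sketch of rate limiting with tower's RateLimitLayer might look like the following (this assumes WorkerBuilder::layer composes standard tower layers, and that tower, with its limit feature enabled, is added as a direct dependency):

use std::time::Duration;
use tower::limit::RateLimitLayer;

WorkerBuilder::new(storage.clone())
    // Allow at most 10 jobs per second per worker.
    .layer(RateLimitLayer::new(10, Duration::from_secs(1)))
    .build_fn(send_email)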

Here is an example of using SentryJobLayer, which is available with the sentry feature flag.

Sentry example

Using our example, we will add a few in-built layers:

Tracing

This allows easy tracing of the job futures using the tracing crate:

WorkerBuilder::new(storage.clone())
    .layer(TraceLayer::new())
    .build_fn(send_email)

And with that, our logs become clearer:

[INFO  actix_server::builder] Starting 2 workers
[INFO  actix_server::server] Actix runtime found; starting in Actix runtime
[DEBUG apalis_redis::storage] Received new job with id: 1235d5f1.. to list: awesome-app::ForgottenEmail:active
[DEBUG apalis_core::job] service.ready latency=2.0503ms
[DEBUG apalis_core::layers::tracing::make_span] job; job_id="1235d5f1.." current_attempt=1
[DEBUG apalis_core::layers::tracing::on_request] job.start
[DEBUG apalis_core::layers::tracing::on_response] job.done done_in=0ms result=Success

Extensions

One of the reasons our service was slow might have been that a new client was created for every request. We could drastically improve performance by reusing the client. Let's update our empty send_email service from the job executor step.

[dependencies]
- apalis = { version = "0.3", features=["redis"] }
+ apalis = { version = "0.3", features=["redis", "extensions"] }

#[derive(Clone)]
struct MailProviderClient {
    // ...
}

let client = Arc::new(MailProviderClient::new(config));

async fn send_email(email: ForgottenEmail, ctx: JobContext) -> Result<JobResult, JobError> {
    let client: &Arc<MailProviderClient> = ctx.data_opt().unwrap();
    match client.send_email(email).await {
        Ok(_) => Ok(JobResult::Success),
        Err(e) => Err(JobError::from(e))
    }
}

WorkerBuilder::new(storage.clone())
    .layer(Extension(client.clone()))
    .layer(TraceLayer::new())
    .build_fn(send_email)

We could do the same for database connections, pools, and precompiled templates. This is similar to web::Data in actix or Extension in axum.
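
For instance, a worker could share several resources at once. In the sketch below, TemplateEngine, PgPool, and database_url are hypothetical stand-ins for whatever precompiled templates and connection pool your application uses:

// `TemplateEngine` and `PgPool` are hypothetical placeholders for your own types.
let templates = Arc::new(TemplateEngine::precompile("templates/")?);
let pool = Arc::new(PgPool::connect(&database_url).await?);

WorkerBuilder::new(storage.clone())
    .layer(Extension(client.clone()))
    .layer(Extension(templates))
    .layer(Extension(pool))
    .layer(TraceLayer::new())
    .build_fn(send_email)

Inside send_email, each of these can then be retrieved with ctx.data_opt(), just like the client.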

Conclusions

As you can see, we were able to offload the heavy work of processing emails from the actix-web workers. This was just an intro to Apalis, and you should also check out: