Background job processing with Rust using Actix and Redis
Author: Njuguna Mureithi (@tweetofnjuguna)
Overview
This article is based on Apalis, a library I have been working on. Apalis allows you to asynchronously launch jobs in the background and should work in any Tokio environment.
- Motivation
- Getting Started
- Define the job type
- Define the job executor
- Define the job worker
- Expose endpoint to the web
- Middleware
- Tracing
- Extensions
- Conclusions
Motivation
Imagine you are working on your awesome Rust project using your favourite web framework. Things are going well, but you notice something interesting: the request POST /account/forgot-password
is slower than you would expect. Upon investigation, you realize (oh no!) this is being caused by several slow steps inside the request:
- Fetch email assets
- Fetch email data
- Process email template
- Inline email css
- Execute request to mail provider
A user requesting a password reset shouldn't have to wait this long to get a response. This is where Apalis comes in.
Our goal is to:
- Receive requests and respond instantly
- Move our forgot password email process to the background
We are going to use:
- actix-web as the web server
- apalis as the worker
- redis as the storage
Getting Started
This tutorial assumes that you have Rust installed. It also assumes you have a running Redis instance.
Let's start our new project:
cargo new awesome-app
cd awesome-app
You can now add the dependencies below to Cargo.toml:
[dependencies]
apalis = { version = "0.3", features=["redis"] }
serde = "1"
actix-rt = "2"
actix-web = "4"
futures = "0.3"
env_logger = "0.7"
Define the job type
In our main.rs, add:
use apalis::Job;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct ForgottenEmail {
pub email: String,
}
impl Job for ForgottenEmail {
const NAME: &'static str = "awesome-app::ForgottenEmail";
}
Define the job executor
Apalis expects a tower Service<T>, and we are going to build one using JobFn, which requires an async Fn(T, JobContext) -> Result<JobResult, JobError>:
use apalis::{JobContext, JobError, JobResult};
async fn send_email(email: ForgottenEmail, ctx: JobContext) -> Result<JobResult, JobError> {
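    // TODO: the real email-sending work will go here (see the Extensions section below)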
Ok(JobResult::Success)
}
The above function does nothing yet; for now, let's leave it like that.
Define the job worker
Apalis has an in-built worker, StorageWorker, that works with any Storage such as Redis or Postgres. We can use WorkerBuilder to create a factory of StorageWorker workers that will consume our jobs.
use apalis::{WorkerBuilder, WorkerFactoryFn, Monitor};
use apalis::redis::RedisStorage;
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
let storage = RedisStorage::connect(redis_url).await.expect("Could not connect to redis storage");
let worker = Monitor::new()
.register_with_count(2, move |_| {
WorkerBuilder::new(storage.clone())
.build_fn(send_email)
})
.run();
worker.await
}
This connects to Redis and starts two monitored workers listening for new email jobs.
Expose endpoint to the web
In a worker-only scenario this would be it, but let's add a web service that receives requests and pushes them to Redis as jobs. Here actix-web comes in handy.
Here is our route handler:
use actix_web::{web, HttpResponse};

async fn forgotten_email_endpoint(
    email: web::Json<ForgottenEmail>,
    storage: web::Data<RedisStorage<ForgottenEmail>>,
) -> HttpResponse {
    let mut storage = storage.get_ref().clone();
    let res = storage.push(email.into_inner()).await;
    match res {
        Ok(()) => HttpResponse::Ok().body("ForgottenEmail added to queue"),
        Err(e) => HttpResponse::InternalServerError().body(format!("{}", e)),
    }
}
Here is an actix-web example.
NOTE: This example is standalone and can be run without a worker in the current scope. You can then run scalable, Heroku-style workers connected to the same Redis instance.
use actix_web::{App, HttpServer};

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
let storage = RedisStorage::connect(redis_url).await.expect("Could not connect to redis storage");
let data = web::Data::new(storage.clone());
let http = HttpServer::new(move || {
App::new()
.app_data(data.clone())
.service(web::scope("/account").route("/forgot-password", web::post().to(forgotten_email_endpoint)))
})
.bind("127.0.0.1:8000")?
.run();
http.await
}
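If you want the Heroku-style split mentioned in the note above, one option is a separate worker-only binary that shares the same REDIS_URL. Here is a minimal sketch, assuming ForgottenEmail and send_email are moved into the crate's library so both binaries can import them (the awesome_app crate name and the src/bin/worker.rs path are just this example's layout):
// src/bin/worker.rs: consumes the jobs that the web binary pushes to the shared Redis instance.
use apalis::redis::RedisStorage;
use apalis::{Monitor, WorkerBuilder, WorkerFactoryFn};
use awesome_app::{send_email, ForgottenEmail};

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
    let storage: RedisStorage<ForgottenEmail> = RedisStorage::connect(redis_url)
        .await
        .expect("Could not connect to redis storage");
    Monitor::new()
        .register_with_count(2, move |_| {
            WorkerBuilder::new(storage.clone()).build_fn(send_email)
        })
        .run()
        .await
}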
We can now combine the web server and the worker into a final main entry point using the futures crate, and set up some basic logging.
use futures::future;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let redis_url = std::env::var("REDIS_URL").expect("Please include REDIS_URL in env");
let storage = RedisStorage::connect(redis_url)
.await
.expect("Could not connect to redis storage");
let data = web::Data::new(storage.clone());
let http = HttpServer::new(move || {
App::new().app_data(data.clone()).service(
web::scope("/account")
.route("/forgot-password", web::post().to(forgotten_email_endpoint)),
)
})
.bind("127.0.0.1:8000")?
.run();
let worker = Monitor::new()
.register_with_count(2, move |_| {
WorkerBuilder::new(storage.clone()).build_fn(send_email)
})
.run();
future::try_join(http, worker).await?;
Ok(())
}
Now we can run
REDIS_URL=redis://127.0.0.1/ cargo run
Test our endpoint via curl
curl -X POST \
http://localhost:8000/account/forgot-password \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"email": "test@gmail.com"
}'
You should get back the message: "ForgottenEmail added to queue".
Looking at logs, you should see:
[INFO actix_server::builder] Starting 2 workers
[INFO actix_server::server] Actix runtime found; starting in Actix runtime
[DEBUG apalis_redis::storage] Received new job with id: [omitted] to list: awesome-app::ForgottenEmail:active
[DEBUG apalis_core::job] service.ready latency=1.698724ms
We can see our job was added, but we need more visibility. To do this, we are going to use layers, also called 'middleware'.
Middleware
Since Apalis job execution is powered by tower, we can use most utilities in the tower ecosystem, such as layers to:
- rate limit jobs (a sketch follows this list)
- load shed requests
- filter jobs
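For instance, here is a minimal sketch of rate limiting with tower's RateLimitLayer. This assumes tower (with its limit feature) is added to Cargo.toml and that the builder's .layer(...) hook accepts any tower layer, as it does for the in-built layers shown below:
use std::time::Duration;
use tower::limit::RateLimitLayer;

// Sketch: let each worker process at most 10 email jobs per second.
WorkerBuilder::new(storage.clone())
    .layer(RateLimitLayer::new(10, Duration::from_secs(1)))
    .build_fn(send_email)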
Here is an example of using SentryJobLayer, which is available with the sentry feature flag.
Back in our example, we will add a few of the in-built layers:
Tracing
This will allow easy tracing of the job futures using the tracing crate:
WorkerBuilder::new(storage.clone())
    .layer(TraceLayer::new())
    .build_fn(send_email)
And with that, our logs become clearer:
[INFO actix_server::builder] Starting 2 workers
[INFO actix_server::server] Actix runtime found; starting in Actix runtime
[DEBUG apalis_redis::storage] Received new job with id: 1235d5f1.. to list: awesome-app::ForgottenEmail:active
[DEBUG apalis_core::job] service.ready latency=2.0503ms
[DEBUG apalis_core::layers::tracing::make_span] job; job_id="1235d5f1.." current_attempt=1
[DEBUG apalis_core::layers::tracing::on_request] job.start
[DEBUG apalis_core::layers::tracing::on_response] job.done done_in=0ms result=Success
Extensions
One of the reasons our service was slow might have been that a new mail client was created for every request. We could drastically improve performance by reusing our client. Let's update our empty send_email service from the job executor step.
[dependencies]
- apalis = { version = "0.3", features=["redis"] }
+ apalis = { version = "0.3", features=["redis", "extensions"] }
#[derive(Clone)]
struct MailProviderClient {
// ...
}
use std::sync::Arc;

let client = Arc::new(MailProviderClient::new(config));
async fn send_email(email: ForgottenEmail, ctx: JobContext) -> Result<JobResult, JobError> {
let client: &Arc<MailProviderClient> = ctx.data_opt().unwrap();
match client.send_email(email).await {
Ok(_) => Ok(JobResult::Success),
Err(e) => Err(JobError::from(e))
}
}
WorkerBuilder::new(storage.clone())
    .layer(Extension(client.clone()))
    .layer(TraceLayer::new())
    .build_fn(send_email)
We could do the same for databases, connection pools, and precompiled templates. This is similar to web::Data in actix-web or Extensions in axum.
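For example, here is a hypothetical sketch of sharing a database pool the same way, assuming sqlx (with its Postgres and runtime features) is added to Cargo.toml; DATABASE_URL is a placeholder:
use sqlx::PgPool;

// In main, before building the workers: create one pool for the whole process.
let pool = PgPool::connect(&std::env::var("DATABASE_URL").expect("Please include DATABASE_URL in env"))
    .await
    .expect("Could not connect to Postgres");

// Hand it to every job, exactly like the mail client above.
WorkerBuilder::new(storage.clone())
    .layer(Extension(pool.clone()))
    .layer(TraceLayer::new())
    .build_fn(send_email)

// And inside a job: let pool: &PgPool = ctx.data_opt().unwrap();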
Conclusions
As you can see, we were able to offload the heavy work of sending emails from the actix-web workers. This was just an intro to Apalis, and you should also check out:
- Code for this example
- Cargo Docs
- GitHub Repo (consider giving Apalis a star)
- Many More Examples - including postgres, sentry and prometheus