Background jobs
In virtually all apps, some work needs to be done outside of the "critical path" in order to provide a quick and responsive experience to the user. For example, in mobile apps, the only work that should happen on the main thread is updating the UI. Everything else, such as reading files from disk and fetching data from the server, should happen on a background thread.
In web apps (and API servers), the same principle applies. In general, APIs should do the minimal amount of work needed to respond to the user's (or another service's) API request, and everything else should be moved to some background "process". There are many ways this can be done: for example, AWS SQS, GCP Pub/Sub, Sidekiq, Faktory, and pgmq, to name a few.
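As a rough illustration of the principle, here is a minimal sketch that uses only the Rust standard library: a request handler pushes a job onto a channel and returns immediately, while a background thread does the slow work. The names `Job`, `spawn_worker`, `handle_signup`, and `run_signup_demo` are hypothetical and not part of Roadster. An in-process channel is also not durable, which is one reason a real app would typically use a queue backend like the ones listed above.

```rust
use std::sync::mpsc;
use std::thread;

/// A unit of deferred work. `SendWelcomeEmail` is a hypothetical job type.
enum Job {
    SendWelcomeEmail { user: String },
}

/// Spawns a background worker thread. Returns the sending half of the job
/// channel and a handle that yields a log of what the worker processed.
fn spawn_worker() -> (mpsc::Sender<Job>, thread::JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel::<Job>();
    let handle = thread::spawn(move || {
        let mut processed = Vec::new();
        // This loop ends once every `Sender` has been dropped.
        for job in rx {
            match job {
                Job::SendWelcomeEmail { user } => processed.push(format!("emailed {user}")),
            }
        }
        processed
    });
    (tx, handle)
}

/// Hypothetical request handler: does the minimum needed to respond and
/// defers everything else to the background worker.
fn handle_signup(queue: &mpsc::Sender<Job>, user: &str) -> String {
    queue
        .send(Job::SendWelcomeEmail { user: user.to_string() })
        .expect("worker thread should be running");
    format!("202 Accepted: {user}")
}

/// Runs one request through the handler, then shuts the worker down.
fn run_signup_demo() -> (String, Vec<String>) {
    let (tx, worker) = spawn_worker();
    let response = handle_signup(&tx, "ada");
    drop(tx); // Close the channel so the worker loop can finish.
    let processed = worker.join().expect("worker thread panicked");
    (response, processed)
}
```

Calling `run_signup_demo()` from `main` returns the handler's response together with the worker's log. The handler never waits for the email to be sent.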
Roadster provides a Worker trait to encapsulate common functionality for handling async jobs, and an Enqueuer trait to handle enqueueing jobs into the job queue backend. To change the job queue backend for a worker, change the Enqueuer associated type in its Worker implementation.
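To illustrate why an associated type makes the backend easy to swap, here is a simplified, synchronous model of the pattern using only the standard library. It is not Roadster's actual trait definitions: `AppState`, the in-memory queues, the `NAME` constant, `ReportWorker`, `EmailWorker`, and `run_enqueue_demo` are assumptions made for the sketch, and only the enqueue half of a worker is modeled.

```rust
use std::collections::VecDeque;

/// Stand-in for application state: one in-memory queue per backend.
#[derive(Default)]
struct AppState {
    pg_queue: VecDeque<String>,
    sidekiq_queue: VecDeque<String>,
}

/// Simplified enqueuer: pushes a job into exactly one backend.
trait Enqueuer {
    fn enqueue(state: &mut AppState, worker_name: &str, args: String);
}

struct PgEnqueuer;
struct SidekiqEnqueuer;

impl Enqueuer for PgEnqueuer {
    fn enqueue(state: &mut AppState, worker_name: &str, args: String) {
        state.pg_queue.push_back(format!("{worker_name}:{args}"));
    }
}

impl Enqueuer for SidekiqEnqueuer {
    fn enqueue(state: &mut AppState, worker_name: &str, args: String) {
        state.sidekiq_queue.push_back(format!("{worker_name}:{args}"));
    }
}

/// Simplified worker: only the enqueue half is modeled here.
trait Worker {
    /// The backend this worker's jobs are sent to.
    type Enqueuer: Enqueuer;
    const NAME: &'static str;

    /// Provided method, so call sites never name the backend directly.
    fn enqueue(state: &mut AppState, args: String) {
        <Self::Enqueuer as Enqueuer>::enqueue(state, Self::NAME, args);
    }
}

struct ReportWorker;
impl Worker for ReportWorker {
    type Enqueuer = PgEnqueuer; // Change this one line to switch backends.
    const NAME: &'static str = "ReportWorker";
}

struct EmailWorker;
impl Worker for EmailWorker {
    type Enqueuer = SidekiqEnqueuer;
    const NAME: &'static str = "EmailWorker";
}

/// Both call sites look the same; the associated type picks the queue.
fn run_enqueue_demo() -> AppState {
    let mut state = AppState::default();
    ReportWorker::enqueue(&mut state, "Example".to_string());
    EmailWorker::enqueue(&mut state, "Example".to_string());
    state
}
```

Because the backend is chosen through the associated type, the code that calls `enqueue` does not change when a worker moves from one queue to another.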
Built-in worker backends
Roadster provides built-in support for running async workers using either Postgres (via pgmq) or Redis/Sidekiq (via rusty-sidekiq) as the backing store. See the following chapters for more details on each.
Benchmarks
Roadster has a (small) benchmark suite to compare the worker backends we support. Below is a link to an example run of the benchmark. The number in the benchmark name indicates the number of worker tasks used to handle the jobs.
Example
Pg vs Sidekiq worker definition
Notice that the Worker implementation is identical for both a Postgres-backed and a Sidekiq-backed queue. The only difference is the Enqueuer associated type.
// Postgres-backed variant: jobs are enqueued via `PgEnqueuer`
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
    type Error = roadster::error::Error;
    type Enqueuer = roadster::worker::PgEnqueuer;

    async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
        info!("Processing job with args: {args}");
        Ok(())
    }
}
// Sidekiq-backed variant: jobs are enqueued via `SidekiqEnqueuer`
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
    type Error = roadster::error::Error;
    type Enqueuer = roadster::worker::SidekiqEnqueuer;

    async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
        info!("Processing job with args: {args}");
        Ok(())
    }
}
Pg vs Sidekiq worker registration
Workers need to be registered with a queue processor. A worker should at least be registered with the processor that matches its Enqueuer. However, it can also be registered with a processor that's different from its Enqueuer.
This is useful if a worker's Enqueuer needs to change. In this case, some jobs may remain in the old backend after the Enqueuer is switched. To ensure those jobs are processed, the worker can temporarily be registered with both the old and new processors; once all the jobs in the old backend are completed, the worker can be removed from the old processor.
The built-in Postgres and Sidekiq processors have the same APIs, so migrating between the two is easy.
// Register the Postgres worker service
.add_service_provider(move |registry, state| {
    Box::pin(async move {
        let processor = PgProcessor::builder(state)
            // Register the `ExampleWorker` with the Postgres worker service
            .register(ExampleWorker)?
            .build()
            .await?;
        registry
            .register_service(PgWorkerService::builder().processor(processor).build())?;
        Ok(())
    })
})
// Register the Sidekiq worker service
.add_service_provider(move |registry, state| {
    Box::pin(async move {
        let processor = SidekiqProcessor::builder(state)
            // Register the `ExampleWorker` with the Sidekiq worker service
            .register(ExampleWorker)?
            .build()
            .await?;
        registry.register_service(
            SidekiqWorkerService::builder().processor(processor).build(),
        )?;
        Ok(())
    })
})
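The reason for registering a worker with two processors during a backend migration can be seen in a small in-memory model. This sketch does not use Roadster's processor types: `Processor`, `Handler`, `example_worker`, and `run_migration_demo` are hypothetical names, and each "backend" is just a `VecDeque`. It assumes the worker's Enqueuer was switched from Sidekiq to Postgres while one job was still waiting in the old backend.

```rust
use std::collections::{HashMap, VecDeque};

/// A job handler: takes the job's args and returns a description of the work done.
type Handler = fn(&str) -> String;

/// Simplified processor: owns one backend's queue and the handlers registered with it.
struct Processor {
    backend: &'static str,
    queue: VecDeque<(String, String)>, // (worker name, args)
    handlers: HashMap<String, Handler>,
}

impl Processor {
    fn new(backend: &'static str) -> Self {
        Self { backend, queue: VecDeque::new(), handlers: HashMap::new() }
    }

    /// Registers a handler under a worker name, builder-style.
    fn register(mut self, worker: &str, handler: Handler) -> Self {
        self.handlers.insert(worker.to_string(), handler);
        self
    }

    fn push(&mut self, worker: &str, args: &str) {
        self.queue.push_back((worker.to_string(), args.to_string()));
    }

    /// Processes every queued job that has a registered handler.
    /// Jobs without a handler stay in the queue.
    fn run_once(&mut self) -> Vec<String> {
        let mut log = Vec::new();
        let mut unhandled = VecDeque::new();
        while let Some((worker, args)) = self.queue.pop_front() {
            match self.handlers.get(&worker) {
                Some(handler) => log.push(format!("[{}] {}", self.backend, handler(&args))),
                None => unhandled.push_back((worker, args)),
            }
        }
        self.queue = unhandled;
        log
    }
}

fn example_worker(args: &str) -> String {
    format!("processed {args}")
}

fn run_migration_demo() -> (Vec<String>, Vec<String>, usize) {
    // The worker stays registered with the old (Sidekiq) processor, because a
    // job enqueued before the switch is still waiting in that backend.
    let mut old_processor = Processor::new("sidekiq").register("ExampleWorker", example_worker);
    old_processor.push("ExampleWorker", "enqueued before switch");

    // After the switch, new jobs land in the new (Postgres) backend.
    let mut new_processor = Processor::new("pg").register("ExampleWorker", example_worker);
    new_processor.push("ExampleWorker", "enqueued after switch");

    let old_log = old_processor.run_once();
    let new_log = new_processor.run_once();
    (old_log, new_log, old_processor.queue.len())
}
```

If the worker were not registered with the old processor, the job enqueued before the switch would remain in the old queue indefinitely. Once that queue reports zero remaining jobs, the old registration can be dropped.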
Enqueueing jobs
Enqueueing jobs from application code is identical for every queue backend.
async fn example_get(State(state): State<AppContext>) -> RoadsterResult<()> {
    // Enqueue the job in your API handler
    ExampleWorker::enqueue(&state, "Example".to_string()).await?;
    Ok(())
}