Background jobs

In virtually all apps, some work needs to be done outside of the "critical path" in order to provide a quick and responsive experience to the user. For example, in mobile apps, the only work that should happen on the main thread is updating the UI. Everything else, such as reading files from disk and fetching data from the server, happens on a background thread.

In web apps (and API servers), the same principle applies. In general, an API should do the minimal amount of work needed to respond to the user's (or another service's) request, and everything else should be moved to some background "process". There are many ways to do this, including AWS SQS, GCP Pub/Sub, Sidekiq, Faktory, and pgmq, to name a few.
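As a rough illustration of the principle, the sketch below uses only the Rust standard library: a handler pushes a job onto an in-process channel and returns immediately, while a background thread does the slow work. The names `Job`, `handle_request`, and `run_demo` are hypothetical and are not part of Roadster. An in-process channel also loses its jobs if the process exits, which is one reason to use a durable queue backend instead.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical job payload; a real app would carry whatever the background work needs.
struct Job {
    user_id: u64,
}

/// The "API handler": it only hands the job to the queue and returns right away.
fn handle_request(queue: &mpsc::Sender<Job>, user_id: u64) -> &'static str {
    queue
        .send(Job { user_id })
        .expect("background worker has shut down");
    "202 Accepted"
}

/// Starts a background worker, sends it a few jobs, and returns the user ids it processed.
fn run_demo() -> Vec<u64> {
    let (tx, rx) = mpsc::channel::<Job>();

    // The worker drains the queue off the critical path.
    let worker = thread::spawn(move || {
        let mut processed = Vec::new();
        for job in rx {
            // The slow work (sending email, resizing images, ...) would happen here.
            processed.push(job.user_id);
        }
        processed
    });

    for user_id in [1, 2, 3] {
        let status = handle_request(&tx, user_id);
        assert_eq!(status, "202 Accepted");
    }

    // Dropping the sender closes the channel, which ends the worker's loop.
    drop(tx);
    worker.join().expect("worker thread panicked")
}
```

Calling `run_demo()` from a `main` function exercises the whole flow: the handler never waits for the job to finish, it only waits for the enqueue.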

Roadster provides a Worker trait to encapsulate common functionality for handling async jobs, and an Enqueuer trait to handle enqueueing jobs into the job queue backend. The job queue backend for a worker can be swapped by changing only the Enqueuer associated type of its Worker implementation.
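The associated-type pattern behind this design can be sketched in plain Rust. The traits below are simplified, synchronous stand-ins written for illustration only; they are not Roadster's actual `Worker` and `Enqueuer` definitions, and `AppState`, `FakePgEnqueuer`, and `FakeSidekiqEnqueuer` are hypothetical names backed by in-memory queues.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Stand-in for application state: one in-memory queue per backend.
#[derive(Default)]
struct AppState {
    pg_queue: Mutex<VecDeque<String>>,
    sidekiq_queue: Mutex<VecDeque<String>>,
}

/// Simplified enqueuer: knows how to put a job into one backend.
trait Enqueuer {
    fn enqueue(state: &AppState, args: String);
}

struct FakePgEnqueuer;
struct FakeSidekiqEnqueuer;

impl Enqueuer for FakePgEnqueuer {
    fn enqueue(state: &AppState, args: String) {
        state.pg_queue.lock().unwrap().push_back(args);
    }
}

impl Enqueuer for FakeSidekiqEnqueuer {
    fn enqueue(state: &AppState, args: String) {
        state.sidekiq_queue.lock().unwrap().push_back(args);
    }
}

/// Simplified worker: the backend is chosen by the associated type.
trait Worker {
    type Enqueuer: Enqueuer;

    fn handle(&self, args: String) -> Result<String, String>;

    /// Callers never name the backend; they go through the worker.
    fn enqueue(state: &AppState, args: String) {
        <Self::Enqueuer as Enqueuer>::enqueue(state, args);
    }
}

struct ExampleWorker;

impl Worker for ExampleWorker {
    // Changing this one line to `FakeSidekiqEnqueuer` moves the jobs to the other backend.
    type Enqueuer = FakePgEnqueuer;

    fn handle(&self, args: String) -> Result<String, String> {
        Ok(format!("Processed job with args: {args}"))
    }
}

/// Enqueues one job and reports how many jobs each backend now holds: (pg, sidekiq).
fn queue_lengths_after_enqueue() -> (usize, usize) {
    let state = AppState::default();
    ExampleWorker::enqueue(&state, "Example".to_string());
    let pg = state.pg_queue.lock().unwrap().len();
    let sidekiq = state.sidekiq_queue.lock().unwrap().len();
    (pg, sidekiq)
}
```

Because the call site is always `ExampleWorker::enqueue(...)`, switching the associated type does not require touching any code that enqueues jobs.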

Built-in worker backends

Roadster provides built-in support for running async workers using either Postgres (via pgmq) or Redis/Sidekiq (via rusty-sidekiq) as the backing store. See the following chapters for more details on each.

Benchmarks

Roadster has a (small) benchmark suite to compare the worker backends we support. Below is a link to an example run of the benchmark. The number in the benchmark name indicates the number of worker tasks used to handle the jobs.

Example

Pg vs Sidekiq worker definition

Notice that the Worker implementation is identical for both a Postgres- vs a Sidekiq-backed queue. The only difference is the Enqueuer associated type.

// Postgres-backed variant
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
    type Error = roadster::error::Error;
    type Enqueuer = roadster::worker::PgEnqueuer;

    async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
        info!("Processing job with args: {args}");
        Ok(())
    }
}

// Sidekiq-backed variant: identical except for the `Enqueuer` associated type
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
    type Error = roadster::error::Error;
    type Enqueuer = roadster::worker::SidekiqEnqueuer;

    async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
        info!("Processing job with args: {args}");
        Ok(())
    }
}

Pg vs Sidekiq worker registration

Workers need to be registered with a queue processor. A worker should at least be registered with the processor that matches its Enqueuer. However, it can also be registered with a processor that's different from its Enqueuer. This is useful when a worker's Enqueuer needs to change, because some jobs may remain in the old backend after the Enqueuer is switched. To ensure jobs in the old backend are processed, the worker can temporarily be registered with both the old and new processors. Once all the jobs in the old backend are completed, the worker can be removed from the old processor.
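The migration sequence described above can be sketched with in-memory stand-ins. The `Processor` and `Job` types and the `migrate` function below are hypothetical and exist only to illustrate the ordering of the steps; they are not Roadster's processors, which are shown further down.

```rust
use std::collections::{HashMap, VecDeque};

/// A job as stored in a backend: the worker's name plus its arguments.
struct Job {
    worker: &'static str,
    args: String,
}

/// Stand-in for a queue processor bound to one backend.
#[derive(Default)]
struct Processor {
    backlog: VecDeque<Job>,
    handlers: HashMap<&'static str, fn(&str) -> String>,
}

impl Processor {
    fn register(&mut self, worker: &'static str, handler: fn(&str) -> String) {
        self.handlers.insert(worker, handler);
    }

    fn unregister(&mut self, worker: &str) {
        self.handlers.remove(worker);
    }

    /// Processes every queued job that has a registered handler; other jobs stay queued.
    fn drain(&mut self) -> Vec<String> {
        let mut done = Vec::new();
        let mut unhandled = VecDeque::new();
        while let Some(job) = self.backlog.pop_front() {
            match self.handlers.get(job.worker) {
                Some(handler) => done.push(handler(&job.args)),
                None => unhandled.push_back(job),
            }
        }
        self.backlog = unhandled;
        done
    }
}

fn example_worker(args: &str) -> String {
    format!("Processed job with args: {args}")
}

/// Walks through the migration and returns (results from old backend, results from new backend).
fn migrate() -> (Vec<String>, Vec<String>) {
    let mut old = Processor::default();
    let mut new = Processor::default();

    // A job enqueued before the switch is still waiting in the old backend.
    old.backlog.push_back(Job { worker: "ExampleWorker", args: "before switch".into() });
    // After the switch, new jobs land in the new backend.
    new.backlog.push_back(Job { worker: "ExampleWorker", args: "after switch".into() });

    // During the migration the worker is registered with both processors.
    old.register("ExampleWorker", example_worker);
    new.register("ExampleWorker", example_worker);

    let from_old = old.drain();
    let from_new = new.drain();

    // Once the old backend is empty, the worker can be removed from it.
    if old.backlog.is_empty() {
        old.unregister("ExampleWorker");
    }

    (from_old, from_new)
}
```

If the worker were registered only with the new processor, the "before switch" job would sit in the old backend indefinitely, which is the situation the temporary double registration avoids.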

The built-in Postgres and Sidekiq processors have the same APIs, so migrating between the two is easy.

        // Register the Postgres worker service
        .add_service_provider(move |registry, state| {
            Box::pin(async move {
                let processor = PgProcessor::builder(state)
                    // Register the `ExampleWorker` with the Postgres worker service
                    .register(ExampleWorker)?
                    .build()
                    .await?;
                registry
                    .register_service(PgWorkerService::builder().processor(processor).build())?;
                Ok(())
            })
        })
        // Register the Sidekiq worker service
        .add_service_provider(move |registry, state| {
            Box::pin(async move {
                let processor = SidekiqProcessor::builder(state)
                    // Register the `ExampleWorker` with the Sidekiq worker service
                    .register(ExampleWorker)?
                    .build()
                    .await?;
                registry.register_service(
                    SidekiqWorkerService::builder().processor(processor).build(),
                )?;
                Ok(())
            })
        })

Enqueueing jobs

Enqueueing a job from application code is identical for every queue backend.

async fn example_get(State(state): State<AppContext>) -> RoadsterResult<()> {
    // Enqueue the job in your API handler
    ExampleWorker::enqueue(&state, "Example".to_string()).await?;

    Ok(())
}