Background jobs with Postgres

Roadster provides built-in support for running background jobs using Postgres as the queue backing store. Roadster uses the Rust-only integration provided by pgmq (as opposed to the Postgres extension integration). As a result, all Roadster needs in order to provide a Postgres-backed queue is the URI of a standard Postgres DB; no extensions are required.

Below is an example of how to configure the worker service, register a worker, and enqueue a job for it.

Service configs

Various properties of the Postgres worker service can be configured via the app's config files. The most important fields to configure are the following:

  • service.worker.pg.num-workers: The number of worker tasks that can run at the same time.
  • service.worker.pg.queues: The names of the worker queues to handle.
  • service.worker.pg.database.uri: The URI of the Postgres database to use to enqueue jobs. If not provided, Roadster falls back to the value in the database.uri config field.

[service.worker.worker-config.pg]
success-action = "delete"
failure-action = "archive"

[service.worker.pg]
num-workers = 1
balance-strategy = "round-robin"

[service.worker.pg.queue-config]

[service.worker.pg.queue-fetch-config]
error-delay = 10000
empty-delay = 10000

[service.worker.pg.periodic]
enable = true
stale-cleanup = "auto-clean-stale"

See the config struct for the full list of fields available.
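
The fallback described for service.worker.pg.database.uri can be pictured with a small, dependency-free sketch. This is not Roadster's actual config type: `SketchConfig`, its field names, and `queue_uri` are hypothetical names invented for illustration, and the URIs are placeholders.

```rust
/// Hypothetical, simplified view of the two config fields involved.
struct SketchConfig {
    /// Stands in for `database.uri`.
    database_uri: String,
    /// Stands in for `service.worker.pg.database.uri`, which may be absent.
    worker_pg_database_uri: Option<String>,
}

impl SketchConfig {
    /// Prefer the worker-specific URI; otherwise fall back to the app-wide one.
    fn queue_uri(&self) -> &str {
        self.worker_pg_database_uri
            .as_deref()
            .unwrap_or(self.database_uri.as_str())
    }
}

fn main() {
    // No worker-specific URI: the queue shares the app database.
    let shared = SketchConfig {
        database_uri: "postgres://localhost/app".to_string(),
        worker_pg_database_uri: None,
    };
    assert_eq!(shared.queue_uri(), "postgres://localhost/app");

    // Worker-specific URI set: the queue lives in its own database.
    let dedicated = SketchConfig {
        database_uri: "postgres://localhost/app".to_string(),
        worker_pg_database_uri: Some("postgres://localhost/jobs".to_string()),
    };
    assert_eq!(dedicated.queue_uri(), "postgres://localhost/jobs");
}
```

In practice this means a single database.uri is enough to get started, and a separate queue database can be introduced later by setting only the worker-specific field.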

Worker configs

In addition to the service-level configs, each worker has its own configurable values, which can be provided by implementing the Worker::worker_config method. Any value not provided by that implementation falls back to the corresponding value in the app config.

[service.worker.enqueue-config]
queue = "default"

[service.worker.worker-config]
timeout = true
max-duration = 60000
max-retries = 25
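
The "worker override first, app config second" rule can be sketched as a field-by-field merge. This is an illustration only, not Roadster's implementation: `SketchWorkerConfig`, `or_defaults`, and `app_defaults` are hypothetical names, and reading max-duration as milliseconds is an assumption.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a worker config in which every field is optional.
#[derive(Debug, Clone, Default, PartialEq)]
struct SketchWorkerConfig {
    timeout: Option<bool>,
    max_duration: Option<Duration>,
    max_retries: Option<u32>,
}

impl SketchWorkerConfig {
    /// Keep each value that is set on `self`; otherwise take it from `defaults`.
    fn or_defaults(self, defaults: &SketchWorkerConfig) -> SketchWorkerConfig {
        SketchWorkerConfig {
            timeout: self.timeout.or(defaults.timeout),
            max_duration: self.max_duration.or(defaults.max_duration),
            max_retries: self.max_retries.or(defaults.max_retries),
        }
    }
}

/// Mirrors the app-config values shown above
/// (assumption: `max-duration` is expressed in milliseconds).
fn app_defaults() -> SketchWorkerConfig {
    SketchWorkerConfig {
        timeout: Some(true),
        max_duration: Some(Duration::from_millis(60_000)),
        max_retries: Some(25),
    }
}

fn main() {
    // The worker overrides only the retry count.
    let worker = SketchWorkerConfig {
        max_retries: Some(3),
        ..Default::default()
    };
    let effective = worker.or_defaults(&app_defaults());

    assert_eq!(effective.max_retries, Some(3)); // from the worker
    assert_eq!(effective.timeout, Some(true)); // from the app config
    assert_eq!(effective.max_duration, Some(Duration::from_secs(60))); // from the app config
}
```

Modelling every field as an `Option` keeps "not set" distinct from "set to the default", so a worker only has to mention the values it wants to change.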

Example

Worker definition

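// Imports of Roadster's own items (`AppContext`, `Worker`, `WorkerConfig`, `RetryConfig`) are omitted here
use async_trait::async_trait;
use std::time::Duration;
use tracing::info;

// A unit struct is enough for a worker that carries no state of its own
pub struct ExampleWorker;

// Implement the `Worker` trait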
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
    type Error = roadster::error::Error;
    type Enqueuer = roadster::worker::PgEnqueuer;

    // Optionally provide worker-level config overrides
    fn worker_config(&self, _state: &AppContext) -> WorkerConfig {
        WorkerConfig::builder()
            .retry_config(RetryConfig::builder().max_retries(3).build())
            .timeout(true)
            .max_duration(Duration::from_secs(30))
            .build()
    }

    async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
        info!("Processing job with args: {args}");
        Ok(())
    }
}

Register the worker with the processor

// Needed for `cron::Schedule::from_str` below
use std::str::FromStr;

fn build_app() -> RoadsterApp<AppContext> {
    RoadsterApp::builder()
        // Use the default `AppContext` for this example
        .state_provider(Ok)
        // Register the Postgres worker service
        .add_service_provider(move |registry, state| {
            Box::pin(async move {
                let processor = PgProcessor::builder(state)
                    // Register the `ExampleWorker` with the Postgres worker service
                    .register(ExampleWorker)?
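                    // Example of registering the `ExampleWorker` to run as a periodic cron job.
                    // The `cron` crate's schedule format starts with a seconds field, so the
                    // six-field schedule below fires every second.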
                    .register_periodic(
                        ExampleWorker,
                        PeriodicArgs::builder()
                            .args("Periodic example args".to_string())
                            .schedule(cron::Schedule::from_str("* * * * * *")?)
                            .build(),
                    )?
                    .build()
                    .await?;
                registry
                    .register_service(PgWorkerService::builder().processor(processor).build())?;
                Ok(())
            })
        })
        .build()
}

Enqueue a job for the worker

// `State` is the axum extractor that provides the shared app state
use axum::extract::State;

async fn example_get(State(state): State<AppContext>) -> RoadsterResult<()> {
    // Enqueue the job in your API handler
    ExampleWorker::enqueue(&state, "Example".to_string()).await?;

    Ok(())
}