Background jobs with Postgres
Roadster provides built-in support for running background jobs using Postgres as the queue backing store. Roadster uses the Rust-only integration provided by pgmq (as opposed to the Postgres extension integration). This means that all Roadster needs in order to provide a Postgres-backed queue is a URI for a standard Postgres DB with no extensions required.
Below is an example of how to register a worker and enqueue it into the job queue.
Service configs
Various properties of the Postgres worker service can be configured via the app's config files. The most important fields to configure are the following:
service.worker.pg.num-workers
: The number of worker tasks that can run at the same time.service.worker.pg.queues
: The names of the worker queues to handle.service.worker.pg.database.uri
: The URI of the Postgres database to use to enqueue jobs. If not provided, will fall back to the value in thedatabase.uri
config field.
[service.worker.worker-config.pg]
success-action = "delete"
failure-action = "archive"
[service.worker.pg]
num-workers = 1
balance-strategy = "round-robin"
[service.worker.pg.queue-config]
[service.worker.pg.queue-fetch-config]
error-delay = 10000
empty-delay = 10000
[service.worker.pg.periodic]
enable = true
stale-cleanup = "auto-clean-stale"
See the config struct for the full list of fields available.
Worker configs
In addition to the service-level configs, each worker has various configurable values that can be provided
by implementing the Worker::worker_config
method. Any configs not provided in this implementation will fall back
to the values provided in the app config.
[service.worker.enqueue-config]
queue = "default"
[service.worker.worker-config]
timeout = true
max-duration = 60000
max-retries = 25
Example
Worker definition
// Implement the `Worker` trait
#[async_trait]
impl Worker<AppContext, String> for ExampleWorker {
type Error = roadster::error::Error;
type Enqueuer = roadster::worker::PgEnqueuer;
// Optionally provide worker-level config overrides
fn worker_config(&self, _state: &AppContext) -> WorkerConfig {
WorkerConfig::builder()
.retry_config(RetryConfig::builder().max_retries(3).build())
.timeout(true)
.max_duration(Duration::from_secs(30))
.build()
}
async fn handle(&self, _state: &AppContext, args: String) -> Result<(), Self::Error> {
info!("Processing job with args: {args}");
Ok(())
}
}
Register the worker with the processor
fn build_app() -> RoadsterApp<AppContext> {
RoadsterApp::builder()
// Use the default `AppContext` for this example
.state_provider(Ok)
// Register the Postgres worker service
.add_service_provider(move |registry, state| {
Box::pin(async move {
let processor = PgProcessor::builder(state)
// Register the `ExampleWorker` with the Postgres worker service
.register(ExampleWorker)?
// Example of registering the `ExampleWorker` to run as a periodic cron job
.register_periodic(
ExampleWorker,
PeriodicArgs::builder()
.args("Periodic example args".to_string())
.schedule(cron::Schedule::from_str("* * * * * *")?)
.build(),
)?
.build()
.await?;
registry
.register_service(PgWorkerService::builder().processor(processor).build())?;
Ok(())
})
})
.build()
}
Enqueue a job for the worker
async fn example_get(State(state): State<AppContext>) -> RoadsterResult<()> {
// Enqueue the job in your API handler
ExampleWorker::enqueue(&state, "Example".to_string()).await?;
Ok(())
}