Background jobs with Sidekiq
Sidekiq is a popular system for running background and cron jobs in Ruby on Rails apps. Roadster provides built-in support for running background jobs with Sidekiq via the Sidekiq.rs crate, which provides a Rust interface for interacting with a Sidekiq server (e.g., a Redis server).
Below is an example of how to register a worker and enqueue it into the job queue. See
the Sidekiq.rs for more details on implementing
Worker
s.
Service configs
Various properties of the Sidekiq worker service can be configured via the app's config files. The most important fields to configure are the following:
service.sidekiq.num-workers
: The number of Sidekiq workers that can run at the same time.service.sidekiq.queues
: The names of the worker queues to handle.service.sidekiq.redis.uri
: The URI of the Redis database to use as the Sidekiq server.
[service.sidekiq]
num-workers = 2
queues = ["default"]
[service.sidekiq.redis]
# A hard-coded value can be provided to connect to a local server for local development.
# Production values should be provided via a more secure method, such as an environment var
# or an `AsyncSource` that fetches from an external secrets manager.
uri = "redis://localhost:6379"
See the config struct for the full list of fields available.
Worker configs
In addition to the service-level configs, each worker has various configurable values. Some of these can be provided by implementing the respective methods of the sidekiq.rs Worker trait. However, they can also be provided when the worker is registered with the SidekiqWorkerServiceBuilder.
service.register_worker_with_config(
ExampleWorker::new(context),
AppWorkerConfig::builder()
.max_retries(3)
.timeout(true)
.max_duration(Duration::from_secs(30))
.build(),
)?;
Roadster worker
All workers registered with
the SidekiqWorkerServiceBuilder
are wrapped in our
custom RoadsterWorker.
This allows us to implement some additional features for workers. Specifically, the ability to set a max duration for
workers, after which they will automatically timeout, be reported as an error, and be retried according to the
worker's retry config. The default behavior is to timeout after 60
seconds, but this can be extended or disabled at
the service level or in each individual worker.
Note: in order for a worker to stop running when the timeout is exceeded, the worker needs to hit an await
point.
So, it will work great for async IO-bound tasks, but CPU-bound tasks will require manual yields (e.g.
with yield_now) in order for the tasks to be automatically
timed out.
Example
pub struct ExampleWorker {
// If the worker needs access to your app's state, it can be added as a field in the worker.
state: AppContext,
}
impl ExampleWorker {
pub fn new(state: &AppContext) -> Self {
Self {
state: state.clone(),
}
}
}
// Implement the `Worker` trait
#[async_trait]
impl Worker<String> for ExampleWorker {
async fn perform(&self, args: String) -> sidekiq::Result<()> {
info!("Processing job with args: {args}");
Ok(())
}
}
fn build_app() -> RoadsterApp<AppContext> {
RoadsterApp::builder()
// Use the default `AppContext` for this example
.state_provider(|context| Ok(context))
// Register the Sidekiq worker service
.add_service_provider(move |registry, state| {
Box::pin(async move {
registry
.register_builder(
SidekiqWorkerService::builder(state)
.await?
// Register the `ExampleWorker` with the sidekiq service
.register_worker(ExampleWorker::new(state))?
// Optionally register the worker with worker-level config overrides
.register_worker_with_config(
ExampleWorker::new(state),
AppWorkerConfig::builder()
.max_retries(3)
.timeout(true)
.max_duration(Duration::from_secs(30))
.build(),
)?
// Register the `ExampleWorker` to run as a periodic cron job
.register_periodic_worker(
sidekiq::periodic::builder("* * * * * *")?
.name("Example periodic worker"),
ExampleWorker::new(state),
"Periodic example args".to_string(),
)
.await?,
)
.await?;
Ok(())
})
})
.build()
}
async fn example_get(State(state): State<AppContext>) -> RoadsterResult<()> {
// Enqueue the job in your API handler
ExampleWorker::enqueue(&state, "Example".to_string()).await?;
Ok(())
}