GitHub - boringnode/queue: A simple and efficient framework-agnostic queue system for Node.js applications
A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.
Installation
npm install @boringnode/queue
Features
- Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
- Type-Safe Jobs: TypeScript classes with typed payloads
- Delayed Jobs: Schedule jobs to run after a delay
- Priority Queues: Process high-priority jobs first
- Bulk Dispatch: Efficiently dispatch thousands of jobs at once
- Job Grouping: Organize related jobs for monitoring
- Job Deduplication: Prevent duplicate jobs with custom IDs
- Retry with Backoff: Exponential, linear, or fixed backoff strategies
- Job Timeout: Fail or retry jobs that exceed a time limit
- Job History: Retain completed/failed jobs for debugging
- Scheduled Jobs: Cron or interval-based recurring jobs
- Auto-Discovery: Automatically register jobs from specified locations
Quick Start
1. Define a Job
    import { Job } from '@boringnode/queue'
    import type { JobOptions } from '@boringnode/queue/types'

    interface SendEmailPayload {
      to: string
    }

    export default class SendEmailJob extends Job<SendEmailPayload> {
      static options: JobOptions = {
        queue: 'email',
      }

      async execute(): Promise<void> {
        console.log(`Sending email to: ${this.payload.to}`)
      }
    }
Note
The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.
Warning
If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.
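To make the warning concrete, here is a minimal, self-contained sketch of name resolution. `resolveJobName` and `NamedJobClass` are hypothetical names used for illustration only; they are not part of the library, and the library's actual lookup may differ.

```typescript
// Hypothetical helper (not the library's API): prefer the explicit name,
// otherwise fall back to the class name, which a minifier may rewrite.
type NamedJobClass = { name: string; options?: { name?: string } }

function resolveJobName(jobClass: NamedJobClass): string {
  return jobClass.options?.name ?? jobClass.name
}

// Simulates minified output: the class identifier has become `a`.
const Mangled = class a {}

// Same mangled identifier, but with an explicit name in the options.
const MangledWithName = class a {
  static options = { name: 'SendEmailJob' }
}

console.log(resolveJobName(Mangled)) // 'a'
console.log(resolveJobName(MangledWithName)) // 'SendEmailJob'
```

A job stored under the name `a` cannot be matched reliably after the next build, which is why an explicit name is the safer choice.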
2. Configure the Queue Manager
    import { QueueManager } from '@boringnode/queue'
    import { redis } from '@boringnode/queue/drivers/redis_adapter'

    await QueueManager.init({
      default: 'redis',
      adapters: {
        redis: redis({ host: 'localhost', port: 6379 }),
      },
      locations: ['./app/jobs/**/*.ts'],
    })
3. Dispatch Jobs
    // Simple dispatch
    await SendEmailJob.dispatch({ to: 'user@example.com' })

    // With options
    await SendEmailJob.dispatch({ to: 'user@example.com' })
      .toQueue('high-priority')
      .priority(1)
      .in('5m')
4. Start a Worker
    import { Worker } from '@boringnode/queue'

    const worker = new Worker(config)
    await worker.start(['default', 'email'])
Bulk Dispatch
Efficiently dispatch thousands of jobs in a single batch operation:
    const { jobIds } = await SendEmailJob.dispatchMany([
      { to: 'user1@example.com' },
      { to: 'user2@example.com' },
      { to: 'user3@example.com' },
    ])
      .group('newsletter-jan-2025')
      .toQueue('emails')
      .priority(3)

    console.log(`Dispatched ${jobIds.length} jobs`)
This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.
Job Grouping
Organize related jobs together for monitoring and filtering:
    // Group newsletter jobs
    await SendEmailJob.dispatch({ to: 'user@example.com' }).group('newsletter-jan-2025')

    // Group with bulk dispatch
    await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')
The groupId is stored with job data and accessible via job.data.groupId.
Job Deduplication
Prevent the same job from being pushed multiple times. Four modes, all via .dedup():
Simple (skip while job exists)
    // First dispatch - job is created
    await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()

    // Second dispatch with same dedup ID - silently skipped
    await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
Throttle (skip within TTL window)
    // Within 5s, duplicates are skipped. After 5s, a new job is created.
    await SendEmailJob.dispatch({ to: 'user@example.com' })
      .dedup({ id: 'welcome-123', ttl: '5s' })
      .run()
Extend (reset TTL on duplicate)
    // Each duplicate push resets the TTL timer.
    await RateLimitJob.dispatch({ userId: 42 })
      .dedup({ id: 'rate-42', ttl: '1m', extend: true })
      .run()
Debounce (replace payload + reset TTL)
    // Within the 2s window, the latest payload overwrites the previous pending job.
    await SaveDraftJob.dispatch({ content: 'latest draft' })
      .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true })
      .run()
Inspecting the outcome
DispatchResult tells you what happened:
    const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' })
      .dedup({ id: 'draft-42', ttl: '2s', replace: true })
      .run()

    // deduped: 'added' | 'skipped' | 'replaced' | 'extended'
    // jobId: the UUID of the job (the existing one when deduped)
How it works
- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key.
- The user-supplied `id` must be ≤ 400 characters, and the combined `<jobName>::<id>` key must be ≤ 510 characters (constrained by the Knex storage column). Both limits are validated at `.dedup()` time.
- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds, and must be positive when provided. Omit `ttl` if you want no expiry; `ttl: 0` is rejected to avoid an ambiguous "expired immediately vs no-expiry" interpretation across engines.
- `extend` and `replace` require `ttl`; calling them without `ttl` throws.
- `replace` only applies to jobs in `pending` or `delayed` state. Jobs that are active (executing) or retained in history (`completed`/`failed` with retention) are left alone; the dispatch returns `{ deduped: 'skipped' }`.
- `replace` swaps the payload only: priority, queue, delay, groupId, and stored dedup options of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire.
- `extend` resets the TTL clock but never changes the window length. The window length is fixed to the `ttl` from the first dispatch that created the dedup slot. Later dispatches that pass a different `ttl` only reset the clock; their `ttl` value is ignored. To resize the window, let the slot expire and start over with a new dispatch.
- `extend` works in all states, even when the existing job is `active` (executing) or retained in history. Unlike `replace` (which is a no-op on non-replaceable states), `extend` always refreshes the dedup TTL window. Use this when you want the dedup slot to keep blocking new dispatches for the lifetime of a long-running job.
- `extend` requires the first dispatch to have set a `ttl`. If the slot was created without a `ttl`, later `extend` dispatches have no window to refresh and return `{ deduped: 'skipped' }` instead of `'extended'`.
- `retryJob` does not touch the dedup entry, so a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped.
- Atomic and race-free:
  - Redis: a single Lua script per dispatch performs the dedup-key lookup, state check (pending/delayed ZSCORE), payload swap, and TTL refresh atomically.
  - Knex: `SELECT ... FOR UPDATE` + insert/update inside a transaction. A nested savepoint catches unique-constraint violations under concurrent inserts and returns `{ deduped: 'skipped' }` pointing at the winner.
  - SyncAdapter: executes inline, no dedup support.
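The rules above can be approximated with a small in-memory model. This is only a sketch for reasoning about outcomes: `DedupModel`, `Slot`, and the millisecond-only `ttl` are hypothetical simplifications, time is passed in explicitly, and returning `'replaced'` when both `replace` and `extend` apply is an assumption rather than documented behavior.

```typescript
type Outcome = 'added' | 'skipped' | 'replaced' | 'extended'
type JobState = 'pending' | 'delayed' | 'active' | 'completed' | 'failed'

interface DedupOptions {
  id: string
  ttl?: number // milliseconds only in this sketch
  extend?: boolean
  replace?: boolean
}

interface Slot {
  jobId: string
  payload: unknown
  state: JobState
  windowMs?: number // fixed by the dispatch that created the slot
  expiresAt?: number // undefined means no expiry
}

// Hypothetical in-memory model; the real adapters do this atomically.
class DedupModel {
  private slots = new Map<string, Slot>()
  private nextId = 1

  dispatch(
    jobName: string,
    payload: unknown,
    opts: DedupOptions,
    now: number
  ): { jobId: string; deduped: Outcome } {
    if (opts.id.length > 400) throw new Error('dedup id must be <= 400 characters')
    const key = `${jobName}::${opts.id}`
    if (key.length > 510) throw new Error('dedup key must be <= 510 characters')
    if (opts.ttl !== undefined && opts.ttl <= 0) throw new Error('ttl must be positive')
    if ((opts.extend || opts.replace) && opts.ttl === undefined) {
      throw new Error('extend and replace require ttl')
    }

    const slot = this.slots.get(key)
    const expired = slot !== undefined && slot.expiresAt !== undefined && now >= slot.expiresAt
    if (slot === undefined || expired) {
      // No live slot: create a new job and open a new window.
      const created: Slot = {
        jobId: `job-${this.nextId++}`,
        payload,
        state: 'pending',
        windowMs: opts.ttl,
        expiresAt: opts.ttl === undefined ? undefined : now + opts.ttl,
      }
      this.slots.set(key, created)
      return { jobId: created.jobId, deduped: 'added' }
    }

    let outcome: Outcome = 'skipped'
    if (opts.extend && slot.windowMs !== undefined) {
      // Reset the clock, keeping the window length from the first dispatch.
      slot.expiresAt = now + slot.windowMs
      outcome = 'extended'
    }
    if (opts.replace && (slot.state === 'pending' || slot.state === 'delayed')) {
      slot.payload = payload // payload only; everything else is retained
      outcome = 'replaced' // assumption: 'replaced' wins when both apply
    }
    return { jobId: slot.jobId, deduped: outcome }
  }
}

const model = new DedupModel()
const debounce = { id: 'draft-42', ttl: 2000, replace: true, extend: true }
const first = model.dispatch('SaveDraftJob', { content: 'v1' }, debounce, 0)
const second = model.dispatch('SaveDraftJob', { content: 'v2' }, debounce, 1500)
const third = model.dispatch('SaveDraftJob', { content: 'v3' }, debounce, 4000)
console.log(first.deduped, second.deduped, third.deduped) // added replaced added
```

The second dispatch lands inside the window, so it reuses the first job's ID and pushes the expiry to 3500 ms. The third arrives after that and creates a new job.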
Caveats
- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated.
- The Sync adapter ignores `.dedup()` entirely: every dispatch executes inline and `deduped` is always `undefined` on the result. Use Redis or Knex if you need real deduplication.
- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field.
- Scheduled jobs (`.schedule()`) do not support dedup; each cron/interval fire is an independent dispatch.
- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned.
- With `ttl`, dedup expires after the window and a new job (new UUID) is created. The old job still runs.
- Knex MySQL concurrent race: MySQL does not support partial unique indexes, so two `pushOn` calls with the same dedup id firing at the exact same instant can both succeed. Serialize at the app layer if strict guarantees are required, or use Postgres / SQLite / Redis (all of which serialize correctly via the partial unique index or Lua atomicity).
Job History & Retention
Keep completed and failed jobs for debugging:
    export default class ImportantJob extends Job<Payload> {
      static options: JobOptions = {
        // Keep last 1000 completed jobs
        removeOnComplete: { count: 1000 },
        // Keep failed jobs for 7 days
        removeOnFail: { age: '7d' },
      }
    }
Retention options
| Value | Behavior |
|---|---|
| `true` (default) | Remove immediately |
| `false` | Keep forever |
| `{ count: n }` | Keep last n jobs |
| `{ age: '7d' }` | Keep for duration |
| `{ count: 100, age: '1d' }` | Both limits apply |
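As a rough illustration of how the table's values could combine, the sketch below decides which finished jobs survive pruning. `applyRetention` and `FinishedJob` are hypothetical names, `age` is given in milliseconds for simplicity, and the real adapters may prune differently.

```typescript
// Hypothetical model of the retention table; `age` is in milliseconds here.
type Retention = boolean | { count?: number; age?: number }

interface FinishedJob {
  id: string
  finishedAt: number // epoch milliseconds
}

// Returns the jobs that would be kept after pruning.
function applyRetention(history: FinishedJob[], retention: Retention, now: number): FinishedJob[] {
  if (retention === true) return [] // remove immediately
  if (retention === false) return [...history] // keep forever

  const { count, age } = retention
  // Newest first, so that `count` keeps the most recent jobs.
  let kept = [...history].sort((a, b) => b.finishedAt - a.finishedAt)
  if (age !== undefined) kept = kept.filter((job) => now - job.finishedAt <= age)
  if (count !== undefined) kept = kept.slice(0, count)
  return kept
}

const history: FinishedJob[] = [
  { id: 'a', finishedAt: 1000 },
  { id: 'b', finishedAt: 2000 },
  { id: 'c', finishedAt: 3000 },
]

// Both limits apply: 'a' is older than 2000 ms, then at most 2 jobs are kept.
const kept = applyRetention(history, { count: 2, age: 2000 }, 3500)
console.log(kept.map((job) => job.id)) // [ 'c', 'b' ]
```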
Query job history:
    const job = await adapter.getJob('job-id', 'queue-name')

    console.log(job.status) // 'completed' | 'failed'
    console.log(job.finishedAt) // timestamp
    console.log(job.error) // error message (if failed)
Adapters
Redis (recommended for production)
    import { redis } from '@boringnode/queue/drivers/redis_adapter'

    // With options
    const adapter = redis({ host: 'localhost', port: 6379 })

    // With existing ioredis instance
    import { Redis } from 'ioredis'
    const connection = new Redis({ host: 'localhost' })
    const adapter = redis(connection)
Knex (PostgreSQL, MySQL, SQLite)
    import { knex } from '@boringnode/queue/drivers/knex_adapter'

    const adapter = knex({
      client: 'pg',
      connection: { host: 'localhost', database: 'myapp' },
    })
More Knex examples
    // With existing Knex instance
    import Knex from 'knex'
    const connection = Knex({ client: 'pg', connection: '...' })
    const adapter = knex(connection)

    // Custom table name
    const adapter = knex(config, 'custom_jobs_table')
Database setup with QueueSchemaService
The Knex adapter requires tables to be created before use. Use QueueSchemaService to create them:
    import { QueueSchemaService } from '@boringnode/queue'
    import Knex from 'knex'

    const connection = Knex({ client: 'pg', connection: '...' })
    const schemaService = new QueueSchemaService(connection)

    // Create tables with default names
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()

    // Or extend with custom columns
    await schemaService.createJobsTable('queue_jobs', (table) => {
      table.string('tenant_id', 255).nullable()
    })
AdonisJS migration example:
    import { BaseSchema } from '@adonisjs/lucid/schema'
    import { QueueSchemaService } from '@boringnode/queue'

    export default class extends BaseSchema {
      async up() {
        const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
        await schemaService.createJobsTable()
        await schemaService.createSchedulesTable()
      }

      async down() {
        const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
        await schemaService.dropSchedulesTable()
        await schemaService.dropJobsTable()
      }
    }
Fake (testing + assertions)
    import { QueueManager } from '@boringnode/queue'
    import { redis } from '@boringnode/queue/drivers/redis_adapter'

    await QueueManager.init({
      default: 'redis',
      adapters: {
        redis: redis({ host: 'localhost' }),
      },
      locations: ['./app/jobs/**/*.ts'],
    })

    // The `using` keyword automatically restores the real adapters when
    // the variable goes out of scope (at the end of the test function).
    using fake = QueueManager.fake()

    await SendEmailJob.dispatch({ to: 'user@example.com' })

    fake.assertPushed(SendEmailJob)
    fake.assertPushed(SendEmailJob, {
      queue: 'default',
      payload: (payload) => payload.to === 'user@example.com',
    })
    fake.assertPushedCount(1)
You can also call QueueManager.restore() manually if you need more control over when the real adapters are restored.
Sync (for testing)
    import { sync } from '@boringnode/queue/drivers/sync_adapter'

    const adapter = sync() // Jobs execute immediately
Use the sync adapter for tests and lightweight local development only.
- `await MyJob.dispatch(payload).run()` waits for the job to fully finish.
- Retries are executed inline, not by a background worker.
- If you configure backoff, the adapter will `sleep` between attempts.
- This means the caller can stay blocked for the full retry duration.

Example: with `maxRetries: 3` and an exponential backoff of 1s, 2s, 4s, the request or command that dispatched the job can stay busy for about 7 seconds before the job exhausts its retries and runs `failed()`.
Job Options
    export default class MyJob extends Job<Payload> {
      static options: JobOptions = {
        queue: 'email', // Queue name (default: 'default')
        priority: 1, // Lower = higher priority (default: 5)
        maxRetries: 3, // Retry attempts before failing
        timeout: '30s', // Max execution time
        failOnTimeout: true, // Fail permanently on timeout (default: retry)
        removeOnComplete: { count: 100 }, // Keep last 100 completed
        removeOnFail: { age: '7d' }, // Keep failed for 7 days
      }
    }
Delayed Jobs
    await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
    await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
    await SendEmailJob.dispatch(payload).in('2h') // 2 hours
    await SendEmailJob.dispatch(payload).in('1d') // 1 day
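For readers wondering how strings such as `'30s'` or `'1d'` map to a delay, here is a minimal sketch of a duration parser. `toMilliseconds` is a hypothetical helper covering only the units shown in this README; the library's own parser may accept more formats.

```typescript
// Hypothetical helper: converts the duration strings used above to milliseconds.
type Unit = 'ms' | 's' | 'm' | 'h' | 'd'

const UNIT_MS: Record<Unit, number> = {
  ms: 1,
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
}

function toMilliseconds(duration: string | number): number {
  // Numbers are treated as milliseconds already.
  if (typeof duration === 'number') return duration

  const match = /^(\d+)(ms|s|m|h|d)$/.exec(duration.trim())
  if (!match) throw new Error(`Unsupported duration: ${duration}`)

  return Number(match[1]) * UNIT_MS[match[2] as Unit]
}

console.log(toMilliseconds('30s')) // 30000
console.log(toMilliseconds('5m')) // 300000
console.log(toMilliseconds('2h')) // 7200000
console.log(toMilliseconds('1d')) // 86400000
```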
Retry & Backoff
    import { exponentialBackoff } from '@boringnode/queue'

    export default class ReliableJob extends Job<Payload> {
      static options: JobOptions = {
        maxRetries: 5,
        retry: {
          backoff: () =>
            exponentialBackoff({
              baseDelay: '1s',
              maxDelay: '1m',
              multiplier: 2,
              jitter: true,
            }),
        },
      }
    }
`maxRetries` can be defined directly on the job options, and `retry.backoff` controls the delay between attempts.

With the `sync` adapter, these delays happen inline in the caller via `sleep`. If a job fails repeatedly, `dispatch().run()` will take as long as the total backoff duration. Use a worker-backed adapter when you do not want retries to slow down the request/command that dispatched the job.
Available strategies
    import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'

    // Exponential: 1s, 2s, 4s, 8s...
    exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })

    // Linear: 1s, 2s, 3s, 4s...
    linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })

    // Fixed: 5s, 5s, 5s...
    fixedBackoff({ baseDelay: '5s', jitter: true })
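The delay sequences in the comments above can be reproduced with a few lines of arithmetic. The functions below are a sketch, not the library's implementation: the names, the millisecond-based config, and in particular the linear formula are assumptions chosen to match the documented sequences, and jitter is left out to keep the output deterministic.

```typescript
// Hypothetical config: delays in milliseconds, no jitter.
interface BackoffConfig {
  baseDelayMs: number
  maxDelayMs?: number
  multiplier?: number
}

type Strategy = (attempt: number, config: BackoffConfig) => number

const cap = (delay: number, max?: number): number =>
  max === undefined ? delay : Math.min(delay, max)

// attempt is 1-based: 1s, 2s, 4s, 8s... for base 1s and multiplier 2.
const exponentialDelay: Strategy = (attempt, c) =>
  cap(c.baseDelayMs * (c.multiplier ?? 2) ** (attempt - 1), c.maxDelayMs)

// Assumed formula: 1s, 2s, 3s, 4s... for base 1s and multiplier 1.
const linearDelay: Strategy = (attempt, c) =>
  cap(c.baseDelayMs * (1 + (attempt - 1) * (c.multiplier ?? 1)), c.maxDelayMs)

// Same delay every time.
const fixedDelay: Strategy = (_attempt, c) => c.baseDelayMs

function delays(strategy: Strategy, retries: number, config: BackoffConfig): number[] {
  return Array.from({ length: retries }, (_, i) => strategy(i + 1, config))
}

const exponential = delays(exponentialDelay, 4, { baseDelayMs: 1000, maxDelayMs: 60_000, multiplier: 2 })
const linear = delays(linearDelay, 4, { baseDelayMs: 1000, maxDelayMs: 30_000, multiplier: 1 })
const fixed = delays(fixedDelay, 3, { baseDelayMs: 5000 })

console.log(exponential) // [ 1000, 2000, 4000, 8000 ]
console.log(linear) // [ 1000, 2000, 3000, 4000 ]
console.log(fixed) // [ 5000, 5000, 5000 ]

// Total inline wait for three exponential retries, as in the Sync adapter example.
const totalMs = exponential.slice(0, 3).reduce((sum, d) => sum + d, 0)
console.log(totalMs) // 7000
```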
Job Timeout
    export default class LongRunningJob extends Job<Payload> {
      static options: JobOptions = {
        timeout: '30s',
        failOnTimeout: false, // Will retry (default)
      }

      async execute(): Promise<void> {
        for (const item of this.payload.items) {
          // Check abort signal for graceful timeout handling
          if (this.signal?.aborted) {
            throw new Error('Job timed out')
          }
          await this.processItem(item)
        }
      }
    }
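The cooperative check on `this.signal` can be tried outside the library with a plain `AbortController`. The sketch below is an assumption about the general pattern, not the worker's actual code: `runWithTimeout` and `processItems` are hypothetical names, and the 20 ms sleep stands in for real work.

```typescript
// Hypothetical runner: aborts the signal once the timeout elapses.
async function runWithTimeout<T>(
  task: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), timeoutMs)
  try {
    return await task(controller.signal)
  } finally {
    clearTimeout(timer) // avoid a dangling timer when the task finishes early
  }
}

// Checks the signal between items, like the execute() loop above.
async function processItems(items: number[], signal: AbortSignal): Promise<number> {
  let sum = 0
  for (const item of items) {
    if (signal.aborted) throw new Error('Job timed out')
    await new Promise((resolve) => setTimeout(resolve, 20)) // simulated work
    sum += item
  }
  return sum
}

// Finishes well within the limit.
runWithTimeout((signal) => processItems([1, 2, 3], signal), 1000).then((sum) =>
  console.log(`done: ${sum}`)
)

// Ten items at 20 ms each exceed the 50 ms limit, so the loop stops early.
runWithTimeout((signal) => processItems([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], signal), 50).catch(
  (error: Error) => console.log(`stopped: ${error.message}`)
)
```

Because the check only runs between items, a single slow item still has to finish before the job notices the timeout. Passing the signal to the underlying I/O call, where supported, shortens that gap.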
Job Context
Access execution metadata via this.context:
    async execute(): Promise<void> {
      console.log(this.context.jobId) // Unique job ID
      console.log(this.context.attempt) // 1, 2, 3...
      console.log(this.context.queue) // Queue name
      console.log(this.context.priority) // Priority value
      console.log(this.context.acquiredAt) // When acquired
      console.log(this.context.stalledCount) // Stall recoveries
    }
Scheduled Jobs
Run jobs on a recurring basis:
    // Every 10 seconds
    await MetricsJob.schedule({ endpoint: '/health' }).every('10s')

    // Cron schedule
    await CleanupJob.schedule({ days: 30 })
      .id('daily-cleanup')
      .cron('0 0 * * *') // Midnight daily
      .timezone('Europe/Paris')
Schedule management
    import { Schedule } from '@boringnode/queue'

    // Find and manage
    const schedule = await Schedule.find('daily-cleanup')
    await schedule.pause()
    await schedule.resume()
    await schedule.trigger() // Run now
    await schedule.delete()

    // List schedules
    const all = await Schedule.list()
    const active = await Schedule.list({ status: 'active' })
Schedule options:
| Method | Description |
|---|---|
| `.id(string)` | Unique identifier |
| `.every(duration)` | Fixed interval (`'5s'`, `'1m'`, `'1h'`) |
| `.cron(expression)` | Cron schedule |
| `.timezone(tz)` | Timezone (default: `'UTC'`) |
| `.from(date)` | Start boundary |
| `.to(date)` | End boundary |
| `.limit(n)` | Maximum runs |
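To see how an interval might interact with the start boundary, end boundary, and run limit, consider the sketch below. It is a hypothetical model: `upcomingRuns` and `IntervalSchedule` are illustrative names, times are epoch milliseconds, and the assumption that the first run happens one interval after the start is not confirmed by the library.

```typescript
// Hypothetical model of an interval schedule; all times are epoch milliseconds.
interface IntervalSchedule {
  everyMs: number
  from?: number // start boundary
  to?: number // end boundary
  limit?: number // maximum runs
}

// Lists up to `max` upcoming run times, honoring the boundaries and the limit.
function upcomingRuns(schedule: IntervalSchedule, now: number, max: number): number[] {
  const start = schedule.from !== undefined && schedule.from > now ? schedule.from : now
  const maxRuns = Math.min(max, schedule.limit ?? max)

  const runs: number[] = []
  // Assumption: the first run happens one interval after the start.
  let next = start + schedule.everyMs
  while (runs.length < maxRuns) {
    if (schedule.to !== undefined && next > schedule.to) break
    runs.push(next)
    next += schedule.everyMs
  }
  return runs
}

// Every 10s, ending at 35s: the run at 40s falls outside the end boundary.
const bounded = upcomingRuns({ everyMs: 10_000, to: 35_000 }, 0, 10)
console.log(bounded) // [ 10000, 20000, 30000 ]

// The same schedule limited to two runs.
const limited = upcomingRuns({ everyMs: 10_000, to: 35_000, limit: 2 }, 0, 10)
console.log(limited) // [ 10000, 20000 ]
```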
Dependency Injection
Integrate with IoC containers:
    await QueueManager.init({
      // ...
      jobFactory: async (JobClass) => {
        return app.container.make(JobClass)
      },
    })
Example with injected services
    export default class SendEmailJob extends Job<SendEmailPayload> {
      constructor(
        private mailer: MailerService,
        private logger: Logger
      ) {
        super()
      }

      async execute(): Promise<void> {
        this.logger.info(`Sending email to ${this.payload.to}`)
        await this.mailer.send(this.payload)
      }
    }
Worker Configuration
    const config = {
      worker: {
        concurrency: 5, // Parallel jobs
        idleDelay: '2s', // Poll interval when idle
        timeout: '1m', // Default job timeout
        stalledThreshold: '30s', // When to consider job stalled
        stalledInterval: '30s', // How often to check
        maxStalledCount: 1, // Max recoveries before failing
        gracefulShutdown: true, // Wait for jobs on SIGTERM
      },
    }
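The three stalled-job settings can be read as a small decision rule. The sketch below is one plausible interpretation, not the worker's actual logic: `checkStalled` and `ActiveJob` are hypothetical, and it assumes a job counts as stalled once it has been held longer than the threshold without finishing.

```typescript
// Hypothetical model of the stalled-job settings; times are epoch milliseconds.
interface ActiveJob {
  id: string
  acquiredAt: number
  stalledCount: number // how many times the job has already been recovered
}

type StallDecision = 'healthy' | 'recover' | 'fail'

function checkStalled(
  job: ActiveJob,
  now: number,
  stalledThresholdMs: number,
  maxStalledCount: number
): StallDecision {
  // Assumption: a job is stalled once it has been held longer than the threshold.
  if (now - job.acquiredAt < stalledThresholdMs) return 'healthy'
  // Recover until the allowed number of recoveries is used up, then fail.
  return job.stalledCount < maxStalledCount ? 'recover' : 'fail'
}

const thresholdMs = 30_000
const maxStalled = 1

const fresh = checkStalled({ id: 'a', acquiredAt: 0, stalledCount: 0 }, 10_000, thresholdMs, maxStalled)
const firstStall = checkStalled({ id: 'b', acquiredAt: 0, stalledCount: 0 }, 45_000, thresholdMs, maxStalled)
const secondStall = checkStalled({ id: 'c', acquiredAt: 0, stalledCount: 1 }, 45_000, thresholdMs, maxStalled)

console.log(fresh, firstStall, secondStall) // healthy recover fail
```

Under this reading, `stalledInterval` only controls how often such a check runs, so a stalled job is noticed at most one interval after it crosses the threshold.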
Logging
    import { pino } from 'pino'

    await QueueManager.init({
      // ...
      logger: pino(),
    })
OpenTelemetry Instrumentation (experimental)
Warning
The OpenTelemetry instrumentation is experimental and its API may change in future releases.
@boringnode/queue ships with built-in OpenTelemetry instrumentation that creates PRODUCER spans for job dispatch and CONSUMER spans for job execution, following OTel messaging semantic conventions.
Quick Setup
    import { QueueInstrumentation } from '@boringnode/queue/otel'
    import * as boringqueue from '@boringnode/queue'

    const instrumentation = new QueueInstrumentation({
      messagingSystem: 'boringqueue', // default
      executionSpanLinkMode: 'link', // or 'parent'
    })

    instrumentation.enable()
    instrumentation.manuallyRegister(boringqueue)
The instrumentation patches QueueManager.init() to automatically inject its wrappers — no config changes needed in your queue setup.
Span Attributes
The instrumentation uses standard OTel messaging semantic conventions where they map cleanly, plus a few queue-specific custom attributes.
| Attribute | Kind | Description |
|---|---|---|
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
| `messaging.destination.name` | Semconv | Queue name |
| `messaging.message.id` | Semconv | Job ID for single-message spans |
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
| `messaging.job.priority` | Custom | Queue-specific job priority |
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
| `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |
Trace Context Propagation
The instrumentation automatically propagates trace context from dispatch to execution:
- Link mode (default): Each job execution is an independent trace, linked to the dispatch span
- Parent mode: Job execution is a child of the dispatch span (same trace)
Child spans created inside execute() (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span.
diagnostics_channel
Raw telemetry events are available via diagnostics_channel for custom subscribers:
    import { tracingChannels } from '@boringnode/queue'

    const { executeChannel } = tracingChannels

    executeChannel.subscribe({
      start() {},
      end() {},
      asyncStart() {},
      asyncEnd(message) {
        console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`)
      },
      error() {},
    })
Benchmarks
Performance comparison with BullMQ (5ms simulated work per job):
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
|---|---|---|---|---|
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |
npm run benchmark -- --realistic