A robust, MongoDB-backed background job processing library for Node.js with TypeScript support
npm install @clipboard-health/mongo-jobs
Job handlers implement the HandlerInterface and define how your jobs are processed:
import type { HandlerInterface } from "@clipboard-health/mongo-jobs";
export interface WelcomeEmailData {
userId: string;
email: string;
}
export class WelcomeEmailJob implements HandlerInterface<WelcomeEmailData> {
public name = "WelcomeEmailJob";
public maxAttempts = 3;
async perform({ userId, email }: WelcomeEmailData) {
await this.sendEmail(email, `Welcome, user ${userId}!`);
}
private async sendEmail(_to: string, _message: string) {
// Email sending logic
}
}
Create a BackgroundJobs instance and register your handlers to groups:
import { BackgroundJobs } from "@clipboard-health/mongo-jobs";
import { WelcomeEmailJob } from "./welcomeEmailJob";
const backgroundJobs = new BackgroundJobs();
backgroundJobs.register(WelcomeEmailJob, "emails");
export { backgroundJobs };
Add jobs to the queue to be processed:
import { backgroundJobs } from "./jobsRegistry";
import { WelcomeEmailJob } from "./welcomeEmailJob";
await backgroundJobs.enqueue(WelcomeEmailJob, {
userId: "123",
email: "user@example.com",
});
Start processing jobs from the queue:
import { backgroundJobs } from "./jobsRegistry";
await backgroundJobs.start(["emails"], {
maxConcurrency: 10,
});
Jobs are defined as classes that implement the HandlerInterface:
import type { BackgroundJobType, HandlerInterface } from "@clipboard-health/mongo-jobs";
export interface MyJobData {
userId: string;
action: string;
}
export class MyJob implements HandlerInterface<MyJobData> {
// Required: unique name for this job type
public name = "MyJob";
// Optional: max retry attempts (default: 10)
public maxAttempts = 5;
// Required: the actual job logic
async perform(data: MyJobData, job?: BackgroundJobType<MyJobData>) {
// Job implementation
console.log(`Processing ${data.action} for user ${data.userId}`);
// Optional: access job metadata
if (job) {
console.log(`Job ID: ${job._id.toString()}`);
console.log(`Attempt: ${job.attemptsCount}`);
}
}
}
name (required): Unique identifier for the job typemaxAttempts (optional): Maximum number of retry attempts before marking the job as failed. Default is 10. Uses exponential backoff: 2^attempt seconds between retriesperform (required): Async function that executes the job logic
data: The job payload passed when enqueueingjob: Optional metadata about the job execution (id, attempts, timestamps, etc.)Register job handlers with the BackgroundJobs instance and assign them to processing groups:
import { BackgroundJobs } from "@clipboard-health/mongo-jobs";
import { CleanupJob } from "./jobs/cleanupJob";
import { EmailJob } from "./jobs/emailJob";
import { ReportJob } from "./jobs/reportJob";
import { SmsJob } from "./jobs/smsJob";
const backgroundJobs = new BackgroundJobs();
// Register jobs to groups
backgroundJobs.register(EmailJob, "notifications");
backgroundJobs.register(ReportJob, "reports");
backgroundJobs.register(CleanupJob, "maintenance");
// You can register multiple jobs to the same group
backgroundJobs.register(SmsJob, "notifications");
Groups allow you to:
If your job requires dependencies (like services, database connections, etc.) passed through the constructor, you must register an instance instead of the class:
import { BackgroundJobs } from "@clipboard-health/mongo-jobs";
import { EmailServiceJob } from "./jobs/emailServiceJob";
const backgroundJobs = new BackgroundJobs();
// For jobs with constructor dependencies, register an instance
const emailService = {
async send(to: string, subject: string, body: string) {
console.log(`Sending email to ${to}: ${subject} : ${body}`);
},
};
backgroundJobs.register(new EmailServiceJob(emailService), "notifications");
Example job with dependencies:
import type { HandlerInterface } from "@clipboard-health/mongo-jobs";
interface EmailService {
send(to: string, subject: string, body: string): Promise<void>;
}
export interface EmailServiceJobData {
to: string;
subject: string;
body: string;
}
export class EmailServiceJob implements HandlerInterface<EmailServiceJobData> {
public name = "EmailServiceJob";
public maxAttempts = 3;
constructor(private readonly emailService: EmailService) {}
async perform({ to, subject, body }: EmailServiceJobData) {
await this.emailService.send(to, subject, body);
}
}
Important: When registering job instances, the library will use the instance directly rather than instantiating the class. This means:
data parameter)Note: Even when registering an instance, you can still enqueue jobs using the class, instance, or handler name:
// All of these work, regardless of whether you registered a class or instance
await backgroundJobs.enqueue(EmailServiceJob, data); // By class
await backgroundJobs.enqueue(emailServiceJobInstance, data); // By instance
await backgroundJobs.enqueue("EmailServiceJob", data); // By name
The enqueued class/instance/name is only used to look up the registered handler. The registered instance is always used for execution, not the instance passed to enqueue().
Add jobs to the queue for processing:
import { backgroundJobs } from "./jobsRegistry";
import { MyJob } from "./myJob";
// Basic enqueue
await backgroundJobs.enqueue(MyJob, {
userId: "123",
action: "process",
});
import type { ClientSession } from "mongodb";
import { backgroundJobs } from "./jobsRegistry";
import { MyJob } from "./myJob";
declare const mongoSession: ClientSession;
// Enqueue with options
await backgroundJobs.enqueue(
MyJob,
{ userId: "123", action: "process" },
{
// Schedule for later
startAt: new Date("2024-12-31T23:59:59Z"),
// Ensure uniqueness (see uniqueness section below)
unique: "user-123-process",
// Use within a MongoDB transaction
session: mongoSession,
},
);
import { backgroundJobs } from "./jobsRegistry";
// Enqueue by job name (when handler is already registered)
await backgroundJobs.enqueue("MyJob", { userId: "123", action: "process" });
startAt: Schedule the job to run at a specific time. Default is immediateunique: Ensure only one instance of the job exists (see Job uniqueness section)session: MongoDB session for transactional job creationStart processing jobs from one or more groups:
import { backgroundJobs } from "./jobsRegistry";
// Start a worker for specific groups
await backgroundJobs.start(["notifications", "reports"], {
maxConcurrency: 20,
});
import { backgroundJobs } from "./jobsRegistry";
// Start with all available options
await backgroundJobs.start(["notifications"], {
// Maximum concurrent jobs (default: 10)
maxConcurrency: 10,
// Time to wait when no jobs available, in ms (default: 10000)
newJobCheckWaitMS: 5000,
// Use MongoDB change streams for instant job detection (default: true)
useChangeStream: true,
// Lock timeout for stuck jobs, in ms (default: 600000 = 10 minutes)
lockTimeoutMS: 300_000,
// Interval to check for stuck jobs, in ms (default: 60000 = 1 minute)
unlockJobsIntervalMS: 30_000,
// Interval to refresh queue list, in ms (default: 30000 = 30 seconds)
refreshQueuesIntervalMS: 60_000,
// Exclude specific queues from processing
exclude: ["low-priority-queue"],
});
import { backgroundJobs } from "./jobsRegistry";
// Graceful shutdown
await backgroundJobs.stop(30_000); // Wait up to 30 seconds for jobs to complete
maxConcurrency: Number of jobs to process simultaneouslyuseChangeStream: Enable instant job detection using MongoDB change streams. When true, workers are notified immediately when new jobs are addednewJobCheckWaitMS: Fallback polling interval when no jobs are availablelockTimeoutMS: Maximum time a job can be locked before being considered stuckunlockJobsIntervalMS: How often to check for and unlock stuck jobsrefreshQueuesIntervalMS: How often to refresh the list of queues to consumeexclude: Array of queue names to skip processingSchedule recurring jobs using cron expressions:
import { BackgroundJobs } from "@clipboard-health/mongo-jobs";
import { DailyReportJob } from "./jobs/dailyReportJob";
const backgroundJobs = new BackgroundJobs();
// Register a cron job
await backgroundJobs.registerCron(DailyReportJob, {
// Group assignment (same as regular registration)
group: "reports",
// Unique name for this schedule
scheduleName: "daily-report",
// Cron expression (standard 5-field format)
cronExpression: "0 9 * * *", // Every day at 9 AM
// Optional: timezone for cron evaluation (default: "utc")
timeZone: "America/New_York",
// Data to pass to each job execution
data: { reportType: "daily" },
});
import { backgroundJobs } from "./jobsRegistry";
// Remove a cron schedule and its pending jobs
await backgroundJobs.removeCron("daily-report");
minute hour day month weekdaymaxAttempts, but the next scheduled job will still be enqueuedImportant: When you register a cron schedule, it is persisted in the database. Even if you remove the schedule registration from your code, it will continue executing. To stop a cron schedule, you must explicitly remove it using the removeCron API:
await backgroundJobs.removeCron("daily-report");
This will:
Prevent duplicate jobs from being enqueued or running simultaneously:
import { ProcessUserJob } from "./jobs/processUserJob";
import { backgroundJobs } from "./jobsRegistry";
// Simple uniqueness - single unique key for both enqueued and running
await backgroundJobs.enqueue(
ProcessUserJob,
{ userId: "123" },
{
unique: "process-user-123",
},
);
It's possible to have separate enqueued and running key. When the job is enqueued, the library will ensure that we can't enqueue another one but once it starts running it switches to its running key so we can enqueue another one that will wait to be executed until the first one finishes.
An example where this can be useful is recalculating some kind of a cache. We don't want to enqueue more than one non-running job to not explode number of enqueued jobs. But once it starts running and there is another trigger that may warrant cache recalculation we want to schedule another one to do another recalculation even if there is one running, cause we don't know if the current recalculation will include the newest change.
import { ProcessUserJob } from "./jobs/processUserJob";
import { backgroundJobs } from "./jobsRegistry";
// Advanced uniqueness - separate keys for enqueued vs running states
await backgroundJobs.enqueue(
ProcessUserJob,
{ userId: "123" },
{
unique: {
// Only one enqueued job per user
enqueuedKey: "process-user-123",
// Only one running job per user
runningKey: "process-user-123-running",
},
},
);
import { SendEmailJob } from "./jobs/sendEmailJob";
import { backgroundJobs } from "./jobsRegistry";
// Example: Allow multiple enqueued but only one running
await backgroundJobs.enqueue(
SendEmailJob,
{ userId: "123", emailType: "welcome" },
{
unique: {
enqueuedKey: undefined, // Allow multiple enqueued emails
runningKey: "send-email-123", // But only one sending at a time
},
},
);
enqueuedKey already exists and hasn't started, the new enqueue returns undefinedenqueuedKey to runningKey. This prevents multiple instances from running simultaneouslyundefineduniqueKey fieldThe library automatically reports metrics using StatsD by default. Metrics are reported every 60 seconds for each queue and include:
background_jobs.queue.scheduled - Number of jobs scheduled for future executionbackground_jobs.queue.pending - Number of jobs ready to be processedbackground_jobs.queue.created - Total jobs (scheduled + pending)background_jobs.queue.failed - Number of jobs that exhausted all retry attemptsbackground_jobs.queue.retry - Counter incremented when a job is retriedbackground_jobs.queue.expired - Counter incremented when a job lock expires (stuck jobs)background_jobs.queue.delay - Timing metric for execution delay (time between nextRunAt and actual execution)All metrics are tagged with queue to identify which queue the metric belongs to.
You can provide a custom metrics reporter by implementing the MetricsReporter interface:
import { BackgroundJobs, type MetricsReporter } from "@clipboard-health/mongo-jobs";
class CustomMetricsReporter implements MetricsReporter {
gauge(name: string, value: number, tags: Record<string, string>): void {
// Report gauge metric
console.log(`Gauge: ${name} = ${value}`, tags);
}
increment(name: string, tags: Record<string, string>): void {
// Report counter increment
console.log(`Increment: ${name}`, tags);
}
timing(name: string, value: number | Date, tags: Record<string, string>): void {
// Report timing metric
console.log(`Timing: ${name} = ${value}`, tags);
}
}
const backgroundJobs = new BackgroundJobs({
metricsReporter: new CustomMetricsReporter(),
});
The default metrics reporter uses the hot-shots StatsD client. You can configure it by passing options:
import { BackgroundJobs, defaultMetricsReporter } from "@clipboard-health/mongo-jobs";
const backgroundJobs = new BackgroundJobs({
metricsReporter: defaultMetricsReporter({
host: "localhost",
port: 8125,
globalTags: { env: "production" },
}),
});
The library provides built-in OpenTelemetry distributed tracing support. Traces are automatically created for job enqueueing (producer) and execution (consumer), allowing you to track jobs across your distributed system.
Three types of spans are created:
Producer spans (background-jobs.producer) - Created when a job is enqueued
PRODUCERConsumer spans (background-jobs.consumer) - Created when a job is executed
CONSUMERInternal spans (background-jobs.internals) - Created for internal operations
INTERNALTo enable tracing, configure the OpenTelemetry SDK in your application:
import { NodeSDK } from "@opentelemetry/sdk-node";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
const sdk = new NodeSDK({
traceExporter: new OTLPTraceExporter({
url: "http://localhost:4318/v1/traces",
}),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();
When a job is enqueued, trace context is automatically injected into the job data via the _traceHeaders field. When the job is executed, this context is extracted to link the consumer span to the producer span, enabling end-to-end trace visibility.
HTTP Request → Enqueue Job (Producer Span)
↓
[Job in Queue]
↓
Execute Job (Consumer Span) → Your Handler
MIT