Example app
A complete, runnable NestJS app using @eddyq/nestjs lives at examples/nestjs-basic/.
It demonstrates the patterns every queue-using app needs:
- A feature module declares its queue with registerQueue, a controller enqueues via @InjectQueue, and a processor handles it (the email/ module)
- A queue-scoped cron schedule fires a job that the same module's processor handles (the reports/ module)
- Fan-in batches with an onComplete callback (the reports/run-shards endpoint)
The app also splits API and worker entry points so they can scale independently.
Project layout
src/
├── main.ts # API entry (NestFactory.create)
├── worker.ts # Worker entry (createApplicationContext)
├── app.module.ts # API root — autoStart: false
├── workers.module.ts # Worker root — runs the job runtime
│
├── email/
│ ├── email.module.ts
│ ├── email.controller.ts # POST /email/send → queue.enqueue("send.email", ...)
│ └── email.processor.ts # @JobHandler("send.email")
│
└── reports/
  ├── reports.module.ts
  ├── reports.controller.ts # POST /reports/run-shards → queue.enqueueBatch(...)
  └── reports.processor.ts # @JobHandler("report.generate" | "report.shard" | "report.summary")

API composition root
autoStart: false keeps the worker runtime off in the API process — API pods enqueue and serve admin reads, worker pods actually process jobs.
import { Module } from "@nestjs/common";
import { EddyqWakeboardModule } from "@eddyq/wakeboard";
import { EddyqModule } from "@eddyq/nestjs";
import { EmailModule } from "./email/email.module.js";
import { ReportsModule } from "./reports/reports.module.js";
const DEFAULT_DATABASE_URL =
"postgres://eddyq:eddyq@localhost:5433/eddyq_dev?options=-c%20search_path%3Dv01";
/**
 * API composition root. Imports feature modules so producers (controllers)
 * are available, but `autoStart: false` keeps the worker runtime off — API
 * pods enqueue and serve admin reads; worker pods (see workers.module.ts)
 * actually process jobs.
 */
@Module({
imports: [
EddyqModule.forRoot({
// EDDYQ_DATABASE_URL takes precedence; the fallback targets the local dev database.
databaseUrl: process.env.EDDYQ_DATABASE_URL ?? DEFAULT_DATABASE_URL,
// The API process only enqueues; job processing is left to the worker process.
autoStart: false,
}),
EddyqWakeboardModule.forRoot({
mountPath: "/wakeboard",
// NOTE(review): falls back to the literal password "admin" when
// WAKEBOARD_PASSWORD is unset. Acceptable for a local example only — set the
// variable wherever this endpoint is reachable by anyone else.
auth: { password: process.env.WAKEBOARD_PASSWORD ?? "admin" },
}),
// Feature modules: each one registers its own queue, controller and processor.
EmailModule,
ReportsModule,
],
})
export class AppModule {}

Worker composition root
createApplicationContext boots the DI container without an HTTP listener — exactly what a worker pod needs.
import "reflect-metadata";
import { Logger } from "@nestjs/common";
import { NestFactory } from "@nestjs/core";
import { WorkersModule } from "./workers.module.js";
/**
 * Entry point for the worker process.
 *
 * Builds a standalone Nest application context from `WorkersModule`, so the
 * DI container is available but no HTTP listener is started. Resolves once
 * the context has been created and the startup line has been logged.
 */
async function bootstrap(): Promise<void> {
  const workerContext = await NestFactory.createApplicationContext(WorkersModule, {
    logger: ["log", "warn", "error"],
  });

  // Opt in to Nest's shutdown hooks so providers get their shutdown callbacks
  // when the process is asked to terminate.
  workerContext.enableShutdownHooks();

  Logger.log("worker runtime started", "Bootstrap");
}
void bootstrap();

Declaring a queue in a feature module
EddyqModule.registerQueue lives in the feature module that owns the queue. Per-queue defaults (and schedules, groups) ride along here — no central queue registry.
import { Module } from "@nestjs/common";
import { EddyqModule } from "@eddyq/nestjs";
import { EmailController } from "./email.controller.js";
import { EmailProcessor } from "./email.processor.js";
/**
 * Feature module that owns the "email" queue. It registers the queue via
 * `EddyqModule.registerQueue`, exposes the controller that enqueues
 * `send.email` jobs, and provides the processor that handles them.
 */
@Module({
imports: [
EddyqModule.registerQueue({
name: "email",
// Per-queue defaults ride along with the registration.
// NOTE(review): presumably caps each job at 5 attempts — confirm against the eddyq docs.
defaults: { maxAttempts: 5 },
}),
],
controllers: [EmailController],
providers: [EmailProcessor],
})
export class EmailModule {}

Enqueueing from a controller
@InjectQueue('email') returns a QueueHandle pre-bound to the queue name + its defaults. Use uniqueKey to dedupe replays. enqueueMany is one round-trip for an entire batch.
import { Body, Controller, Post } from "@nestjs/common";
import { InjectQueue, type QueueHandle } from "@eddyq/nestjs";
/**
 * Request body for `POST /email/send`. The controller forwards it unchanged
 * as the payload of a `send.email` job.
 * NOTE(review): no runtime validation of this shape is visible in this file.
 */
interface SendEmailBody {
// Recipient; also embedded in the job's unique key.
to: string;
subject: string;
}
/**
 * Request body for `POST /email/send-bulk`. Each entry in `messages` becomes
 * one `send.email` job.
 */
interface SendBulkBody {
messages: SendEmailBody[];
}
@Controller("email")
export class EmailController {
constructor(@InjectQueue("email") private readonly queue: QueueHandle) {}
/**
 * Enqueues a single `send.email` job whose payload is the request body.
 *
 * The unique key is `email:<to>:<current epoch ms>`, so it changes on every
 * call that lands in a different millisecond.
 * NOTE(review): a client retry arriving later therefore gets a new key —
 * confirm this is the intended dedupe window.
 *
 * @returns the id reported by the queue for the enqueued job (may be undefined).
 */
@Post("send")
async send(@Body() body: SendEmailBody): Promise<{ jobId: number | undefined }> {
  const uniqueKey = `email:${body.to}:${Date.now()}`;
  const { id } = await this.queue.enqueue("send.email", body, { uniqueKey });
  return { jobId: id };
}
/**
 * Bulk enqueue: every message in the request becomes one `send.email` job,
 * submitted through a single `enqueueMany` call (one Postgres round-trip for
 * the whole batch, so batch size has little effect on latency).
 *
 * Each job's unique key is `email:<to>:<stamp>`, where `stamp` is read once
 * per request. Consequently:
 * - two messages to the same recipient inside one request share a key;
 * - resubmitting the same list later produces a different stamp and
 *   therefore different keys.
 * NOTE(review): if resubmitting a list is meant to be a no-op, the key needs
 * to be derived from the message content rather than the request time —
 * confirm the intended behaviour.
 *
 * @returns how many jobs were inserted and how many were skipped, as numbers.
 */
@Post("send-bulk")
async sendBulk(
  @Body() body: SendBulkBody,
): Promise<{ inserted: number; skipped: number }> {
  const stamp = Date.now();
  const { inserted, skipped } = await this.queue.enqueueMany(
    body.messages.map((message) => ({
      kind: "send.email",
      payload: message,
      uniqueKey: `email:${message.to}:${stamp}`,
    })),
  );
  return { inserted: Number(inserted), skipped: Number(skipped) };
}
}

Handling jobs
@Processor() marks the class. @JobHandler('kind') binds methods to job kinds. Throw any error to retry with exponential backoff; throw CancelError to give up; throw RetryError for an explicit delay.
import { Logger } from "@nestjs/common";
import { JobHandler, Processor, type JobCall } from "@eddyq/nestjs";
@Processor()
export class EmailProcessor {
private readonly logger = new Logger(EmailProcessor.name);
/**
 * Handles `send.email` jobs. This example only logs the recipient and
 * subject; a real implementation would hand the message to a mail provider.
 * Throwing any Error from here makes the job retry with exponential backoff.
 */
@JobHandler("send.email")
async send({ payload, id }: JobCall): Promise<void> {
  // The payload is whatever the controller enqueued; it is not validated here.
  const email = payload as { to: string; subject: string };
  this.logger.log(`send.email #${id}: to=${email.to} subject="${email.subject}"`);
  // In a real app: await sendgrid.send({ to, subject, ... })
}
}

Cron-scheduled jobs
Declare schedules on the owning queue via registerQueue({ schedules }) — eddyq inserts the job at the right time, and your processor handles it like any other. The schedule's queue defaults to the enclosing registerQueue name.
import { Module } from "@nestjs/common";
import { EddyqModule } from "@eddyq/nestjs";
import { ReportsController } from "./reports.controller.js";
import { ReportsProcessor } from "./reports.processor.js";
/**
 * Feature module that owns the "reports" queue. Besides registering the
 * queue, it declares a cron schedule that enqueues a `report.generate` job,
 * which this module's processor handles like any other job.
 */
@Module({
imports: [
EddyqModule.registerQueue({
name: "reports",
defaults: { priority: 5 },
schedules: [
{
name: "daily-report",
// Six-field cron (sec min hour dom month dow): every day at 08:00:00 UTC.
cronExpr: "0 0 8 * * *",
kind: "report.generate",
payload: { scope: "daily" },
// Same value as the queue-level default above.
priority: 5,
// queue defaults to the enclosing registerQueue's name ("reports")
},
],
}),
],
controllers: [ReportsController],
providers: [ReportsProcessor],
})
export class ReportsModule {}

import { Logger } from "@nestjs/common";
import { JobHandler, Processor, type JobCall } from "@eddyq/nestjs";
/**
 * Shape this file assumes for the `_eddyq_batch` value found in the payload
 * of a batch's `onComplete` job.
 * NOTE(review): field meanings are inferred from their names and from how
 * `summary()` logs them — confirm against the Batches page.
 */
interface BatchEnvelope {
batchId: number;
// Number of items in the batch; logged as the denominator in "<completed>/<total> ok".
total: number;
// Per-outcome item counts; a non-zero `failed` or `cancelled` switches the summary to a warning.
completed: number;
failed: number;
cancelled: number;
// Logged with an "ms" suffix.
durationMs: number;
}
@Processor()
export class ReportsProcessor {
private readonly logger = new Logger(ReportsProcessor.name);
/**
 * Handles `report.generate` jobs (the kind enqueued by the "daily-report"
 * schedule). This example only logs the job id and its JSON-encoded payload.
 */
@JobHandler("report.generate")
async generate({ payload, id }: JobCall): Promise<void> {
  const serializedPayload = JSON.stringify(payload);
  this.logger.log(`report.generate #${id}: ${serializedPayload}`);
  // In a real app: build the report, write it somewhere, notify stakeholders.
}
/**
 * Handles `report.shard` jobs — one per item of a shard batch. This example
 * only logs which scope and shard index it received.
 */
@JobHandler("report.shard")
async shard({ payload, id }: JobCall): Promise<void> {
  // Payload shape matches what the reports controller enqueues; not validated here.
  const slice = payload as { scope: string; shard: number };
  this.logger.log(`report.shard #${id}: scope=${slice.scope} shard=${slice.shard}`);
  // In a real app: process this slice of the report.
}
/**
 * Batch completion callback: runs once per batch run (see
 * `reports.controller.ts`). eddyq injects the `_eddyq_batch` envelope next to
 * the user payload; the remaining payload keys are whatever was passed as the
 * `onComplete` payload.
 *
 * Logs a warning with the per-outcome counts when any item failed or was
 * cancelled, and a regular log line when the whole batch completed cleanly.
 */
@JobHandler("report.summary")
async summary({ payload, id }: JobCall): Promise<void> {
  const { _eddyq_batch: batch, scope } = payload as {
    _eddyq_batch: BatchEnvelope;
    scope?: string;
  };

  const hadProblems = batch.failed > 0 || batch.cancelled > 0;
  // Both messages start with the same job/scope prefix.
  const prefix = `report.summary #${id}: scope=${scope} `;

  if (!hadProblems) {
    this.logger.log(
      `${prefix}all ${batch.total} shards complete in ${batch.durationMs}ms`,
    );
    return;
  }

  this.logger.warn(
    `${prefix}${batch.completed}/${batch.total} ok ` +
      `(${batch.failed} failed, ${batch.cancelled} cancelled) ` +
      `in ${batch.durationMs}ms`,
  );
}
}

Batches with a fan-in callback
enqueueBatch enqueues N items and fires a single callback when every item terminates. The reports controller injects the raw client with @InjectEddyq() because the batch spans multiple kinds — a QueueHandle works too when every item lands on the same queue. The summary handler reads counts under _eddyq_batch in its payload — see the Batches page for the envelope shape.
import { Body, Controller, Post } from "@nestjs/common";
import { InjectEddyq, type Eddyq } from "@eddyq/nestjs";
/** Request body for `POST /reports/run-shards`. */
interface RunShardsBody {
// Label copied into every shard payload and unique key, the callback payload, and the batch metadata.
scope: string;
// Number of shard jobs to enqueue; shard indices run from 0 to shards - 1.
shards: number;
}
@Controller("reports")
export class ReportsController {
constructor(@InjectEddyq() private readonly queue: Eddyq) {}
/**
 * Fan-in pattern: enqueue `body.shards` `report.shard` jobs as one batch and
 * register `report.summary` as the completion callback. eddyq fires that
 * callback exactly once, when every shard has reached a terminal state
 * (completed / failed / cancelled); the callback's payload carries the
 * outcome counts under `_eddyq_batch`, so the handler can treat partial
 * failures differently from a clean run.
 *
 * The request time is captured once and reused in every shard's unique key,
 * in the callback payload and in the batch metadata.
 * NOTE(review): `body.shards` is used as an array length without validation —
 * consider bounding it before exposing this endpoint.
 *
 * @returns the batch id and the number of inserted items, as numbers.
 */
@Post("run-shards")
async runShards(
  @Body() body: RunShardsBody,
): Promise<{ batchId: number; inserted: number }> {
  const runAt = Date.now();
  const { scope, shards } = body;

  const { batchId, inserted } = await this.queue.enqueueBatch({
    items: Array.from({ length: shards }, (_unused, shard) => ({
      kind: "report.shard",
      payload: { scope, shard },
      uniqueKey: `report:${scope}:${runAt}:${shard}`,
    })),
    onComplete: {
      kind: "report.summary",
      payload: { scope, runAt },
    },
    metadata: { scope, runAt },
  });

  return { batchId: Number(batchId), inserted: Number(inserted) };
}
}

Running it locally
docker compose -f docker-compose.dev.yml up -d
pnpm -C packages/queue build:debug
pnpm -C packages/nestjs build
pnpm -C examples/nestjs-basic dev:api # http://localhost:3000
pnpm -C examples/nestjs-basic dev:worker # picks up jobs

See the example's README for end-to-end test scripts (load.mjs).