Skip to content

Example app

A complete, runnable NestJS app using @eddyq/nestjs lives at examples/nestjs-basic/.

It demonstrates the patterns every queue-using app needs:

  • A feature module declares its queue with registerQueue, a controller enqueues via @InjectQueue, and a processor handles it (the email/ module)
  • A queue-scoped cron schedule fires a job that the same module's processor handles (the reports/ module)
  • Fan-in batches with an onComplete callback (the reports/run-shards endpoint)

The app also splits API and worker entry points so they can scale independently.

Project layout

src/
├── main.ts                 # API entry (NestFactory.create)
├── worker.ts               # Worker entry (createApplicationContext)
├── app.module.ts           # API root — autoStart: false
├── workers.module.ts       # Worker root — runs the job runtime

├── email/
│   ├── email.module.ts
│   ├── email.controller.ts # POST /email/send → queue.enqueue("send.email", ...)
│   └── email.processor.ts  # @JobHandler("send.email")

└── reports/
    ├── reports.module.ts
    ├── reports.controller.ts # POST /reports/run-shards → queue.enqueueBatch(...)
    └── reports.processor.ts  # @JobHandler("report.generate" | "report.shard" | "report.summary")

API composition root

autoStart: false keeps the worker runtime off in the API process — API pods enqueue and serve admin reads, worker pods actually process jobs.

ts
import { Module } from "@nestjs/common";

import { EddyqWakeboardModule } from "@eddyq/wakeboard";
import { EddyqModule } from "@eddyq/nestjs";

import { EmailModule } from "./email/email.module.js";
import { ReportsModule } from "./reports/reports.module.js";

const DEFAULT_DATABASE_URL =
  "postgres://eddyq:eddyq@localhost:5433/eddyq_dev?options=-c%20search_path%3Dv01";

/**
 * API composition root. Imports feature modules so producers (controllers)
 * are available, but `autoStart: false` keeps the worker runtime off — API
 * pods enqueue and serve admin reads; worker pods (see workers.module.ts)
 * actually process jobs.
 */
@Module({
  imports: [
    EddyqModule.forRoot({
      databaseUrl: process.env.EDDYQ_DATABASE_URL ?? DEFAULT_DATABASE_URL,
      autoStart: false,
    }),
    EddyqWakeboardModule.forRoot({
      mountPath: "/wakeboard",
      auth: { password: process.env.WAKEBOARD_PASSWORD ?? "admin" },
    }),
    EmailModule,
    ReportsModule,
  ],
})
export class AppModule {}

Worker composition root

createApplicationContext boots the DI container without an HTTP listener — exactly what a worker pod needs.

ts
import "reflect-metadata";

import { Logger } from "@nestjs/common";
import { NestFactory } from "@nestjs/core";

import { WorkersModule } from "./workers.module.js";

/**
 * Worker entry. `createApplicationContext` boots the DI container without
 * an HTTP listener — that's what a worker pod needs.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.createApplicationContext(WorkersModule, {
    logger: ["log", "warn", "error"],
  });
  app.enableShutdownHooks();
  Logger.log("worker runtime started", "Bootstrap");
}

void bootstrap();

Declaring a queue in a feature module

EddyqModule.registerQueue lives in the feature module that owns the queue. Per-queue defaults (and schedules, groups) ride along here — no central queue registry.

ts
import { Module } from "@nestjs/common";

import { EddyqModule } from "@eddyq/nestjs";

import { EmailController } from "./email.controller.js";
import { EmailProcessor } from "./email.processor.js";

@Module({
  imports: [
    EddyqModule.registerQueue({
      name: "email",
      defaults: { maxAttempts: 5 },
    }),
  ],
  controllers: [EmailController],
  providers: [EmailProcessor],
})
export class EmailModule {}

Enqueueing from a controller

@InjectQueue('email') returns a QueueHandle pre-bound to the queue name + its defaults. Use uniqueKey to dedupe replays. enqueueMany is one round-trip for an entire batch.

ts
import { Body, Controller, Post } from "@nestjs/common";

import { InjectQueue, type QueueHandle } from "@eddyq/nestjs";

interface SendEmailBody {
  to: string;
  subject: string;
}

interface SendBulkBody {
  messages: SendEmailBody[];
}

@Controller("email")
export class EmailController {
  constructor(@InjectQueue("email") private readonly queue: QueueHandle) {}

  @Post("send")
  async send(@Body() body: SendEmailBody): Promise<{ jobId: number | undefined }> {
    const r = await this.queue.enqueue("send.email", body, {
      uniqueKey: `email:${body.to}:${Date.now()}`,
    });
    return { jobId: r.id };
  }

  /**
   * Bulk enqueue. One Postgres round-trip for the whole batch — a 500-message
   * payload takes roughly the same time as a 50-message payload. Per-message
   * `uniqueKey` still deduplicates against the existing queue, so resubmitting
   * the same list is safe.
   */
  @Post("send-bulk")
  async sendBulk(
    @Body() body: SendBulkBody,
  ): Promise<{ inserted: number; skipped: number }> {
    const stamp = Date.now();
    const r = await this.queue.enqueueMany(
      body.messages.map((m) => ({
        kind: "send.email",
        payload: m,
        uniqueKey: `email:${m.to}:${stamp}`,
      })),
    );
    return { inserted: Number(r.inserted), skipped: Number(r.skipped) };
  }
}

Handling jobs

@Processor() marks the class. @JobHandler('kind') binds methods to job kinds. Throw any error to retry with exponential backoff; throw CancelError to give up; throw RetryError for an explicit delay.

ts
import { Logger } from "@nestjs/common";

import { JobHandler, Processor, type JobCall } from "@eddyq/nestjs";

@Processor()
export class EmailProcessor {
  private readonly logger = new Logger(EmailProcessor.name);

  @JobHandler("send.email")
  async send({ payload, id }: JobCall): Promise<void> {
    const { to, subject } = payload as { to: string; subject: string };
    this.logger.log(`send.email #${id}: to=${to} subject="${subject}"`);
    // In a real app: await sendgrid.send({ to, subject, ... })
    // Throw any Error to retry with exponential backoff.
  }
}

Cron-scheduled jobs

Declare schedules on the owning queue via registerQueue({ schedules }) — eddyq inserts the job at the right time, and your processor handles it like any other. The schedule's queue defaults to the enclosing registerQueue name.

ts
import { Module } from "@nestjs/common";

import { EddyqModule } from "@eddyq/nestjs";

import { ReportsController } from "./reports.controller.js";
import { ReportsProcessor } from "./reports.processor.js";

@Module({
  imports: [
    EddyqModule.registerQueue({
      name: "reports",
      defaults: { priority: 5 },
      schedules: [
        {
          // every day at 08:00:00 UTC (sec min hour dom month dow)
          name: "daily-report",
          cronExpr: "0 0 8 * * *",
          kind: "report.generate",
          payload: { scope: "daily" },
          priority: 5,
          // queue defaults to the enclosing registerQueue's name ("reports")
        },
      ],
    }),
  ],
  controllers: [ReportsController],
  providers: [ReportsProcessor],
})
export class ReportsModule {}
ts
import { Logger } from "@nestjs/common";

import { JobHandler, Processor, type JobCall } from "@eddyq/nestjs";

interface BatchEnvelope {
  batchId: number;
  total: number;
  completed: number;
  failed: number;
  cancelled: number;
  durationMs: number;
}

@Processor()
export class ReportsProcessor {
  private readonly logger = new Logger(ReportsProcessor.name);

  @JobHandler("report.generate")
  async generate({ payload, id }: JobCall): Promise<void> {
    this.logger.log(`report.generate #${id}: ${JSON.stringify(payload)}`);
    // In a real app: build the report, write it somewhere, notify stakeholders.
  }

  @JobHandler("report.shard")
  async shard({ payload, id }: JobCall): Promise<void> {
    const { scope, shard } = payload as { scope: string; shard: number };
    this.logger.log(`report.shard #${id}: scope=${scope} shard=${shard}`);
    // In a real app: process this slice of the report.
  }

  /**
   * Fires once per batch run — see `reports.controller.ts`. The `_eddyq_batch`
   * envelope is injected by eddyq alongside the user payload; everything else
   * in `payload` is whatever you set on `onComplete`.
   */
  @JobHandler("report.summary")
  async summary({ payload, id }: JobCall): Promise<void> {
    const { _eddyq_batch, ...user } = payload as {
      _eddyq_batch: BatchEnvelope;
      [k: string]: unknown;
    };
    if (_eddyq_batch.failed > 0 || _eddyq_batch.cancelled > 0) {
      this.logger.warn(
        `report.summary #${id}: scope=${(user as { scope?: string }).scope} ` +
          `${_eddyq_batch.completed}/${_eddyq_batch.total} ok ` +
          `(${_eddyq_batch.failed} failed, ${_eddyq_batch.cancelled} cancelled) ` +
          `in ${_eddyq_batch.durationMs}ms`,
      );
      return;
    }
    this.logger.log(
      `report.summary #${id}: scope=${(user as { scope?: string }).scope} ` +
        `all ${_eddyq_batch.total} shards complete in ${_eddyq_batch.durationMs}ms`,
    );
  }
}

Batches with a fan-in callback

enqueueBatch enqueues N items and fires a single callback when every item terminates. The reports controller injects the raw client with @InjectEddyq() because the batch spans multiple kinds — a QueueHandle works too when every item lands on the same queue. The summary handler reads counts under _eddyq_batch in its payload — see the Batches page for the envelope shape.

ts
import { Body, Controller, Post } from "@nestjs/common";

import { InjectEddyq, type Eddyq } from "@eddyq/nestjs";

interface RunShardsBody {
  scope: string;
  shards: number;
}

@Controller("reports")
export class ReportsController {
  constructor(@InjectEddyq() private readonly queue: Eddyq) {}

  /**
   * Fan-in pattern. Enqueue N shard jobs as one batch; eddyq fires
   * `report.summary` exactly once when every shard reaches a terminal state
   * (completed / failed / cancelled). The summary handler receives counts
   * under `_eddyq_batch` in its payload — branch on those if you want to
   * treat partial failures differently from a clean run.
   */
  @Post("run-shards")
  async runShards(
    @Body() body: RunShardsBody,
  ): Promise<{ batchId: number; inserted: number }> {
    const stamp = Date.now();
    const r = await this.queue.enqueueBatch({
      items: Array.from({ length: body.shards }, (_, i) => ({
        kind: "report.shard",
        payload: { scope: body.scope, shard: i },
        uniqueKey: `report:${body.scope}:${stamp}:${i}`,
      })),
      onComplete: {
        kind: "report.summary",
        payload: { scope: body.scope, runAt: stamp },
      },
      metadata: { scope: body.scope, runAt: stamp },
    });
    return { batchId: Number(r.batchId), inserted: Number(r.inserted) };
  }
}

Running it locally

bash
docker compose -f docker-compose.dev.yml up -d
pnpm -C packages/queue build:debug
pnpm -C packages/nestjs build
pnpm -C examples/nestjs-basic dev:api      # http://localhost:3000
pnpm -C examples/nestjs-basic dev:worker   # picks up jobs

See the example's README for end-to-end test scripts (load.mjs).

Released under the MIT or Apache-2.0 License.