Queue Manager

Lucent exposes a queue manager backed by BullMQ:

  • bullmq for Redis-backed distributed queues

Queues are optional. A common pattern is to initialize only when queue runtime config is available.

Setup

BullMQ (Redis)

// src/libs/queue.ts
import { LucentQueueManager } from "@codesordinatestudio/lucent";

const redisUrl = process.env.REDIS_URL;

export const appQueue = redisUrl
  ? LucentQueueManager.initialize({
      bullmq: {
        connection: { url: redisUrl },
        prefix: "lucent",
      },
    })
  : null;

Service Wrapper

// src/queues/EmailQueue.ts
import { appQueue } from "../libs/queue";

export interface SendEmailJob {
  to: string;
  subject: string;
  body: string;
}

export class EmailQueue {
  static async send(payload: SendEmailJob) {
    if (!appQueue) return;
    await appQueue.addJob("email", "send-email", payload);
  }

  static worker() {
    if (!appQueue) return;
    appQueue.registerWorker<SendEmailJob>("email", async (job) => {
      const { to, subject } = job.data;
      console.log(`[worker] sending email to ${to}: ${subject}`);
    });
  }
}

Startup Behavior

  • if queue runtime config is not present, your queue wrapper should no-op or fail fast explicitly
  • start workers only after the app is listening
app.listen(3000);
EmailQueue.worker();

Manager API

addJob(queue, name, data, options?)

await appQueue?.addJob("emails", "send-welcome", { to: "user@example.com" }, { delay: 5000, attempts: 3 });

addBulk(queue, jobs)

await appQueue?.addBulk("emails", [
  { name: "send-notification", data: { to: "a@example.com" } },
  { name: "send-notification", data: { to: "b@example.com" } },
]);

registerWorker(queue, handler, options?)

appQueue?.registerWorker(
  "emails",
  async (job) => {
    await sendEmail(job.data);
  },
  { concurrency: 5 },
);

getQueueEvents(queue)

const events = appQueue?.getQueueEvents("emails");
await events?.disconnect();

close()

await appQueue?.close();

Operational Guidance

  • use bullmq when you need shared/distributed workers on Redis
  • treat queue workers as production dependencies once enabled
  • keep queue integration tests separate from unit-only suites when queue runtime is optional