Queue Manager
Lucent exposes a queue manager backed by BullMQ:
- bullmq for Redis-backed distributed queues
Queues are optional. A common pattern is to initialize the queue manager only when its runtime configuration (for example, REDIS_URL) is available.
Setup
BullMQ (Redis)
// src/libs/queue.ts
import { LucentQueueManager } from "@codesordinatestudio/lucent";

const redisUrl = process.env.REDIS_URL;

export const appQueue = redisUrl
  ? LucentQueueManager.initialize({
      bullmq: {
        connection: { url: redisUrl },
        prefix: "lucent",
      },
    })
  : null;
Service Wrapper
// src/queues/EmailQueue.ts
import { appQueue } from "../libs/queue";

export interface SendEmailJob {
  to: string;
  subject: string;
  body: string;
}

export class EmailQueue {
  static async send(payload: SendEmailJob) {
    if (!appQueue) return;
    await appQueue.addJob("email", "send-email", payload);
  }

  static worker() {
    if (!appQueue) return;
    appQueue.registerWorker<SendEmailJob>("email", async (job) => {
      const { to, subject } = job.data;
      console.log(`[worker] sending email to ${to}: ${subject}`);
    });
  }
}
Startup Behavior
- if queue runtime config is not present, your queue wrapper should no-op or fail fast explicitly
- start workers only after the app is listening
app.listen(3000);
EmailQueue.worker();
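The two options in the first bullet (no-op or fail fast) can be sketched as a pair of small helpers. This is an illustration only, not part of Lucent: `QueueLike`, `requireQueue`, and `enqueueOrSkip` are hypothetical names, and `QueueLike` merely mirrors the `addJob(queue, name, data)` call shape used on this page.

```typescript
// Hypothetical minimal shape; it only mirrors the addJob(queue, name, data) call used on this page.
interface QueueLike {
  addJob(queue: string, name: string, data: unknown): Promise<unknown>;
}

// Fail fast: throw a descriptive error when the queue was never initialized.
function requireQueue<T extends QueueLike>(queue: T | null, hint = "REDIS_URL"): T {
  if (!queue) {
    throw new Error(`Queue manager is not initialized (is ${hint} set?)`);
  }
  return queue;
}

// No-op: skip enqueueing and report whether the job was actually queued.
async function enqueueOrSkip(
  queue: QueueLike | null,
  queueName: string,
  jobName: string,
  data: unknown,
): Promise<boolean> {
  if (!queue) return false;
  await queue.addJob(queueName, jobName, data);
  return true;
}
```

Returning a boolean from the no-op path lets callers log or count skipped jobs instead of losing them silently. Which behavior fits depends on whether the job is essential to the request.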
Manager API
addJob(queue, name, data, options?)
await appQueue?.addJob("emails", "send-welcome", { to: "user@example.com" }, { delay: 5000, attempts: 3 });
addBulk(queue, jobs)
await appQueue?.addBulk("emails", [
  { name: "send-notification", data: { to: "a@example.com" } },
  { name: "send-notification", data: { to: "b@example.com" } },
]);
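When the job list comes from application data, the array passed to addBulk can be built with a small mapping step. The sketch below assumes only the `{ name, data }` entry shape shown above. `BulkJob` and `toNotificationJobs` are hypothetical names, not Lucent exports.

```typescript
// Hypothetical entry type mirroring the { name, data } objects passed to addBulk above.
interface BulkJob<T> {
  name: string;
  data: T;
}

// Builds one "send-notification" job per unique, non-empty recipient address.
function toNotificationJobs(recipients: string[]): BulkJob<{ to: string }>[] {
  const unique = new Set(
    recipients.map((r) => r.trim()).filter((r) => r.length > 0),
  );
  return Array.from(unique).map((to) => ({
    name: "send-notification",
    data: { to },
  }));
}

// Usage (appQueue as exported from src/libs/queue.ts):
// await appQueue?.addBulk("emails", toNotificationJobs(["a@example.com", "b@example.com"]));
```

Deduplicating before enqueueing is a design choice for this example; drop the `Set` if repeated recipients should each get their own job.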
registerWorker(queue, handler, options?)
appQueue?.registerWorker(
  "emails",
  async (job) => {
    await sendEmail(job.data);
  },
  { concurrency: 5 },
);
getQueueEvents(queue)
const events = appQueue?.getQueueEvents("emails");
await events?.disconnect();
close()
await appQueue?.close();
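close() is typically called once during process shutdown. The sketch below assumes only that the manager exposes the async close() shown above. `Closable` and `createShutdownHandler` are hypothetical names, and the signal wiring in the trailing comment assumes a Node.js-compatible runtime.

```typescript
// Hypothetical minimal shape: anything with the async close() shown above.
interface Closable {
  close(): Promise<unknown>;
}

// Returns a handler that closes the queue at most once, even if it is invoked repeatedly.
function createShutdownHandler(queue: Closable | null): () => Promise<unknown> {
  let closing: Promise<unknown> | null = null;
  return () => {
    if (!closing) {
      // With no queue configured there is nothing to close.
      closing = queue ? queue.close() : Promise.resolve();
    }
    return closing;
  };
}

// Usage (appQueue as exported from src/libs/queue.ts):
// const shutdown = createShutdownHandler(appQueue);
// process.once("SIGTERM", () => void shutdown().then(() => process.exit(0)));
```

Caching the promise means a second signal arriving mid-shutdown waits on the same close() call rather than starting another one.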
Operational Guidance
- use bullmq when you need shared/distributed workers on Redis
- treat queue workers as production dependencies once enabled
- keep queue integration tests separate from unit-only suites when queue runtime is optional