Queue Manager

Lucent provides a Redis-backed queue implementation powered by BullMQ.

Setup

First, initialize the LucentQueueManager singleton from @codesordinatestudio/lucent-cms with your Redis connection:

typescript
// src/libs/queue.ts
import { LucentQueueManager } from "@codesordinatestudio/lucent-cms";

export const appQueue = LucentQueueManager.initialize({
  connection: process.env.REDIS_URL || "redis://localhost:6379",
});
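
If the environment variable might be missing or malformed, it can help to resolve the connection string in one place and fail fast. The following is a minimal sketch: resolveRedisUrl is a hypothetical helper, not part of Lucent, and it assumes that only redis:// and rediss:// (TLS) URLs are acceptable.

```typescript
// Hypothetical helper (not part of Lucent): pick the Redis URL and reject bad input early.
const DEFAULT_REDIS_URL = "redis://localhost:6379";

function resolveRedisUrl(env: Record<string, string | undefined>): string {
  const raw = env.REDIS_URL?.trim();
  // Fall back to a local instance when the variable is unset or empty.
  const url = raw ? raw : DEFAULT_REDIS_URL;
  // Assumption: only the redis:// and rediss:// (TLS) schemes are valid here.
  if (!url.startsWith("redis://") && !url.startsWith("rediss://")) {
    throw new Error(`REDIS_URL must start with redis:// or rediss://, got "${url}"`);
  }
  return url;
}

// Usage with the setup shown on this page:
// LucentQueueManager.initialize({ connection: resolveRedisUrl(process.env) });
```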

Creating a Queue Service

Wrap queue interactions in a dedicated service to get type-safe payloads and a single place for queue and job names:

typescript
// src/queues/EmailQueue.ts
import { appQueue } from "../libs/queue";

export interface SendEmailJob {
  to: string;
  subject: string;
  body: string;
}

export class EmailQueue {
  /** Add job to queue */
  static async send(payload: SendEmailJob) {
    await appQueue.addJob("email", "send-welcome", payload);
  }

  /** Start processing jobs */
  static worker() {
    appQueue.registerWorker<SendEmailJob>("email", async (job) => {
      const { to, subject } = job.data;
      console.log(`[worker] Sending email to ${to}: ${subject}`);
      // Implement logic...
    });
  }
}
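
A side benefit of the wrapper is that callers depend on a narrow surface that can be faked in tests without Redis. The sketch below is one way to do that, not Lucent's own API: JobAdder, createEmailQueue, and FakeQueue are hypothetical names, and JobAdder mirrors only the addJob call used on this page.

```typescript
// Payload shape from the EmailQueue example on this page.
interface SendEmailJob {
  to: string;
  subject: string;
  body: string;
}

// Hypothetical minimal interface: only the addJob call used on this page.
interface JobAdder {
  addJob(queue: string, name: string, data: unknown): Promise<unknown>;
}

// Factory variant of the service, so tests can pass a fake instead of appQueue.
function createEmailQueue(queue: JobAdder) {
  return {
    async send(payload: SendEmailJob): Promise<void> {
      await queue.addJob("email", "send-welcome", payload);
    },
  };
}

// In-memory fake that records calls; no Redis connection is needed.
class FakeQueue implements JobAdder {
  calls: Array<{ queue: string; name: string; data: unknown }> = [];

  async addJob(queue: string, name: string, data: unknown): Promise<void> {
    this.calls.push({ queue, name, data });
  }
}

// Usage in a test:
// const fake = new FakeQueue();
// await createEmailQueue(fake).send({ to: "a@example.com", subject: "Hi", body: "..." });
// fake.calls now holds one entry for the "email" queue.
```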

Running Workers

In your main entry point, start the queue workers after initializing the server:

typescript
// src/index.ts
import { Elysia } from "elysia";
import { EmailQueue } from "./queues/EmailQueue";

const app = new Elysia()
  /* ... */
  .listen(3000);

// Start workers
EmailQueue.worker();

Manager API

The LucentQueueManager provides several methods for job management:

Jobs

  • addJob(queue, name, data, options?): Add a single job to the specified queue.
    • options.delay: Delay, in milliseconds, before the job becomes eligible for processing.
    • options.attempts: Number of retry attempts.
    • options.backoff: Backoff strategy applied between retries, for example exponential backoff.
  • addBulk(queue, jobs): Enqueue multiple jobs in a single Redis round trip.
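
The options above can be combined, and the effect of exponential backoff is easier to see with numbers. The sketch below rests on assumptions that should be checked against Lucent's types: that options are forwarded to BullMQ unchanged, that backoff takes BullMQ's { type, delay } shape, that attempts counts the first try, and that bulk entries use BullMQ's { name, data, opts } shape. JobOptions, retryDelays, and toBulkJobs are hypothetical names.

```typescript
// Assumed option shape, mirroring BullMQ job options; verify against Lucent's types.
interface JobOptions {
  delay?: number; // milliseconds before the job becomes processable
  attempts?: number; // assumed to be total tries, including the first
  backoff?: { type: "exponential" | "fixed"; delay: number };
}

const welcomeOptions: JobOptions = {
  delay: 5000,
  attempts: 4,
  backoff: { type: "exponential", delay: 1000 },
};

// Waits before each retry under a doubling schedule: base * 2^(retry - 1).
// Some BullMQ versions may add jitter, so treat these as approximate.
function retryDelays(options: JobOptions): number[] {
  const retries = Math.max((options.attempts ?? 1) - 1, 0);
  const backoff = options.backoff;
  if (!backoff) return new Array<number>(retries).fill(0);
  return Array.from({ length: retries }, (_, i) =>
    backoff.type === "exponential" ? backoff.delay * 2 ** i : backoff.delay,
  );
}

// Assumed bulk entry shape ({ name, data, opts }), as in BullMQ's addBulk.
function toBulkJobs(recipients: string[], options: JobOptions) {
  return recipients.map((to) => ({
    name: "send-welcome",
    data: { to, subject: "Welcome", body: "Thanks for signing up." },
    opts: options,
  }));
}

// Usage (assuming appQueue from src/libs/queue.ts):
// await appQueue.addJob("email", "send-welcome", payload, welcomeOptions);
// await appQueue.addBulk("email", toBulkJobs(["a@example.com"], welcomeOptions));
```

With these settings, retryDelays(welcomeOptions) yields waits of 1000, 2000, and 4000 milliseconds before the three retries.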

Job Processing

  • registerWorker(queue, handler, options?): Define how jobs from a queue should be processed.
    • options.concurrency: How many jobs this worker processes at the same time (default: 1).
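
As an illustration of the signature above, a worker that sends several emails in parallel might be registered as follows. This is a sketch under assumptions: WorkerRegistrar and JobLike are hypothetical minimal types that mirror only what this page lists, deliver stands in for a real mail client, and a thrown error is assumed to mark the job as failed (as it does in BullMQ) so that attempts and backoff apply.

```typescript
interface SendEmailJob {
  to: string;
  subject: string;
  body: string;
}

// Hypothetical minimal types mirroring the registerWorker signature listed above.
interface JobLike<T> {
  data: T;
}
interface WorkerRegistrar {
  registerWorker<T>(
    queue: string,
    handler: (job: JobLike<T>) => Promise<void>,
    options?: { concurrency?: number },
  ): void;
}

// `deliver` is a placeholder for a real mail client call.
function startEmailWorker(
  manager: WorkerRegistrar,
  deliver: (job: SendEmailJob) => Promise<void>,
): void {
  manager.registerWorker<SendEmailJob>(
    "email",
    async (job) => {
      if (!job.data.to.includes("@")) {
        // Assumption: a thrown error marks the job as failed, as in BullMQ.
        throw new Error(`Invalid recipient: ${job.data.to}`);
      }
      await deliver(job.data);
    },
    { concurrency: 5 }, // up to five emails in flight for this worker
  );
}

// Usage (assuming appQueue from src/libs/queue.ts and your own mail function):
// startEmailWorker(appQueue, sendMail);
```

Raising concurrency mainly helps I/O-bound handlers such as email or HTTP calls; CPU-bound work still competes for the same process.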

Events & Maintenance

  • getQueueEvents(queue): Listen to queue-wide job lifecycle events (completed, failed, etc.).
  • close(): Gracefully shut down all queue instances and workers. Useful in tests and during server shutdown.
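
The two methods above might be wired up as in the following sketch. It assumes the object returned by getQueueEvents behaves like BullMQ's QueueEvents, emitting completed and failed events with a jobId (and a failedReason on failure); QueueEventsLike, Closable, logJobOutcomes, and makeShutdownHandler are hypothetical names introduced for illustration.

```typescript
// Hypothetical minimal shapes; check Lucent's own types before relying on them.
interface QueueEventsLike {
  on(
    event: "completed" | "failed",
    listener: (args: { jobId: string; failedReason?: string }) => void,
  ): unknown;
}
interface Closable {
  close(): Promise<void>;
}

// Forward job outcomes to a logger of your choice.
function logJobOutcomes(events: QueueEventsLike, log: (line: string) => void): void {
  events.on("completed", ({ jobId }) => log(`job ${jobId} completed`));
  events.on("failed", ({ jobId, failedReason }) =>
    log(`job ${jobId} failed: ${failedReason ?? "unknown reason"}`),
  );
}

// Returns a handler that calls close() at most once, even if several signals arrive.
function makeShutdownHandler(manager: Closable): () => Promise<void> {
  let closing: Promise<void> | null = null;
  return () => {
    if (closing === null) {
      closing = manager.close();
    }
    return closing;
  };
}

// Usage (assuming appQueue from src/libs/queue.ts):
// logJobOutcomes(appQueue.getQueueEvents("email"), console.log);
// const shutdown = makeShutdownHandler(appQueue);
// process.once("SIGINT", shutdown);
// process.once("SIGTERM", shutdown);
```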

Usage in Hooks

Queues are well suited to offloading expensive tasks from collection hooks:

typescript
// src/collections/Posts.ts
import { EmailQueue } from "../queues/EmailQueue";

afterCreate: [
  async ({ doc }) => {
    // Only the enqueue is awaited; the worker sends the email in the background
    await EmailQueue.send({
      to: "admin@example.com",
      subject: "New Post Created",
      body: `Post ID: ${doc.id}`,
    });
  },
],
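
In the hook above, the await covers only the enqueue, but a Redis outage would still reject and could cause the hook to fail. Whether that is acceptable is a design decision. If document creation should succeed regardless, one option is a small helper that starts the enqueue and reports failures instead of propagating them. enqueueInBackground is a hypothetical helper, not part of Lucent.

```typescript
// Hypothetical helper (not part of Lucent): start an enqueue without letting its failure escape.
function enqueueInBackground(
  task: () => Promise<unknown>,
  onError: (error: unknown) => void,
): void {
  try {
    // Deliberately not awaited; a rejection is routed to onError instead.
    void task().catch(onError);
  } catch (error) {
    // Covers tasks that throw synchronously before returning a promise.
    onError(error);
  }
}

// Usage inside the hook (assuming EmailQueue from this page):
// enqueueInBackground(
//   () => EmailQueue.send({ to: "admin@example.com", subject: "New Post Created", body: "..." }),
//   (error) => console.error("[queue] enqueue failed", error),
// );
```

The trade-off is that a failed enqueue is only reported, so the job is lost unless something else retries it.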

Released under the MIT License.