Filas (Queues)

BullMQ (Redis) e RabbitMQ para processamento assincrono com workers, producers e consumers.

O PlazerCLI suporta duas opcoes de filas: BullMQ (baseado em Redis) e RabbitMQ (AMQP). Ambas sao configuradas com producers, consumers/workers e tipagem completa.

  • BullMQ: Ideal para a maioria dos casos. Usa o Redis que ja esta no projeto. Suporta retry, delay, prioridades e cron jobs.
  • RabbitMQ: Para cenarios de alta escala, routing complexo ou quando voce precisa de exchanges/queues com bindings avancados.

BullMQ — Setup

// apps/api/src/queues/email.queue.ts
import { Queue, Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';

// Producer — enfileirar jobs
export const emailQueue = new Queue('email', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 1000,
    removeOnFail: 5000,
  },
});

// Worker — processar jobs
export const emailWorker = new Worker('email', async (job: Job) => {
  const { to, subject, html } = job.data;
  await mailService.sendMail({ to, subject, html });
  console.log(`Email sent to ${to}`);
}, {
  connection: redis,
  concurrency: 5,
  limiter: { max: 10, duration: 1000 }, // rate limit: 10 emails/s
});

emailWorker.on('completed', (job) => console.log(`Job ${job.id} completed`));
emailWorker.on('failed', (job, err) => console.error(`Job ${job?.id} failed:`, err));

// Uso: enfileirar um email
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Bem-vindo!',
  html: welcomeTemplate('Joao', 'MeuApp'),
});

BullMQ — NestJS com @nestjs/bullmq

// apps/api/src/queues/queues.module.ts
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { EmailProcessor } from './email.processor';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST || 'localhost',
        port: Number(process.env.REDIS_PORT) || 6379,
      },
    }),
    BullModule.registerQueue({ name: 'email' }),
  ],
  providers: [EmailProcessor],
  exports: [BullModule],
})
export class QueuesModule {}

// apps/api/src/queues/email.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('email')
export class EmailProcessor extends WorkerHost {
  async process(job: Job): Promise<void> {
    switch (job.name) {
      case 'send-welcome':
        await this.mailService.sendMail(job.data);
        break;
      case 'send-invoice':
        await this.mailService.sendMail(job.data);
        break;
    }
  }
}

RabbitMQ — Setup

# docker-compose.yml (servico RabbitMQ)
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: ${PROJECT_NAME}-rabbitmq
    ports:
      - '5672:5672'   # AMQP
      - '15672:15672' # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

// apps/api/src/queues/rabbitmq.ts
import amqp, { Channel, Connection } from 'amqplib';

let connection: Connection;
let channel: Channel;

export async function connectRabbitMQ() {
  connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
  channel = await connection.createChannel();
  await channel.assertQueue('email', { durable: true });
  return channel;
}

// Producer
export async function publishToQueue(queue: string, data: any) {
  channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), {
    persistent: true,
  });
}

// Consumer
export async function consumeQueue(queue: string, handler: (data: any) => Promise<void>) {
  channel.consume(queue, async (msg) => {
    if (!msg) return;
    try {
      const data = JSON.parse(msg.content.toString());
      await handler(data);
      channel.ack(msg);
    } catch (err) {
      channel.nack(msg, false, true); // requeue
    }
  });
}