Filas (Queues)
BullMQ (Redis) e RabbitMQ para processamento assincrono com workers, producers e consumers.
O PlazerCLI suporta duas opcoes de filas: BullMQ (baseado em Redis) e RabbitMQ (AMQP). Ambas sao configuradas com producers, consumers/workers e tipagem completa.
- BullMQ: Ideal para a maioria dos casos. Usa o Redis que ja esta no projeto. Suporta retry, delay, prioridades e cron jobs.
- RabbitMQ: Para cenarios de alta escala, routing complexo ou quando voce precisa de exchanges/queues com bindings avancados.
BullMQ — Setup
// apps/api/src/queues/email.queue.ts
import { Queue, Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
// Producer — enfileirar jobs
export const emailQueue = new Queue('email', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 1000,
removeOnFail: 5000,
},
});
// Worker — processar jobs
export const emailWorker = new Worker('email', async (job: Job) => {
const { to, subject, html } = job.data;
await mailService.sendMail({ to, subject, html });
console.log(`Email sent to ${to}`);
}, {
connection: redis,
concurrency: 5,
limiter: { max: 10, duration: 1000 }, // rate limit: 10 emails/s
});
emailWorker.on('completed', (job) => console.log(`Job ${job.id} completed`));
emailWorker.on('failed', (job, err) => console.error(`Job ${job?.id} failed:`, err));
// Uso: enfileirar um email
await emailQueue.add('send-welcome', {
to: 'user@example.com',
subject: 'Bem-vindo!',
html: welcomeTemplate('Joao', 'MeuApp'),
});
BullMQ — NestJS com @nestjs/bullmq
// apps/api/src/queues/queues.module.ts
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { EmailProcessor } from './email.processor';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: Number(process.env.REDIS_PORT) || 6379,
},
}),
BullModule.registerQueue({ name: 'email' }),
],
providers: [EmailProcessor],
exports: [BullModule],
})
export class QueuesModule {}
// apps/api/src/queues/email.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('email')
export class EmailProcessor extends WorkerHost {
async process(job: Job): Promise<void> {
switch (job.name) {
case 'send-welcome':
await this.mailService.sendMail(job.data);
break;
case 'send-invoice':
await this.mailService.sendMail(job.data);
break;
}
}
}
RabbitMQ — Setup
# docker-compose.yml (servico RabbitMQ)
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: ${PROJECT_NAME}-rabbitmq
ports:
- '5672:5672' # AMQP
- '15672:15672' # Management UI
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
// apps/api/src/queues/rabbitmq.ts
import amqp, { Channel, Connection } from 'amqplib';
let connection: Connection;
let channel: Channel;
export async function connectRabbitMQ() {
connection = await amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost');
channel = await connection.createChannel();
await channel.assertQueue('email', { durable: true });
return channel;
}
// Producer
export async function publishToQueue(queue: string, data: any) {
channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)), {
persistent: true,
});
}
// Consumer
export async function consumeQueue(queue: string, handler: (data: any) => Promise<void>) {
channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
await handler(data);
channel.ack(msg);
} catch (err) {
channel.nack(msg, false, true); // requeue
}
});
}