Message Queues and Asynchronous Processing - Production Patterns for Scalable Systems
Master message queues and async processing with RabbitMQ, Redis, AWS SQS, job queues, event-driven architectures, retry strategies, and dead letter queues for production applications.
Message queues enable asynchronous processing, decoupling services and improving scalability by offloading time-consuming tasks from request-response cycles. This comprehensive guide covers production-ready patterns for implementing message queues, processing background jobs, handling failures gracefully, and building event-driven architectures that scale to millions of messages per day.
Why Message Queues Matter
Performance: Offload slow operations (email sending, image processing, report generation) from HTTP requests, reducing response times from seconds to milliseconds.
Reliability: Guarantee message delivery through persistence and retry mechanisms, ensuring critical tasks complete even during failures.
Scalability: Add worker processes independently of web servers, scaling processing capacity without touching frontend infrastructure.
Shopify processes 500M+ background jobs daily using Sidekiq and Redis, handling Black Friday traffic spikes (10x normal load) without impacting checkout performance.
Message Queue Fundamentals
Core Concepts
Producer: Service that creates and sends messages to the queue
Consumer/Worker: Process that reads and processes messages from the queue
Message: Unit of work containing task data and metadata
Queue: FIFO buffer storing messages between producers and consumers
Exchange: Routes messages to appropriate queues based on rules (RabbitMQ)
Dead Letter Queue (DLQ): Holds messages that failed processing after max retries
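The concepts above can be sketched as a toy in-memory queue — a hedged illustration only (`SimpleQueue`, `Message`, and `drain` are names introduced here, not a real broker API): a producer enqueues, a consumer drains FIFO, and messages that exhaust their retries land in a dead letter list.

```typescript
// Minimal in-memory illustration of producer, consumer, retries, and DLQ.
interface Message<T> {
  id: string;
  attempts: number;
  payload: T;
}

class SimpleQueue<T> {
  private messages: Message<T>[] = [];
  readonly deadLetters: Message<T>[] = [];

  // Producer side: enqueue a unit of work
  enqueue(id: string, payload: T): void {
    this.messages.push({ id, attempts: 0, payload });
  }

  // Consumer side: process FIFO; failed messages requeue until
  // maxAttempts, then move to the dead letter list for inspection
  async drain(handler: (payload: T) => Promise<void>, maxAttempts = 3): Promise<void> {
    while (this.messages.length > 0) {
      const msg = this.messages.shift()!;
      msg.attempts++;
      try {
        await handler(msg.payload);
      } catch {
        if (msg.attempts >= maxAttempts) {
          this.deadLetters.push(msg); // give up: park for investigation
        } else {
          this.messages.push(msg); // requeue at the back
        }
      }
    }
  }
}
```

Real brokers add persistence, acknowledgements, and concurrency on top of exactly this shape.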
When to Use Message Queues
Use message queues for:
- Email/SMS sending
- Image/video processing
- Report generation
- Data export/import
- Third-party API calls
- Batch processing
- Scheduled tasks
Don't use for:
- Real-time user interactions requiring immediate feedback
- Simple CRUD operations
- Tasks completing in <100ms
Redis as a Message Queue
Redis provides lightweight message queue functionality with pub/sub and list operations.
Bull Queue (Node.js)
import Queue from 'bull';
import { sendEmail } from './email';
import { processImage } from './image';
// Create queues
export const emailQueue = new Queue('email', {
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379')
}
});
export const imageQueue = new Queue('image-processing', {
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT || '6379')
}
});
// Producer - add jobs to queue
app.post('/api/users', async (req, res) => {
const user = await prisma.user.create({
data: req.body
});
// Send welcome email asynchronously
await emailQueue.add('welcome-email', {
to: user.email,
userId: user.id,
template: 'welcome'
});
// Return immediately without waiting for email
res.status(201).json(user);
});
// Consumer - process jobs from queue
emailQueue.process('welcome-email', async (job) => {
const { to, userId, template } = job.data;
await sendEmail({
to,
subject: 'Welcome to our platform!',
template,
data: { userId }
});
console.log(`Welcome email sent to ${to}`);
});
// Image processing with retry configuration
imageQueue.process('thumbnail', 5, async (job) => {
const { imageUrl, userId } = job.data;
const thumbnail = await processImage(imageUrl, {
width: 200,
height: 200,
format: 'webp'
});
await prisma.user.update({
where: { id: userId },
data: { thumbnailUrl: thumbnail.url }
});
return { thumbnailUrl: thumbnail.url };
});
// Job events
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`);
});
emailQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
// Alert on repeated failures
if (job.attemptsMade >= job.opts.attempts) {
alertOnCriticalFailure(job, err);
}
});
emailQueue.on('stalled', (job) => {
console.warn(`Job ${job.id} stalled - worker may have crashed`);
});
Job Options and Configuration
// Priority queue - high priority jobs processed first
await emailQueue.add(
'urgent-notification',
{ to: 'admin@example.com', message: 'Server down!' },
{
priority: 1, // Lower number = higher priority
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000 // Start with 2s, then 4s, then 8s
}
}
);
// Delayed jobs - process after specific delay
await emailQueue.add(
'reminder-email',
{ userId: user.id },
{
delay: 24 * 60 * 60 * 1000 // 24 hours
}
);
// Scheduled jobs - process at specific time
await emailQueue.add(
'scheduled-report',
{ reportId: report.id },
{
delay: new Date('2026-03-01T09:00:00Z').getTime() - Date.now()
}
);
// Remove duplicate jobs
await emailQueue.add(
'daily-digest',
{ userId: user.id },
{
jobId: `daily-digest:${user.id}:${today}`, // Unique ID prevents duplicates
removeOnComplete: true,
removeOnFail: false
}
);
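The deduplication trick above hinges on a stable job ID. A small sketch of how such a key can be built (`dailyDigestJobId` is a name introduced here; `today` in the snippet above corresponds to the day string this function derives):

```typescript
// Bull treats two adds with the same jobId as the same job, so encoding
// the user and the calendar date yields at most one digest per user per day.
function dailyDigestJobId(userId: string, date: Date): string {
  const day = date.toISOString().slice(0, 10); // e.g. "2026-03-01"
  return `daily-digest:${userId}:${day}`;
}
```

Two enqueues on the same UTC day produce the same ID and collapse into one job; the next day the date component changes and a fresh job is accepted.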
RabbitMQ for Robust Messaging
RabbitMQ provides enterprise-grade message queuing with advanced routing, clustering, and guaranteed delivery.
Setup and Basic Usage
import amqp from 'amqplib';
class RabbitMQService {
private connection?: amqp.Connection;
private channel?: amqp.Channel;
async connect(): Promise<void> {
this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
this.channel = await this.connection.createChannel();
// Handle connection errors
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
setTimeout(() => this.connect(), 5000); // Reconnect after 5s
});
this.connection.on('close', () => {
console.log('RabbitMQ connection closed, reconnecting...');
setTimeout(() => this.connect(), 5000);
});
}
async publishMessage(
queue: string,
message: any,
options?: amqp.Options.Publish
): Promise<void> {
if (!this.channel) await this.connect();
// Ensure queue exists
await this.channel!.assertQueue(queue, {
durable: true // Survive broker restart
});
// Publish message
this.channel!.sendToQueue(
queue,
Buffer.from(JSON.stringify(message)),
{
persistent: true, // Survive broker restart
...options
}
);
}
async consumeMessages(
queue: string,
handler: (message: any) => Promise<void>,
options?: { prefetch?: number }
): Promise<void> {
if (!this.channel) await this.connect();
await this.channel!.assertQueue(queue, { durable: true });
// Limit unacknowledged messages per worker
if (options?.prefetch) {
this.channel!.prefetch(options.prefetch);
}
this.channel!.consume(queue, async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
await handler(data);
// Acknowledge message - removes from queue
this.channel!.ack(msg);
} catch (error) {
console.error('Message processing error:', error);
// Reject and requeue; note that a message which always fails will
// loop forever this way - in production, route poison messages to a
// dead letter exchange instead of requeueing unconditionally
this.channel!.nack(msg, false, true);
}
});
}
}
export const rabbitmq = new RabbitMQService();
// Initialize on startup
rabbitmq.connect();
// Producer
app.post('/api/orders', async (req, res) => {
const order = await prisma.order.create({
data: req.body
});
// Publish order confirmation task
await rabbitmq.publishMessage('order-processing', {
orderId: order.id,
userId: order.userId,
total: order.total
});
res.status(201).json(order);
});
// Consumer (separate worker process)
rabbitmq.consumeMessages(
'order-processing',
async (message) => {
const { orderId, userId, total } = message;
// Send confirmation email
await sendOrderConfirmation(userId, orderId);
// Update inventory
await updateInventory(orderId);
// Process payment
await processPayment(orderId, total);
console.log(`Order ${orderId} processed successfully`);
},
{ prefetch: 10 } // Process 10 messages concurrently
);
Exchange Patterns
Fanout Exchange - Broadcast to All Queues
async function setupFanoutExchange() {
const exchange = 'user-events';
// Create exchange
await channel.assertExchange(exchange, 'fanout', { durable: true });
// Create queues for different consumers
await channel.assertQueue('email-notifications');
await channel.assertQueue('analytics-events');
await channel.assertQueue('audit-logs');
// Bind queues to exchange
await channel.bindQueue('email-notifications', exchange, '');
await channel.bindQueue('analytics-events', exchange, '');
await channel.bindQueue('audit-logs', exchange, '');
// Publish event - delivered to all queues
channel.publish(
exchange,
'',
Buffer.from(JSON.stringify({
type: 'user.created',
userId: user.id,
timestamp: Date.now()
}))
);
}
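The fanout behavior can be shown with a few lines of plain TypeScript — a hedged in-memory sketch (`FanoutExchange` is a name introduced here, not an amqplib type): every queue bound to the exchange receives its own copy of each published message.

```typescript
// In-memory model of fanout semantics: publish delivers a copy
// of the message to every bound queue, regardless of routing key.
class FanoutExchange<T> {
  private queues = new Map<string, T[]>();

  bindQueue(name: string): void {
    if (!this.queues.has(name)) this.queues.set(name, []);
  }

  publish(message: T): void {
    // Deliver a copy to every bound queue
    for (const queue of this.queues.values()) queue.push(message);
  }

  read(name: string): T[] {
    return this.queues.get(name) ?? [];
  }
}
```

This is why fanout suits broadcast events like `user.created`: email, analytics, and audit consumers each get the full stream without coordinating with one another.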
Topic Exchange - Pattern-Based Routing
async function setupTopicExchange() {
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: true });
// Queue for critical errors
await channel.assertQueue('critical-alerts');
await channel.bindQueue('critical-alerts', exchange, '*.critical');
// Queue for all application logs
await channel.assertQueue('app-logs');
await channel.bindQueue('app-logs', exchange, 'app.*');
// Queue for all logs
await channel.assertQueue('all-logs');
await channel.bindQueue('all-logs', exchange, '#');
// Publish messages with routing keys
channel.publish(
exchange,
'app.critical',
Buffer.from(JSON.stringify({ message: 'Database connection lost!' }))
);
// Routes to: critical-alerts, app-logs, all-logs
channel.publish(
exchange,
'app.info',
Buffer.from(JSON.stringify({ message: 'User logged in' }))
);
// Routes to: app-logs, all-logs
}
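The routing behavior above follows RabbitMQ's topic-matching rules: `*` matches exactly one dot-separated word, `#` matches zero or more. A sketch of those semantics as a pure function (`topicMatches` is a name introduced here, not a library API):

```typescript
// Match a routing key against a topic binding pattern.
// '*' matches exactly one word; '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  function match(pi: number, ki: number): boolean {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' can absorb any number of remaining words, including none
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  }

  return match(0, 0);
}
```

Tracing the example in the text: `app.critical` matches `*.critical`, `app.*`, and `#`, so it lands in all three queues, while `app.info` matches only `app.*` and `#`.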
AWS SQS for Serverless
AWS SQS provides fully managed message queuing without infrastructure management.
SQS with Node.js
import { SQSClient, SendMessageCommand, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
const sqsClient = new SQSClient({ region: 'us-east-1' });
const QUEUE_URL = process.env.SQS_QUEUE_URL!;
// Producer
async function sendToQueue(message: any) {
const command = new SendMessageCommand({
QueueUrl: QUEUE_URL,
MessageBody: JSON.stringify(message),
MessageAttributes: {
'messageType': {
DataType: 'String',
StringValue: 'order.created'
}
},
DelaySeconds: 0
});
const result = await sqsClient.send(command);
console.log('Message sent:', result.MessageId);
}
// Consumer (polling)
async function pollQueue() {
while (true) {
try {
const command = new ReceiveMessageCommand({
QueueUrl: QUEUE_URL,
MaxNumberOfMessages: 10, // Batch processing
WaitTimeSeconds: 20, // Long polling
MessageAttributeNames: ['All']
});
const { Messages } = await sqsClient.send(command);
if (!Messages || Messages.length === 0) {
continue;
}
// Process messages in parallel
await Promise.all(
Messages.map(async (message) => {
try {
const data = JSON.parse(message.Body!);
await processMessage(data);
// Delete message after successful processing
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: QUEUE_URL,
ReceiptHandle: message.ReceiptHandle!
}));
} catch (error) {
console.error('Message processing failed:', error);
// Message returns to queue after visibility timeout
}
})
);
} catch (error) {
console.error('Polling error:', error);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
// Start consumer
pollQueue();
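The delete-after-success pattern above works because of SQS's visibility timeout: receiving a message hides it for a window, and unless it is deleted it becomes receivable again. A hedged in-memory simulation of that semantics (`VisibilityQueue` and its logical clock are inventions for illustration, not the AWS SDK):

```typescript
// Simulate SQS at-least-once delivery with a logical clock.
interface QueuedMessage {
  body: string;
  receiptHandle: string;
  invisibleUntil: number; // logical clock tick
}

class VisibilityQueue {
  private messages: QueuedMessage[] = [];
  private now = 0;
  private nextHandle = 0;

  send(body: string): void {
    this.messages.push({ body, receiptHandle: '', invisibleUntil: 0 });
  }

  // Receiving hides the message for visibilityTimeout ticks
  receive(visibilityTimeout: number): QueuedMessage | undefined {
    const msg = this.messages.find((m) => m.invisibleUntil <= this.now);
    if (!msg) return undefined;
    msg.invisibleUntil = this.now + visibilityTimeout;
    msg.receiptHandle = `rh-${this.nextHandle++}`;
    return msg;
  }

  // Only deletion removes a message permanently
  delete(receiptHandle: string): void {
    this.messages = this.messages.filter((m) => m.receiptHandle !== receiptHandle);
  }

  tick(ticks: number): void {
    this.now += ticks;
  }
}
```

This is why a consumer that crashes mid-processing loses nothing — the message simply reappears after the timeout — and also why handlers must be idempotent, since redelivery means at-least-once, not exactly-once.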
Dead Letter Queue
// Configure DLQ in AWS Console or via SDK
const DLQ_URL = process.env.SQS_DLQ_URL!;
// Monitor DLQ for failed messages
async function processDLQ() {
const command = new ReceiveMessageCommand({
QueueUrl: DLQ_URL,
MaxNumberOfMessages: 10
});
const { Messages } = await sqsClient.send(command);
for (const message of Messages || []) {
const data = JSON.parse(message.Body!);
// Log failure for investigation
console.error('Message failed after max retries:', {
messageId: message.MessageId,
data,
attributes: message.MessageAttributes
});
// Alert team
await sendAlert({
severity: 'high',
title: 'Message processing permanently failed',
body: JSON.stringify(data, null, 2)
});
// Archive for later analysis
await archiveFailedMessage(data);
// Remove from DLQ
await sqsClient.send(new DeleteMessageCommand({
QueueUrl: DLQ_URL,
ReceiptHandle: message.ReceiptHandle!
}));
}
}
// Run DLQ processor every 5 minutes
setInterval(processDLQ, 5 * 60 * 1000);
Error Handling and Retry Strategies
Exponential Backoff
async function processWithRetry<T>(
fn: () => Promise<T>,
maxAttempts: number = 5,
baseDelay: number = 1000
): Promise<T> {
let attempt = 1;
let lastError: Error;
while (attempt <= maxAttempts) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
if (attempt === maxAttempts) {
throw new Error(`Failed after ${maxAttempts} attempts: ${lastError.message}`);
}
// Exponential backoff with jitter
const delay = Math.min(
baseDelay * Math.pow(2, attempt - 1) + Math.random() * 1000,
30000 // Max 30 seconds
);
console.warn(`Attempt ${attempt} failed, retrying in ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
attempt++;
}
}
throw lastError!;
}
// Usage in job processor
imageQueue.process('resize', async (job) => {
const { imageUrl } = job.data;
return await processWithRetry(
() => resizeImage(imageUrl),
5,
2000
);
});
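Factoring the delay formula out of the retry loop into a pure function makes the schedule easy to inspect without sleeping — a small sketch (`backoffDelay` is a name introduced here; jitter is passed in explicitly so the schedule is deterministic when testing):

```typescript
// Exponential backoff: baseDelay * 2^(attempt-1) + jitter, capped at maxDelay.
function backoffDelay(
  attempt: number, // 1-based attempt number
  baseDelay: number,
  jitter: number,
  maxDelay = 30000
): number {
  return Math.min(baseDelay * Math.pow(2, attempt - 1) + jitter, maxDelay);
}
```

With a 1s base the schedule runs 1s, 2s, 4s, 8s, 16s, then hits the 30s cap; the random jitter in the processor above spreads retries out so a burst of failures does not hammer a recovering dependency in lockstep.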
Idempotent Message Processing
// Track processed message IDs in Redis so duplicates are skipped
// even when multiple workers consume the same queue
async function processMessageIdempotent(messageId: string, handler: () => Promise<void>) {
// Check if already processed
const alreadyProcessed = await redis.get(`processed:${messageId}`);
if (alreadyProcessed) {
console.log(`Message ${messageId} already processed, skipping`);
return;
}
// Process message
await handler();
// Mark as processed (TTL 7 days)
await redis.setex(`processed:${messageId}`, 7 * 24 * 60 * 60, '1');
}
// Usage
rabbitmq.consumeMessages('orders', async (message) => {
await processMessageIdempotent(message.messageId, async () => {
await processOrder(message.orderId);
});
});
Monitoring and Observability
Queue Metrics
import { Registry, Gauge, Counter } from 'prom-client';
const registry = new Registry();
// Queue depth metric
const queueDepthGauge = new Gauge({
name: 'queue_depth',
help: 'Number of messages waiting in queue',
labelNames: ['queue_name'],
registers: [registry]
});
// Messages processed counter
const messagesProcessedCounter = new Counter({
name: 'messages_processed_total',
help: 'Total messages processed',
labelNames: ['queue_name', 'status'],
registers: [registry]
});
// Update metrics every 10 seconds
setInterval(async () => {
const emailCount = await emailQueue.count();
const imageCount = await imageQueue.count();
queueDepthGauge.set({ queue_name: 'email' }, emailCount);
queueDepthGauge.set({ queue_name: 'image' }, imageCount);
}, 10000);
// Track processing
emailQueue.on('completed', (job) => {
messagesProcessedCounter.inc({ queue_name: 'email', status: 'success' });
});
emailQueue.on('failed', (job) => {
messagesProcessedCounter.inc({ queue_name: 'email', status: 'failed' });
});
// Metrics endpoint for Prometheus
app.get('/metrics', async (req, res) => {
res.set('Content-Type', registry.contentType);
res.end(await registry.metrics());
});
Real-World Examples
Shopify's Background Jobs
Shopify processes 500M+ daily jobs with Sidekiq (Redis-based):
- Order processing (payment, fulfillment, notifications)
- Product synchronization across channels
- Inventory updates
- Analytics aggregation
Their architecture scales to 10x normal load during Black Friday by adding worker capacity independently of web servers.
GitHub's Background Processing
GitHub uses Resque (Redis) for:
- Repository indexing and search updates
- Webhook delivery
- Email notifications
- CI/CD pipeline triggers
Average processing: 2M jobs/hour with p99 latency under 5 seconds.
Uber's Event-Driven Architecture
Uber processes billions of events daily with Apache Kafka:
- Trip events (request, accept, start, complete)
- Location updates (driver/rider positioning)
- Payment processing
- Fraud detection
Event-driven design enables real-time features (live ETA, driver tracking) while decoupling services for independent scaling.
Conclusion
Message queues are essential for building scalable, resilient backend systems. Redis with Bull provides lightweight queuing for most applications, while RabbitMQ offers advanced routing and guaranteed delivery for complex workflows. AWS SQS enables serverless architectures without infrastructure management.
Key patterns - exponential backoff for retries, dead letter queues for failed messages, idempotent processing for duplicates, and comprehensive monitoring - ensure reliable message processing at scale. Offloading time-consuming tasks to background workers can cut API response times from seconds to milliseconds while maintaining system reliability through guaranteed delivery and automatic retries.
Start with Redis for simplicity, migrate to RabbitMQ when requiring advanced routing, and consider cloud-managed solutions like AWS SQS for serverless deployments.
Written by StaticBlock Editorial
StaticBlock Editorial is a technical writer and software engineer specializing in web development, performance optimization, and developer tooling.