Our content pipeline broke at exactly 47 concurrent jobs.
Not 50. Not 100. Forty-seven — because that's when Redis started dropping connections faster than our retry logic could catch them, and the jobs weren't failing loudly. They were failing silently. Articles were getting swallowed with no trace in the logs, no error in the queue, no entry in the database.
That's the part nobody writes about in queue tutorials. I'm writing about it here.
This is the architecture we landed on after three failed versions — the one that now powers CAT's bulk content generation at 500+ jobs/hour without data loss. I'll walk you through the full implementation, including the retry logic and the dead letter queue that saved us.
Why a Queue in the First Place
If you're building anything that calls an external API — especially OpenAI — at scale, you already know the naive approach breaks fast:
// The naive approach. Works at 5 req/min. Collapses at 50.
for (const article of articles) {
  const content = await generateContent(article.prompt);
  await db.save({ id: article.id, content });
}
Three things kill you here:
Rate limits — OpenAI caps requests per minute and tokens per minute independently. A for-loop doesn't know or care.
Partial failures — If job #23 throws, what happens to jobs #24-100? In sequential code, they don't run. In a queue, they can.
Observability — When something goes wrong, you want a record of what was in-flight. A for-loop leaves nothing behind.
A queue gives you concurrency control, retry logic, and a paper trail. Here's how to build one that actually holds up.
The Stack
- Node.js 20+ (native --watch, stable fetch)
- Redis 7 (for the queue backend)
- BullMQ (not Bull — BullMQ is the maintained fork with TypeScript support and better flow control)
- ioredis (BullMQ's required Redis client)
If you're wondering why BullMQ over plain Redis lists: BullMQ gives you job states (waiting, active, completed, failed), automatic retries with backoff, and a web UI (Bull Board) without building any of that yourself. The tradeoff is a slightly more complex API. Worth it at scale.
npm install bullmq ioredis
Step 1: The Redis Connection
This is where most tutorials gloss over the part that matters. Your Redis connection configuration will determine how your queue behaves under load.
// lib/redis.js
import { Redis } from "ioredis";
// BullMQ requires a separate Redis connection per instance: the Queue
// gets one, and each Worker gets its own (workers hold a blocking
// connection while waiting for jobs).
// Reusing a single connection will cause silent failures under load.
export function createRedisConnection() {
  return new Redis({
    host: process.env.REDIS_HOST || "localhost",
    port: parseInt(process.env.REDIS_PORT || "6379", 10),
    maxRetriesPerRequest: null, // Required by BullMQ — do not omit
    enableReadyCheck: false, // Prevents startup race conditions
    lazyConnect: true, // Don't connect until first command
  });
}
The maxRetriesPerRequest: null line is the one that gets people. BullMQ manages its own retry loop. If you let ioredis also retry, you get double-retry behavior that makes jobs appear to hang. Set it to null and let BullMQ own the retry logic.
Step 2: Defining the Queue
// lib/contentQueue.js
import { Queue } from "bullmq";
import { createRedisConnection } from "./redis.js";
export const contentQueue = new Queue("content-generation", {
  connection: createRedisConnection(),
  defaultJobOptions: {
    attempts: 3, // 3 total attempts: the first run plus 2 retries
    backoff: {
      type: "exponential",
      delay: 2000, // First retry after 2s, second after 4s
    },
    removeOnComplete: {
      age: 86400, // Keep completed jobs for 24h for debugging
      count: 1000, // But cap at 1000 to prevent memory bloat
    },
    removeOnFail: false, // Never auto-remove failed jobs — you want them
  },
});
removeOnFail: false is the line I wish I'd set from day one. Failed jobs are your audit trail. When a client says "where's my article?" and the database has no record, you need to be able to check the dead letter queue. If you're auto-removing failed jobs, that investigation starts at "I don't know."
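One detail that trips people up: attempts counts total attempts, not retries, and as I understand BullMQ's exponential strategy, the n-th retry waits baseDelay * 2^(n - 1). A quick sketch (the helper name is mine, not a BullMQ API) to sanity-check your settings before deploying:

```javascript
// Sketch of BullMQ's "exponential" backoff schedule: the n-th retry
// waits baseDelay * 2^(n - 1). Helper name is mine, not BullMQ API.
function backoffDelays(totalAttempts, baseDelayMs) {
  const delays = [];
  // `attempts` in BullMQ counts TOTAL attempts, so retries = attempts - 1
  for (let n = 1; n < totalAttempts; n++) {
    delays.push(baseDelayMs * 2 ** (n - 1));
  }
  return delays;
}

backoffDelays(3, 2000); // → [2000, 4000]: two retries, not three
```

With attempts: 3 and delay: 2000, a job's final failure lands roughly six seconds after its first attempt, plus processing time.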
Step 3: Adding Jobs to the Queue
// api/generateArticles.js
import { contentQueue } from "../lib/contentQueue.js";
export async function queueArticleBatch(articles) {
  const jobs = articles.map((article) => ({
    name: "generate-article",
    data: {
      articleId: article.id,
      keyword: article.keyword,
      wordCount: article.wordCount || 1000,
      userId: article.userId,
    },
    opts: {
      // Job ID deduplication: same article won't queue twice
      // even if the endpoint is called twice (e.g., double-submit)
      jobId: `article-${article.id}`,
      priority: article.isPriority ? 1 : 10, // Lower number = higher priority
    },
  }));

  // addBulk is significantly faster than calling add() in a loop.
  // Internally it uses a single Redis pipeline instead of N round trips.
  const result = await contentQueue.addBulk(jobs);
  return result.map((job) => ({ jobId: job.id, articleId: job.data.articleId }));
}
The jobId deduplication is something I added after we had a bug where a UI double-submit resulted in the same article being generated twice, billed twice, and returned twice. Setting a deterministic jobId makes BullMQ idempotent — adding a job that already exists returns the existing job instead of creating a duplicate.
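That dedup happens in Redis, per queue. If you also want to avoid constructing duplicate payloads within a single batch (purely an optimization I'd consider, not something BullMQ requires), a Map keyed on jobId does it:

```javascript
// Sketch: collapse duplicate jobIds inside one batch before addBulk.
// Redis-side dedup would catch these anyway; this just keeps the
// batch (and the response you return to the caller) clean.
function dedupeByJobId(jobs) {
  const seen = new Map();
  for (const job of jobs) {
    if (!seen.has(job.opts.jobId)) seen.set(job.opts.jobId, job);
  }
  return [...seen.values()];
}
```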
Step 4: The Worker
This is where the actual work happens. The worker pulls jobs from the queue and executes them.
// workers/contentWorker.js
import { Worker } from "bullmq";
import { createRedisConnection } from "../lib/redis.js";
import { generateWithRetry } from "../lib/openai.js";
import { db } from "../lib/db.js";
const worker = new Worker(
  "content-generation",
  async (job) => {
    const { articleId, keyword, wordCount, userId } = job.data;

    // Update progress so the UI can show a spinner instead of nothing
    await job.updateProgress(10);

    const content = await generateWithRetry({ keyword, wordCount });
    await job.updateProgress(80);

    await db.articles.update({
      where: { id: articleId },
      data: {
        content,
        status: "completed",
        completedAt: new Date(),
      },
    });

    await job.updateProgress(100);
    return { articleId, wordCount: content.split(" ").length };
  },
  {
    connection: createRedisConnection(), // Separate connection from the queue
    concurrency: 5, // Process 5 jobs simultaneously
    limiter: {
      max: 10, // Max 10 jobs processed
      duration: 1000, // per 1000ms (per second)
    },
  }
);

// Worker event handlers — non-negotiable for production
worker.on("completed", (job, result) => {
  console.log(`[${job.id}] Completed: ${result.wordCount} words for article ${result.articleId}`);
});

worker.on("failed", (job, error) => {
  console.error(`[${job?.id}] Failed after ${job?.attemptsMade} attempts:`, error.message);
  // Add your alerting here — Slack, PagerDuty, whatever you use
});

worker.on("error", (error) => {
  // This catches connection errors, not job errors
  console.error("Worker connection error:", error);
});
The limiter config is how you respect OpenAI's rate limits without sleeping between requests. The worker will automatically throttle itself to 10 jobs/second. Adjust this based on your API tier.
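If you'd rather derive those limiter numbers from your actual OpenAI quota than eyeball them, a rough conversion helps. The helper and its 20% headroom default are my assumptions, not anything BullMQ or OpenAI provides:

```javascript
// Sketch: convert an OpenAI requests-per-minute quota into a BullMQ
// limiter config, reserving headroom for retries and other callers.
function limiterFromRpm(rpm, headroom = 0.8) {
  const perSecond = Math.max(1, Math.floor((rpm / 60) * headroom));
  return { max: perSecond, duration: 1000 };
}

limiterFromRpm(500); // → { max: 6, duration: 1000 }
```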
Step 5: Retry Logic for the OpenAI Calls
BullMQ retries at the job level. But you also want request-level retry for transient 429s and 500s from OpenAI — errors that should resolve in seconds, not after a full job retry cycle.
// lib/openai.js
import OpenAI from "openai";
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
export async function generateWithRetry({ keyword, wordCount }, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const response = await openai.chat.completions.create({
        model: "gpt-4o",
        messages: [
          {
            role: "system",
            content: "You are a professional content writer. Respond with article content only, no preamble.",
          },
          {
            role: "user",
            content: `Write a ${wordCount}-word article about "${keyword}".
Include: engaging introduction, 3 H2 sections, practical examples, conclusion.
Tone: professional but approachable.`,
          },
        ],
        max_tokens: Math.ceil(wordCount * 1.5), // ~1.5 tokens per word average
        temperature: 0.7,
      });
      return response.choices[0].message.content;
    } catch (error) {
      const isRateLimit = error.status === 429;
      const isServerError = error.status >= 500;
      const isRetryable = isRateLimit || isServerError;

      if (!isRetryable || attempt === retries) {
        throw error; // Surface to BullMQ for job-level retry
      }

      // Exponential backoff: 1s after attempt 1, 2s after attempt 2
      // (attempt 3 throws instead of sleeping)
      const delay = Math.pow(2, attempt - 1) * 1000;
      console.warn(`OpenAI ${error.status} on attempt ${attempt}. Retrying in ${delay}ms...`);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
Two levels of retry — request-level for transient errors, job-level for failures that need a fresh start — give you resilience without hammering the API when it's actually down.
Step 6: The Dead Letter Queue
Failed jobs that exhaust all retries live in BullMQ's failed state. You need a way to inspect and requeue them. This is the admin endpoint we use:
// api/admin/jobs.js
import { contentQueue } from "../../lib/contentQueue.js";
// GET /admin/jobs/failed
export async function getFailedJobs(req, res) {
  const failedJobs = await contentQueue.getFailed(0, 50); // First 50 failed jobs
  res.json({
    count: failedJobs.length,
    jobs: failedJobs.map((job) => ({
      id: job.id,
      articleId: job.data.articleId,
      failedReason: job.failedReason,
      attemptsMade: job.attemptsMade,
      timestamp: new Date(job.timestamp).toISOString(),
    })),
  });
}

// POST /admin/jobs/:jobId/retry
export async function retryFailedJob(req, res) {
  const { jobId } = req.params;
  const job = await contentQueue.getJob(jobId);
  if (!job) return res.status(404).json({ error: "Job not found" });

  await job.retry();
  res.json({ message: `Job ${jobId} requeued`, status: "waiting" });
}

// POST /admin/jobs/retry-all-failed
export async function retryAllFailed(req, res) {
  const failedJobs = await contentQueue.getFailed(0, 100);
  await Promise.all(failedJobs.map((job) => job.retry()));
  res.json({ message: `${failedJobs.length} jobs requeued` });
}
This saved us three times in the first month after launch. A bad deploy caused all jobs to fail with a misconfiguration error. We fixed the config, hit /admin/jobs/retry-all-failed, and 340 queued articles processed without any customer data loss.
What Can Go Wrong
Silent job loss at high concurrency. This is what broke us at 47 concurrent jobs. The cause: using a single Redis connection for both the Queue and the Worker. BullMQ's internal pub/sub conflicts with command execution when they share a connection. The fix is in Step 1 — always call createRedisConnection() separately for each BullMQ instance.
Workers getting stuck on long-running jobs. BullMQ has a lockDuration (default: 30 seconds). If a job takes longer than that, BullMQ assumes the worker crashed and moves the job back to waiting — where another worker picks it up. You'll get duplicate processing. Fix: set lockDuration and a corresponding lockRenewTime:
const worker = new Worker("content-generation", processor, {
  connection: createRedisConnection(),
  lockDuration: 60000, // 60 seconds
  lockRenewTime: 20000, // Renew every 20s (should be lockDuration / 3)
});
Memory bloat from completed jobs. If you set removeOnComplete: false and never clean up, Redis memory will grow until your instance dies. The removeOnComplete: { age: 86400, count: 1000 } config in Step 2 handles this — but double-check it's actually set. I've seen it get accidentally overridden in per-job options.
Rate limit miscalculation. The limiter config controls how many jobs BullMQ starts, not how many OpenAI requests you make. If your worker calls OpenAI multiple times per job (e.g., outline generation + full article), your actual request rate is jobs_started_per_second * openai_calls_per_job. Plan accordingly.
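A two-line helper (mine, purely illustrative) makes that arithmetic explicit:

```javascript
// Sketch: effective OpenAI requests/second given the limiter rate.
// The limiter caps job STARTS; every API call inside a job multiplies it.
function effectiveRequestsPerSecond(limiterMax, limiterDurationMs, callsPerJob) {
  return (limiterMax / (limiterDurationMs / 1000)) * callsPerJob;
}

// Step 4's limiter (10 jobs/s) with outline + article = 2 calls per job:
effectiveRequestsPerSecond(10, 1000, 2); // → 20 requests/second
```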
Monitoring Without a Dashboard
Bull Board is the standard UI for BullMQ, and it's worth adding. But if you want visibility immediately with zero setup, add this health endpoint:
// api/health/queue.js
import { contentQueue } from "../../lib/contentQueue.js";

export async function getQueueHealth(req, res) {
  const [waiting, active, completed, failed, delayed] = await Promise.all([
    contentQueue.getWaitingCount(),
    contentQueue.getActiveCount(),
    contentQueue.getCompletedCount(),
    contentQueue.getFailedCount(),
    contentQueue.getDelayedCount(),
  ]);

  const isHealthy = failed === 0 || failed / (completed + failed) < 0.05; // < 5% failure rate

  res.status(isHealthy ? 200 : 503).json({
    status: isHealthy ? "healthy" : "degraded",
    queue: { waiting, active, completed, failed, delayed },
    failureRate:
      completed + failed > 0
        ? `${((failed / (completed + failed)) * 100).toFixed(1)}%`
        : "0%",
  });
}
Hook this up to whatever uptime monitoring you use. A spike in failed count with a corresponding drop in active is the pattern that precedes queue collapse — you want an alert before users start noticing.
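That pattern can be encoded as a check between two consecutive polls of the health endpoint. The thresholds below are assumptions to tune for your volume, not battle-tested values:

```javascript
// Sketch: detect the pre-collapse signature between two snapshots of
// the queue counts: failed jumping while active falls.
function looksLikeCollapse(prev, curr, { failedJump = 10, activeDropRatio = 0.5 } = {}) {
  const failedSpike = curr.failed - prev.failed >= failedJump;
  const activeDrop = prev.active > 0 && curr.active / prev.active <= activeDropRatio;
  return failedSpike && activeDrop;
}

looksLikeCollapse({ failed: 2, active: 10 }, { failed: 20, active: 3 }); // → true
```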
What I'd Do Differently
I'd add the dead letter queue admin endpoints on day one. I built them after our first production incident. That was backwards. The moment you have production jobs, you need a recovery mechanism.
I'd separate "generation" jobs from "post-processing" jobs. Our first version had one big job: generate content → validate it → format it → save it. When formatting failed, the generation work was lost and had to be retried from scratch. Now we use BullMQ Flows — parent/child job dependencies — so each stage can fail and retry independently. More complex setup, dramatically better resilience.
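As a sketch of the shape (queue and job names here are illustrative, not our exact production setup): BullMQ's FlowProducer takes a parent job with nested children, and the children must complete before the parent runs.

```javascript
// Sketch: a post-processing parent that depends on a generation child.
// The tree is a plain object; you'd pass it to FlowProducer, e.g.:
//   const flow = new FlowProducer({ connection: createRedisConnection() });
//   await flow.add(buildArticleFlow(article));
function buildArticleFlow(article) {
  return {
    name: "format-and-save",
    queueName: "content-postprocess",
    data: { articleId: article.id },
    children: [
      {
        name: "generate-article",
        queueName: "content-generation",
        data: { articleId: article.id, keyword: article.keyword },
        opts: { attempts: 3, jobId: `generate-${article.id}` },
      },
    ],
  };
}
```

If formatting fails now, only the parent retries; the generated content survives in the child's result.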
I'd instrument token usage from the start. Every OpenAI response includes usage.total_tokens. I started logging it six weeks in, and immediately discovered that 12% of our jobs were consuming 3x more tokens than expected due to prompt edge cases. That's billing information I was flying blind on for a month and a half.
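The check itself is small. This sketch assumes you thread the response's usage object out of the OpenAI call; the 1.5 tokens/word baseline and 3x outlier threshold mirror the estimates above and are assumptions, not API behavior:

```javascript
// Sketch: record token consumption per job and flag outliers.
// Baseline: ~1.5 tokens per word (same estimate as max_tokens in Step 5);
// anything over 3x the expected budget gets flagged.
function checkTokenUsage(articleId, usage, wordCount) {
  const expected = wordCount * 1.5;
  const ratio = usage.total_tokens / expected;
  return {
    articleId,
    totalTokens: usage.total_tokens,
    ratio: Number(ratio.toFixed(2)),
    outlier: ratio > 3,
  };
}

checkTokenUsage("a1", { total_tokens: 1800 }, 1000); // ratio 1.2, not an outlier
```

Log the result on every completed job and the outliers surface themselves.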
The full working implementation — with Bull Board, the admin endpoints, and the health check — is on GitHub: [link]. Drop a star if this helped; it keeps me writing more of these.
What's your retry strategy for OpenAI rate limits at scale? I've seen people use token bucket algorithms instead of BullMQ's built-in limiter — curious if anyone's found that worth the complexity.