Why Your AI Feature Needs a Job Queue (And How to Add One with BullMQ)

Friday 29/05/2026

·11 min read
Share:

Your "Generate Report" button works perfectly in development. One user clicks, an LLM crunches their data for 45 seconds, and a beautiful PDF appears. Then you deploy, three users click at once, your Vercel function hits its 60-second limit, the API returns 504s, your retry logic re-triggers the LLM call, and the same report gets charged twice while showing the user nothing. This is the most common production failure mode for AI features, and it's almost always the same root cause: synchronous LLM calls in HTTP request handlers.

The fix is a background job queue. This post walks through adding a BullMQ + Redis job queue to a Next.js AI feature in TypeScript — refactoring a blocking "generate report" endpoint into a queued job with status polling, SSE progress updates, idempotency keys (so a retry doesn't double-bill you), and priority lanes for paid users. Pairs naturally with handling AI API rate limits and the real cost of running an AI feature.

Why synchronous LLM calls are an antipattern

A normal API call returns in 50–200ms. An LLM call returns in 2–60 seconds. That difference breaks most of the assumptions your HTTP stack makes.

  • Serverless timeouts. Vercel Hobby is 60s, Pro is 300s, Edge is even tighter. A multi-step agent or a long report easily exceeds these.
  • Load balancer timeouts. Cloudflare, Fastly, and most ELB configurations cut at 100s by default. You can raise the limit but you can't make a browser sit on a pending fetch forever.
  • Deploy interruptions. When you push a new version, in-flight requests get killed. With a 45-second job, every deploy will mid-call abort someone's expensive LLM run.
  • Browser retries. A stalled or 504'd fetch often triggers automatic retries (some load balancers, some SDKs, fetch with retry middleware). Each retry kicks off another LLM call. You pay for all of them.
  • No backpressure. A traffic spike with synchronous handlers means N concurrent LLM calls, hitting your provider's rate limit, returning 429s, and cascading failure to your users.

A queue moves the work off the request path. The HTTP request becomes "accept the job, return an ID" — fast, idempotent, retry-safe. A separate worker process does the LLM call on its own timeline.

The decision rule: when sync is fine

Don't queue everything. A chatbot turn that streams tokens is sync — the user is watching the stream, the response time is the UX. A simple completion that takes ~2 seconds and is part of an interactive flow is sync.

You need a queue when any of these is true:

  1. The job takes longer than 10 seconds.
  2. The user shouldn't have to keep the tab open until it finishes.
  3. The job is expensive enough that a duplicate run hurts (≥ $0.01 per call is my threshold).
  4. The job has side effects (sends an email, posts to Slack, writes to a paid third-party API).
  5. You want to control concurrency or prioritize paying users over free.

If you ticked one of these, queue it.

Setting up BullMQ and Redis

BullMQ is the de facto Node.js job queue. It's actively maintained, runs on Redis, supports priorities, delayed jobs, repeatable jobs, and rate limiting out of the box.

pnpm add bullmq ioredis zod
pnpm add -D @types/node

You need a Redis instance. Upstash has a generous free tier and works well with serverless; for self-hosted, anything ≥ Redis 6.2 works. BullMQ does not work with serverless Redis HTTP APIs that don't support BLPOP — Upstash's regular Redis connection (not the REST one) is what you want.

# .env.local
REDIS_URL=rediss://default:password@your-host.upstash.io:6379

A shared queue and connection module

// src/lib/queue.ts
import { Queue, QueueEvents } from 'bullmq'
import IORedis from 'ioredis'

export const connection = new IORedis(process.env.REDIS_URL!, {
    maxRetriesPerRequest: null,
    enableReadyCheck: false,
})

export type ReportJobData = {
    userId: string
    reportId: string
    inputDataUrl: string
    plan: 'free' | 'pro'
}

export type ReportJobResult = {
    pdfUrl: string
    tokensUsed: number
}

export const reportQueue = new Queue<ReportJobData, ReportJobResult>('reports', {
    connection,
    defaultJobOptions: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 5000 },
        removeOnComplete: { age: 24 * 3600, count: 10000 },
        removeOnFail: { age: 7 * 24 * 3600 },
    },
})

export const reportQueueEvents = new QueueEvents('reports', { connection })

A few things worth flagging. maxRetriesPerRequest: null is required by BullMQ — without it, blocking commands fail. removeOnComplete keeps the last 10k completed jobs for 24h so you can still surface a "your report is ready" link if the user refreshes; older jobs get garbage-collected so Redis doesn't grow unbounded. The attempt count is 3, but the worker still needs to be careful about idempotency for those retries (more below).

The API route: accept the job, return the ID

// src/app/api/reports/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { randomUUID } from 'crypto'
import { reportQueue } from '@/src/lib/queue'

export const runtime = 'nodejs'

const BodySchema = z.object({
    inputDataUrl: z.string().url(),
    idempotencyKey: z.string().min(8).max(128).optional(),
})

export async function POST(req: NextRequest) {
    const session = await getSession(req)
    if (!session) return NextResponse.json({ error: 'unauthorized' }, { status: 401 })

    const parsed = BodySchema.safeParse(await req.json())
    if (!parsed.success) {
        return NextResponse.json({ error: 'bad_request', issues: parsed.error.issues }, { status: 400 })
    }

    const reportId = randomUUID()
    const idempotencyKey =
        parsed.data.idempotencyKey ?? req.headers.get('idempotency-key') ?? reportId

    const job = await reportQueue.add(
        'generate',
        {
            userId: session.userId,
            reportId,
            inputDataUrl: parsed.data.inputDataUrl,
            plan: session.plan,
        },
        {
            jobId: `${session.userId}:${idempotencyKey}`,
            priority: session.plan === 'pro' ? 1 : 10,
        }
    )

    return NextResponse.json({ jobId: job.id, reportId }, { status: 202 })
}

declare function getSession(req: NextRequest): Promise<{ userId: string; plan: 'free' | 'pro' } | null>

The two important details here:

jobId is the idempotency key. BullMQ will refuse to add a second job with the same jobId if the first is still in the queue or being processed. If a flaky network causes the client to retry the POST, the duplicate hits the same jobId and is silently deduped — you don't double-charge the LLM. The idempotency key comes from an explicit body field, falls back to the standard Idempotency-Key header, and falls back further to the generated UUID for clients that don't send one.

priority is lower-is-higher in BullMQ. Pro users get priority 1, free users get 10. When the worker is at capacity, pro jobs jump the line. Don't use this to block free users — use it to soften congestion during spikes.

The response is 202 Accepted with the job ID. The client now polls or subscribes for progress.

The worker: where the LLM actually runs

The worker is a separate process. Do not run it inside your Next.js server — serverless functions can't host long-running workers, and even on a long-running Node host, mixing request handlers with workers ruins your graceful shutdown story.

// src/worker/report-worker.ts
import { Worker } from 'bullmq'
import Anthropic from '@anthropic-ai/sdk'
import { connection, type ReportJobData, type ReportJobResult } from '@/src/lib/queue'
import { generatePdf, uploadPdf, getInputData, markReport } from '@/src/lib/reports'

const anthropic = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY! })

const worker = new Worker<ReportJobData, ReportJobResult>(
    'reports',
    async (job) => {
        const { reportId, inputDataUrl, plan } = job.data

        const existing = await markReport(reportId, 'processing')
        if (existing?.status === 'completed' && existing.pdfUrl) {
            return { pdfUrl: existing.pdfUrl, tokensUsed: 0 }
        }

        await job.updateProgress(10)
        const input = await getInputData(inputDataUrl)

        await job.updateProgress(30)
        const llmResponse = await anthropic.messages.create({
            model: plan === 'pro' ? 'claude-opus-4-7' : 'claude-haiku-4-5-20251001',
            max_tokens: 4000,
            messages: [
                {
                    role: 'user',
                    content: `Summarize this dataset into an executive report:\n\n${input}`,
                },
            ],
        })
        const textBlock = llmResponse.content.find((b) => b.type === 'text')
        if (!textBlock || textBlock.type !== 'text') {
            throw new Error('no_text_response')
        }

        await job.updateProgress(70)
        const pdfBytes = await generatePdf(textBlock.text)
        const pdfUrl = await uploadPdf(reportId, pdfBytes)

        await markReport(reportId, 'completed', { pdfUrl })
        await job.updateProgress(100)

        return { pdfUrl, tokensUsed: llmResponse.usage.input_tokens + llmResponse.usage.output_tokens }
    },
    {
        connection,
        concurrency: 5,
        limiter: { max: 30, duration: 60_000 },
    }
)

worker.on('failed', (job, err) => {
    console.error(`Job ${job?.id} failed:`, err.message)
})

Key choices:

  • concurrency: 5 — each worker process handles up to 5 jobs in parallel. Tune this against your LLM provider's rate limits and your own DB connection pool. You can run multiple worker processes for horizontal scale.
  • limiter: { max: 30, duration: 60_000 } — caps the worker at 30 jobs per minute regardless of concurrency. This is application-level rate limiting that sits in front of your Anthropic/OpenAI quota and prevents you from blowing past it.
  • The early existing?.status === 'completed' check — second layer of idempotency. If the job retried for any reason after the LLM call succeeded but the worker died before returning, we don't re-run the expensive call.
  • updateProgress — emits events that the frontend can subscribe to via SSE (next section).

Streaming progress to the frontend with SSE

The client got a jobId back. Two options for updates: polling or Server-Sent Events. Polling is simpler; SSE feels better.

// src/app/api/reports/[jobId]/events/route.ts
import { NextRequest } from 'next/server'
import { reportQueue, reportQueueEvents } from '@/src/lib/queue'

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
    const { jobId } = params

    const stream = new ReadableStream({
        async start(controller) {
            const send = (event: string, data: unknown) => {
                controller.enqueue(
                    new TextEncoder().encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
                )
            }

            const job = await reportQueue.getJob(jobId)
            if (!job) {
                send('error', { message: 'job_not_found' })
                controller.close()
                return
            }
            send('progress', { progress: job.progress, state: await job.getState() })

            const onProgress = ({ jobId: id, data }: { jobId: string; data: number | object }) => {
                if (id === jobId) send('progress', { progress: data })
            }
            const onCompleted = ({ jobId: id, returnvalue }: { jobId: string; returnvalue: string }) => {
                if (id === jobId) {
                    send('completed', JSON.parse(returnvalue))
                    controller.close()
                }
            }
            const onFailed = ({ jobId: id, failedReason }: { jobId: string; failedReason: string }) => {
                if (id === jobId) {
                    send('failed', { reason: failedReason })
                    controller.close()
                }
            }

            reportQueueEvents.on('progress', onProgress)
            reportQueueEvents.on('completed', onCompleted)
            reportQueueEvents.on('failed', onFailed)

            req.signal.addEventListener('abort', () => {
                reportQueueEvents.off('progress', onProgress)
                reportQueueEvents.off('completed', onCompleted)
                reportQueueEvents.off('failed', onFailed)
                controller.close()
            })
        },
    })

    return new Response(stream, {
        headers: {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache, no-transform',
            Connection: 'keep-alive',
        },
    })
}

On the client, new EventSource(`/api/reports/${jobId}/events`) gives you a real-time progress bar that survives tab refresh. If the user closes and reopens, you re-subscribe and the next progress event fills the bar back in.

Graceful shutdown

The worker is going to be redeployed regularly. Without graceful shutdown, in-flight jobs get killed mid-LLM-call — you pay for the tokens but the user gets nothing.

// src/worker/report-worker.ts (bottom of the file)
async function shutdown() {
    console.log('shutting down worker...')
    await worker.close()
    await connection.quit()
    process.exit(0)
}

process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)

worker.close() waits for the current jobs to finish before exiting. On most orchestrators (Fly, Render, Railway, Kubernetes), you get 30 seconds between SIGTERM and SIGKILL — usually plenty for one LLM call to finish. If your jobs are longer, extend the grace period in your platform's config.

A few production gotchas

  • Don't put the worker on Vercel. Run it on Railway, Fly, Render, or any always-on Node host. Vercel cron functions can trigger jobs but can't be the worker.
  • Watch your Redis memory. BullMQ stores full job data and results in Redis. If your inputs are large (uploaded files, long contexts), store them in object storage and pass a URL — don't shove the bytes through Redis.
  • Set explicit attempts. Default retry behavior can re-trigger expensive LLM calls. The idempotency check inside the worker is your safety net, but you should also set attempts: 1 for jobs you don't want retried at all.
  • Track cost per job. Save tokensUsed from each completed job to your DB so you can bill, monitor, and detect a runaway prompt.

What's next

Once you have a queue and observability around jobs, the natural next step is systematic evaluation — making sure prompt edits don't silently regress quality across thousands of historical inputs. That's what I'll cover next: building an AI eval suite with Promptfoo to catch prompt regressions before production.

Share:
VA

Vadim Alakhverdov

Software developer writing about JavaScript, web development, and developer tools.

Related Posts