Applicability
When to Use
✓ When processing millions of records
✓ When individual record processing is slow
✓ When throughput needs to scale horizontally
Overview
How It Works
This pattern divides a large dataset into chunks and processes them in parallel using multiple worker agents. A coordinator agent assigns chunks to workers, tracks progress, and handles failures. Redis MCP Server provides the shared state for coordination.
The coordinator queries the total dataset size, creates a work item for each chunk, and pushes the items onto a Redis queue. Worker agents pull items from the queue, process their chunks using the appropriate MCP servers, and report completion. If a worker fails, its chunk can be re-queued, as sketched below.
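The re-queue step is easy to get wrong with a plain list, because a chunk popped by a crashing worker vanishes with it. Below is a minimal sketch of one way to make it recoverable, assuming the same ioredis-style `redis` client and the `QUERY`/`processChunk` names used in the Implementation example; `reliableWorker`, `requeueDeadWorker`, and the `processing:*` key names are illustrative, not part of the pattern.

```typescript
// Reliable-queue variant: BRPOPLPUSH atomically parks each item in a
// per-worker processing list, so an unfinished chunk is never lost.
async function reliableWorker(workerId: string): Promise<void> {
  const processing = `processing:${workerId}`;
  while (true) {
    // Pop from the shared queue and park in this worker's list in one step.
    const payload = await redis.brpoplpush("work-queue", processing, 5);
    if (!payload) break; // timed out: shared queue is drained
    const { offset, limit } = JSON.parse(payload);
    const data = await postgres.query(`${QUERY} LIMIT $2 OFFSET $1`, [offset, limit]);
    await processChunk(data.rows);
    // Only a finished chunk is removed from the processing list.
    await redis.lrem(processing, 1, payload);
    await redis.incr("batch:completed");
  }
}

// When the coordinator decides a worker is dead, its parked chunks
// go back onto the shared queue for another worker to pick up.
async function requeueDeadWorker(workerId: string): Promise<void> {
  while (await redis.rpoplpush(`processing:${workerId}`, "work-queue")) {
    // each iteration moves one unfinished chunk back onto work-queue
  }
}
```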
Implementation
Code Example
```typescript
// Assumes a pg-style `postgres` client and an ioredis-style `redis` client
// supplied by the MCP setup; `query` is a trusted SELECT built by the pipeline.
async function coordinateBatch(query: string, chunkSize = 1000): Promise<void> {
  // Count the rows the batch covers; pg returns count(*) as a string.
  const result = await postgres.query(`SELECT count(*) AS n FROM (${query}) t`);
  const total = Number(result.rows[0].n);
  const chunks = Math.ceil(total / chunkSize);

  // One work item per chunk, pushed onto a shared Redis list.
  for (let i = 0; i < chunks; i++) {
    await redis.lpush(
      "work-queue",
      JSON.stringify({ offset: i * chunkSize, limit: chunkSize })
    );
  }
  await redis.set("batch:total", chunks);
  await redis.set("batch:completed", 0);
}

async function worker(): Promise<void> {
  while (true) {
    // Block for up to 5 seconds; a timeout means the queue has drained.
    const work = await redis.brpop("work-queue", 5);
    if (!work) break;
    const [, payload] = work; // brpop resolves to a [key, value] pair
    const { offset, limit } = JSON.parse(payload);
    // QUERY and processChunk are defined elsewhere in the pipeline.
    const data = await postgres.query(`${QUERY} LIMIT $2 OFFSET $1`, [offset, limit]);
    await processChunk(data.rows);
    await redis.incr("batch:completed");
  }
}
```
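How the two halves are started is left open by the pattern; one minimal wiring, assuming the same clients and `QUERY` constant as above, runs the coordinator once and then drains the queue with a fixed pool of workers:

```typescript
// Hypothetical entry point: fill the queue, then drain it with N workers.
async function runBatch(): Promise<void> {
  await coordinateBatch(QUERY);
  const WORKER_COUNT = 4; // tune to the capacity of downstream MCP servers
  await Promise.all(Array.from({ length: WORKER_COUNT }, () => worker()));
  console.log("chunks completed:", await redis.get("batch:completed"));
}
```

Progress can be polled at any time by comparing `batch:completed` against `batch:total`.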
Quick Info
Category: data-pipeline
Complexity: Medium