Batch Processing with Parallel Workers

Applicability

When to Use

When processing millions of records
When individual record processing is slow
When throughput needs to scale horizontally
Overview

How It Works

This pattern divides a large dataset into chunks and processes them in parallel using multiple worker agents. A coordinator agent assigns chunks to workers, tracks progress, and handles failures, while Redis MCP Server provides the shared state for coordination.

The coordinator queries the total dataset size, creates a work item for each chunk, and pushes the items to a Redis queue. Worker agents pull from the queue, process their chunk using the appropriate MCP servers, and report completion. If a worker fails, its chunk can be re-queued.
Implementation

Code Example

typescript
// Assumes already-connected `postgres` (node-postgres style) and `redis`
// (ioredis style) clients, e.g. provided through the PostgreSQL and Redis MCP servers.
async function coordinateBatch(query: string, chunkSize = 1000) {
  // Count the rows in the dataset. node-postgres returns count(*) as a string,
  // so coerce it to a number before computing the chunk count.
  const result = await postgres.query(`SELECT count(*) AS total FROM (${query}) t`);
  const total = Number(result.rows[0].total);
  const chunks = Math.ceil(total / chunkSize);

  // Share the query with workers, then push one work item per chunk onto the queue.
  await redis.set("batch:query", query);
  for (let i = 0; i < chunks; i++) {
    await redis.lpush("work-queue", JSON.stringify({ offset: i * chunkSize, limit: chunkSize }));
  }
  await redis.set("batch:total", chunks);
  await redis.set("batch:completed", 0);
}

async function worker() {
  // Read the shared query from Redis. It should include a stable ORDER BY,
  // otherwise OFFSET/LIMIT chunks may overlap or skip rows between queries.
  const query = await redis.get("batch:query");
  while (true) {
    // BRPOP blocks for up to 5 seconds and resolves to [key, value], or null on timeout.
    const work = await redis.brpop("work-queue", 5);
    if (!work) break; // queue drained: this worker is done
    const [, payload] = work;
    const { offset, limit } = JSON.parse(payload);
    try {
      const data = await postgres.query(`${query} OFFSET $1 LIMIT $2`, [offset, limit]);
      await processChunk(data.rows); // caller-supplied per-chunk processing
      await redis.incr("batch:completed");
    } catch (err) {
      // Re-queue the failed chunk so another worker can retry it.
      await redis.lpush("work-queue", payload);
      throw err;
    }
  }
}
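
For completeness, here is a minimal sketch of how the pieces above might be wired together: start the coordinator, drain the queue with a small pool of in-process workers, and poll the shared counters for progress. It assumes the same postgres and redis clients as above; the runBatch name, the pool size of 8, and the one-second poll interval are illustrative choices rather than part of the pattern.

typescript
// Illustrative sketch: run a batch end to end with an in-process worker pool.
// In production, each worker() would typically run as a separate agent or process.
async function runBatch(queryText: string) {
  await coordinateBatch(queryText);

  // Horizontal scaling: start 8 workers that drain the queue concurrently.
  const pool = Array.from({ length: 8 }, () => worker());

  // Progress tracking: compare the shared counters the coordinator initialized.
  const total = Number(await redis.get("batch:total"));
  let completed = 0;
  while (completed < total) {
    completed = Number(await redis.get("batch:completed"));
    console.log(`progress: ${completed}/${total} chunks`);
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }

  await Promise.all(pool); // surfaces any worker error that was re-thrown
}

Because a worker only increments batch:completed after its chunk succeeds, and failed chunks are pushed back onto the queue, the counter reaches batch:total only once every chunk has actually been processed.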

Quick Info

Category: data-pipeline
Complexity: Medium
