Applicability
When to Use
✓ When processing large datasets that take hours
✓ When pipeline failures shouldn't require full restarts
✓ When exactly-once processing semantics are needed (checkpointing alone gives at-least-once behavior; pair it with idempotent loads)
Overview
How It Works
This pattern implements ETL with checkpointing on top of MCP servers. The pipeline tracks its progress in a checkpoint store (Redis or PostgreSQL), so if a run fails it can resume from the last successful checkpoint rather than starting over.
Each stage of the pipeline writes a checkpoint after successful completion. The checkpoint includes the last processed record ID, timestamp, and any intermediate state. On restart, the pipeline reads the checkpoint and continues from where it left off. This is critical for large datasets where reprocessing from scratch would take hours.
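As a concrete sketch, the checkpoint record might be typed like this; the field names mirror the example below, and the helper function is illustrative rather than part of the pattern:

```typescript
// Hypothetical shape of a checkpoint record (mirrors the example below).
interface Checkpoint {
  lastId: number;                           // last source record ID processed
  stage: "extract" | "transform" | "load";  // next stage to run
  updatedAt?: string;                       // ISO timestamp of the last write
}

// Illustrative resume logic: read the stored checkpoint, or start fresh.
async function readCheckpoint(
  redis: { get(key: string): Promise<string | null> },
  pipelineId: string
): Promise<Checkpoint> {
  const raw = await redis.get(`checkpoint:${pipelineId}`);
  return raw ? JSON.parse(raw) : { lastId: 0, stage: "extract" };
}
```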
Implementation
Code Example
The sketch below assumes already-connected node-redis and pg clients, plus pipeline-specific transformRecord and loadToWarehouse helpers defined elsewhere.

```typescript
import { createClient } from "redis";
import { Pool } from "pg";

const redis = createClient(); // assumes await redis.connect() at startup
const postgres = new Pool();

async function runETLWithCheckpoint(pipelineId: string) {
  // Resume from the stored checkpoint, or start a fresh run at "extract".
  const stored = await redis.get(`checkpoint:${pipelineId}`);
  const checkpoint = stored ? JSON.parse(stored) : { lastId: 0, stage: "extract" };

  if (checkpoint.stage === "extract") {
    const data = await postgres.query(
      "SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 10000",
      [checkpoint.lastId]
    );
    await redis.set(`staging:${pipelineId}`, JSON.stringify(data.rows));
    // Advance the checkpoint locally as well as in Redis so the next stage runs.
    checkpoint.lastId = data.rows.at(-1)?.id ?? checkpoint.lastId;
    checkpoint.stage = "transform";
    await redis.set(`checkpoint:${pipelineId}`, JSON.stringify(checkpoint));
  }

  if (checkpoint.stage === "transform") {
    const raw = JSON.parse((await redis.get(`staging:${pipelineId}`)) ?? "[]");
    await redis.set(`transformed:${pipelineId}`, JSON.stringify(raw.map(transformRecord)));
    checkpoint.stage = "load";
    await redis.set(`checkpoint:${pipelineId}`, JSON.stringify(checkpoint));
  }

  if (checkpoint.stage === "load") {
    const data = JSON.parse((await redis.get(`transformed:${pipelineId}`)) ?? "[]");
    await loadToWarehouse(data);
    checkpoint.stage = "extract"; // batch done: next run extracts the next window
    await redis.set(`checkpoint:${pipelineId}`, JSON.stringify(checkpoint));
  }
}
```
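A minimal way to exercise the function is a retry loop; the retry policy below is an assumption for illustration, not part of the pattern. Because the checkpoint only advances after a stage completes, each retry resumes at the failed stage instead of re-running the whole pipeline:

```typescript
// Hypothetical driver: each retry resumes from the last committed checkpoint.
async function runWithRetries(pipelineId: string, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await runETLWithCheckpoint(pipelineId);
      return; // batch finished; checkpoint was reset to "extract"
    } catch (err) {
      console.error(`attempt ${attempt} failed; resuming from checkpoint`, err);
    }
  }
  throw new Error(`pipeline ${pipelineId} failed after ${maxAttempts} attempts`);
}
```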
Quick Info
Category: data-pipeline
Complexity: Hard