ETL Pipeline with Checkpointing

Category: data-pipeline · Complexity: Hard
Applicability

When to Use

- When processing large datasets takes hours
- When a pipeline failure should not force a full restart
- When each record must land in the target exactly once (checkpointing alone gives at-least-once delivery; the load step must also be idempotent)

Overview

How It Works

This pattern implements ETL with checkpointing using MCP servers. The pipeline records its progress in a checkpoint store (Redis or PostgreSQL), so after a failure it resumes from the last successful checkpoint instead of starting over. Each stage writes a checkpoint only after it completes successfully. The checkpoint holds the ID of the last processed record, a timestamp, and the stage to run next. Intermediate results, such as the extracted and transformed batches, are staged alongside it. On restart, the pipeline reads the checkpoint and continues with the first stage that had not finished, so a crash costs at most one stage of one batch. This matters most for large datasets, where reprocessing from scratch would take hours.
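The sketch below shows the resume behaviour without Redis or PostgreSQL. It is a minimal, synchronous TypeScript model that keeps the checkpoint in an in-memory map. `MemoryStore`, `runBatch`, the `failAt` crash hook, and the uppercase "transform" are hypothetical stand-ins and are not part of any MCP server API. The real pipeline is asynchronous. The sketch demonstrates one property only: because a checkpoint is written after each stage, a restarted run skips every stage that already finished.

```typescript
// Hypothetical in-memory model of checkpoint-and-resume. MemoryStore stands in
// for Redis/PostgreSQL; runBatch and failAt are illustrative names only.
type Stage = "extract" | "transform" | "load";
interface Checkpoint { lastId: number; stage: Stage; updatedAt: number }
interface Row { id: number; value: string }

class MemoryStore {
  private data = new Map<string, string>();
  get(key: string): string | undefined { return this.data.get(key); }
  set(key: string, value: string): void { this.data.set(key, value); }
}

// Processes one batch, persisting a checkpoint after each completed stage.
// `failAt` simulates a crash right before the named stage starts.
// Returns the stages that completed during this call.
function runBatch(
  store: MemoryStore,
  source: Row[],
  warehouse: Map<number, string>,
  batchSize: number,
  failAt?: Stage,
): Stage[] {
  const done: Stage[] = [];
  const raw = store.get("checkpoint");
  const cp: Checkpoint = raw ? JSON.parse(raw) : { lastId: 0, stage: "extract", updatedAt: 0 };

  // Record the finished stage, point the checkpoint at the next one, and persist it.
  const advance = (next: Stage) => {
    done.push(cp.stage);
    cp.stage = next;
    cp.updatedAt = Date.now();
    store.set("checkpoint", JSON.stringify(cp));
  };
  const crashIf = (s: Stage) => {
    if (failAt === s) throw new Error(`simulated crash before ${s}`);
  };

  if (cp.stage === "extract") {
    crashIf("extract");
    // Equivalent of `WHERE id > lastId ORDER BY id LIMIT batchSize`.
    const batch = source.filter((r) => r.id > cp.lastId).sort((a, b) => a.id - b.id).slice(0, batchSize);
    if (batch.length === 0) return done; // nothing left to process
    store.set("staging", JSON.stringify(batch));
    cp.lastId = batch[batch.length - 1].id;
    advance("transform");
  }
  if (cp.stage === "transform") {
    crashIf("transform");
    const staged: Row[] = JSON.parse(store.get("staging")!);
    const transformed = staged.map((r) => ({ ...r, value: r.value.toUpperCase() }));
    store.set("transformed", JSON.stringify(transformed));
    advance("load");
  }
  if (cp.stage === "load") {
    crashIf("load");
    const rows: Row[] = JSON.parse(store.get("transformed")!);
    for (const r of rows) warehouse.set(r.id, r.value); // upsert by id, so a replay is harmless
    advance("extract");
  }
  return done;
}
```

Consider five source rows and a batch size of 3. If the run crashes right before `load`, the checkpoint already reads `stage: "load"`. The next call then runs only `load` for rows 1 to 3, and the call after that extracts, transforms, and loads rows 4 and 5.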
Implementation

Code Example

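```typescript
// `redis`, `postgres`, `transformRecord`, and `loadToWarehouse` are assumed to be
// clients/helpers provided by the surrounding MCP server setup; they are not defined here.
type Stage = "extract" | "transform" | "load";
type Checkpoint = { lastId: number; stage: Stage; updatedAt?: string };

async function runETLWithCheckpoint(pipelineId: string): Promise<void> {
  const key = `checkpoint:${pipelineId}`;
  // Redis stores strings, so parse the checkpoint; start fresh if none exists.
  const stored = await redis.get(key);
  const checkpoint: Checkpoint = stored ? JSON.parse(stored) : { lastId: 0, stage: "extract" };

  // Persist progress, then let the next stage run in this same call.
  const advance = async (stage: Stage) => {
    checkpoint.stage = stage;
    checkpoint.updatedAt = new Date().toISOString();
    await redis.set(key, JSON.stringify(checkpoint));
  };

  if (checkpoint.stage === "extract") {
    const data = await postgres.query("SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 10000", [checkpoint.lastId]);
    if (data.rows.length === 0) return; // nothing new to process
    await redis.set(`staging:${pipelineId}`, JSON.stringify(data.rows));
    checkpoint.lastId = data.rows.at(-1).id;
    await advance("transform");
  }

  if (checkpoint.stage === "transform") {
    const raw = JSON.parse(await redis.get(`staging:${pipelineId}`));
    await redis.set(`transformed:${pipelineId}`, JSON.stringify(raw.map(transformRecord)));
    await advance("load");
  }

  if (checkpoint.stage === "load") {
    const data = JSON.parse(await redis.get(`transformed:${pipelineId}`));
    // A crash after this call but before advance() replays the load on restart,
    // so loadToWarehouse should be idempotent (e.g. an upsert keyed on id).
    await loadToWarehouse(data);
    await advance("extract");
  }
}
```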
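The checkpoint is written only after `loadToWarehouse` returns. If the process crashes between those two calls, the last batch is replayed on restart, which gives at-least-once delivery. Exactly-once results in the target also require an idempotent load, for example an upsert keyed on the source `id`, or committing the load and the checkpoint in the same database transaction. The sketch below is a hypothetical in-memory model in which `appendLoad`, `upsertLoad`, and `replayOnce` are illustrative names. It shows what happens to each approach when one batch is loaded twice.

```typescript
// Hypothetical warehouse model: replaying a batch is only safe with an idempotent load.
interface Row { id: number; value: string }

// Append-only load: every call adds rows, so a replayed batch is duplicated.
function appendLoad(table: Row[], batch: Row[]): void {
  for (const r of batch) table.push(r);
}

// Upsert keyed on the source id: a replayed batch overwrites the same keys.
function upsertLoad(table: Map<number, Row>, batch: Row[]): void {
  for (const r of batch) table.set(r.id, r);
}

// Simulates: the load succeeds, the process dies before the checkpoint write,
// and the restarted run loads the same batch again.
function replayOnce(batch: Row[]): { appended: number; upserted: number } {
  const appendTable: Row[] = [];
  const upsertTable = new Map<number, Row>();
  for (let attempt = 0; attempt < 2; attempt++) {
    appendLoad(appendTable, batch);
    upsertLoad(upsertTable, batch);
  }
  return { appended: appendTable.length, upserted: upsertTable.size };
}

const batch: Row[] = [{ id: 1, value: "a" }, { id: 2, value: "b" }];
console.log(replayOnce(batch)); // { appended: 4, upserted: 2 }
```

In PostgreSQL, a comparable upsert is `INSERT ... ON CONFLICT (id) DO UPDATE`, which requires a unique constraint or index on `id`.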

