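ETL (Extract, Transform, Load) pipelines pull data from external sources, clean and reshape it, and load it into your application's database. Here is how to build one in TypeScript on top of a small, reusable pipeline framework.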
Pipeline Framework
// lib/pipeline.ts
/**
 * Lifecycle state of a step. `Pipeline.execute` only ever reports
 * "completed", "failed", or "skipped"; "pending" and "running" are available
 * for callers that track progress themselves.
 */
export type StepStatus = "pending" | "running" | "completed" | "failed" | "skipped";

/**
 * One unit of work in a {@link Pipeline}. A step receives the previous step's
 * output (the pipeline's initial input for the first step), and its return
 * value becomes the next step's input.
 */
export interface PipelineStep<TInput = unknown, TOutput = unknown> {
  /** Label used in log lines and in {@link StepResult.step}. */
  name: string;
  /** Does the work; a thrown error or rejected promise counts as a failed attempt. */
  execute: (input: TInput, context: PipelineContext) => Promise<TOutput>;
  /**
   * Extra attempts after the first failure (default 0). Fractional values are
   * rounded down; negative or non-finite values are treated as 0.
   */
  retries?: number;
  /** Milliseconds to wait before each retry (default 1000). */
  retryDelay?: number;
  /**
   * When this returns true the step is not run and its input is passed
   * unchanged to the next step. An exception thrown here rejects
   * `Pipeline.execute` instead of being recorded as a step failure.
   */
  skipIf?: (input: TInput, context: PipelineContext) => boolean;
}

/** Per-run state shared by every step of one `Pipeline.execute` call. */
export interface PipelineContext {
  /** `<pipeline name>-<run start, epoch ms>`. */
  pipelineId: string;
  /** When this run started. */
  startedAt: Date;
  /** Free-form scratch space steps can use to share values within one run. */
  metadata: Record<string, unknown>;
  /** Appends a timestamped line to the run's logs and echoes it to the console. */
  log: (message: string) => void;
}

/** Outcome of one step in a run. */
export interface StepResult {
  step: string;
  status: StepStatus;
  /** Wall-clock milliseconds for the step, including any retry delays (0 when skipped). */
  duration: number;
  /** Message of the final error when `status` is "failed". */
  error?: string;
  /** Optional; `Pipeline` itself never sets this. */
  recordsProcessed?: number;
}

/** Coerces a step's `retries` setting to a non-negative integer. */
function normalizeRetries(retries: number | undefined): number {
  const value = retries ?? 0;
  return Number.isFinite(value) && value > 0 ? Math.floor(value) : 0;
}

/** Extracts a human-readable message from any thrown value. */
function describeError(error: unknown): string {
  if (error instanceof Error) return error.message;
  return typeof error === "string" ? error : "Unknown error";
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * A sequential, fail-fast pipeline: steps run in insertion order, each step's
 * output feeds the next, and the first step that still fails after its
 * retries stops the run.
 */
export class Pipeline {
  private readonly steps: PipelineStep[] = [];
  private readonly name: string;

  /** @param name Used as the console log prefix and as the prefix of each run's `pipelineId`. */
  constructor(name: string) {
    this.name = name;
  }

  /**
   * Appends a step to run after all previously added steps.
   * Steps are stored type-erased, so the compiler does not check that one
   * step's output type matches the next step's input type.
   * @returns This pipeline, for chaining.
   */
  addStep<TInput, TOutput>(step: PipelineStep<TInput, TOutput>) {
    this.steps.push(step as PipelineStep);
    return this;
  }

  /**
   * Runs every step in order, threading each output into the next step.
   *
   * @param initialInput Input for the first step (default `null`).
   * @returns `success` (false if a step failed), one {@link StepResult} per
   *   step reached, the total wall-clock `duration` in ms, and the timestamped
   *   `logs` written through `context.log` during this run.
   * @throws Whatever a step's `skipIf` predicate throws.
   */
  async execute(initialInput: unknown = null): Promise<{
    success: boolean;
    results: StepResult[];
    duration: number;
    logs: string[];
  }> {
    const pipelineStart = Date.now();
    const results: StepResult[] = [];
    const logs: string[] = [];
    const context: PipelineContext = {
      pipelineId: `${this.name}-${pipelineStart}`,
      startedAt: new Date(pipelineStart),
      metadata: {},
      log: (msg) => {
        logs.push(`[${new Date().toISOString()}] ${msg}`);
        console.log(`[Pipeline:${this.name}] ${msg}`);
      },
    };
    const finish = (success: boolean) => ({
      success,
      results,
      duration: Date.now() - pipelineStart,
      logs,
    });

    let currentInput = initialInput;
    for (const step of this.steps) {
      if (step.skipIf?.(currentInput, context)) {
        results.push({ step: step.name, status: "skipped", duration: 0 });
        context.log(`Skipped step: ${step.name}`);
        continue;
      }

      context.log(`Starting step: ${step.name}`);
      const stepStart = Date.now();
      // Clamped to >= 0 so the step always runs at least once; a negative or
      // NaN value would otherwise skip the loop and silently "succeed".
      const maxRetries = normalizeRetries(step.retries);

      for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          currentInput = await step.execute(currentInput, context);
          // Measure once so the recorded and logged durations agree.
          const duration = Date.now() - stepStart;
          results.push({ step: step.name, status: "completed", duration });
          context.log(`Completed step: ${step.name} in ${duration}ms`);
          break;
        } catch (error: unknown) {
          if (attempt === maxRetries) {
            const errorMsg = describeError(error);
            context.log(`Failed step: ${step.name} — ${errorMsg}`);
            results.push({
              step: step.name,
              status: "failed",
              duration: Date.now() - stepStart,
              error: errorMsg,
            });
            return finish(false);
          }
          context.log(`Retrying step: ${step.name} (attempt ${attempt + 2}/${maxRetries + 1})`);
          await sleep(step.retryDelay ?? 1000);
        }
      }
    }

    return finish(true);
  }
}
Example: Sync Products from an External API
// pipelines/sync-products.ts
import { Pipeline, type PipelineStep } from "@/lib/pipeline";
/**
 * A product record as returned by the external products API (snake_case fields).
 * These field types are asserted by this code, not validated at runtime.
 * NOTE(review): confirm them against the API's published schema.
 */
interface RawProduct {
id: string;
name: string;
// Divided by 100 in transformStep, so presumably an integer number of cents — confirm.
price_cents: number;
category_id: string;
// transformStep reads this with `?.`, so it may be absent in practice — NOTE(review): consider making it optional.
description: string;
in_stock: boolean;
// NOTE(review): not read anywhere in this pipeline; presumably an ISO-8601 timestamp — confirm.
updated_at: string;
}
/**
 * Normalized product produced by transformStep and passed to loadStep.
 */
interface TransformedProduct {
// RawProduct.id; the match key used by loadStep's (commented-out) upsert.
externalId: string;
// RawProduct.name with surrounding whitespace trimmed.
name: string;
// RawProduct.price_cents / 100.
price: number;
// Copied from RawProduct.category_id.
category: string;
// RawProduct.description cut to at most 500 UTF-16 code units; "" when the API sent none.
description: string;
// Copied from RawProduct.in_stock.
available: boolean;
// When transformStep produced this record.
lastSynced: Date;
}
/**
 * Narrows a parsed JSON body to the page envelope extractStep relies on.
 * Only the envelope is checked; individual product fields are not validated.
 */
function isProductPage(value: unknown): value is { products: unknown[]; has_more?: unknown } {
  return (
    typeof value === "object" &&
    value !== null &&
    Array.isArray((value as { products?: unknown }).products)
  );
}

/**
 * Extract: fetches every page of `GET ${PRODUCTS_API_URL}/products` (100 per
 * page) until the API stops reporting `has_more`. Any failure aborts the
 * step, and a pipeline-level retry restarts from page 1.
 *
 * Writes `context.metadata.totalExtracted`.
 */
const extractStep: PipelineStep<null, RawProduct[]> = {
  name: "Extract products from API",
  retries: 3,
  retryDelay: 2000,
  execute: async (_input, context) => {
    const baseUrl = process.env.PRODUCTS_API_URL;
    const apiKey = process.env.PRODUCTS_API_KEY;
    // Fail with a clear message instead of requesting "undefined/products"
    // or sending "Bearer undefined".
    if (!baseUrl || !apiKey) {
      throw new Error("PRODUCTS_API_URL and PRODUCTS_API_KEY must be set");
    }

    const products: RawProduct[] = [];
    for (let page = 1; ; page++) {
      const res = await fetch(`${baseUrl}/products?page=${page}&limit=100`, {
        headers: { Authorization: `Bearer ${apiKey}` },
      });
      if (!res.ok) throw new Error(`API returned ${res.status}`);

      const data: unknown = await res.json();
      if (!isProductPage(data)) {
        throw new Error(`Unexpected API response on page ${page}: missing "products" array`);
      }
      // Item fields are not validated here; transformStep drops items
      // lacking a name or a positive price.
      products.push(...(data.products as RawProduct[]));

      if (!data.has_more) break;
      // Guard against an endless loop if the API keeps claiming more pages
      // while returning nothing.
      if (data.products.length === 0) {
        throw new Error(`API reported has_more on empty page ${page}`);
      }
    }

    context.log(`Extracted ${products.length} products`);
    context.metadata.totalExtracted = products.length;
    return products;
  },
};
/** Maximum stored description length, in UTF-16 code units. */
const MAX_DESCRIPTION_LENGTH = 500;

/**
 * Cuts `text` to at most `max` UTF-16 code units without leaving the first
 * half of a surrogate pair (e.g. part of an emoji) dangling at the end.
 */
function truncateText(text: string, max: number): string {
  if (text.length <= max) return text;
  const cut = text.slice(0, max);
  const last = cut.charCodeAt(cut.length - 1);
  return last >= 0xd800 && last <= 0xdbff ? cut.slice(0, -1) : cut;
}

/**
 * Transform: drops products without a non-blank name or with a non-positive
 * price, then maps API fields to the internal shape. Prices are converted
 * from cents (divided by 100), names are trimmed, and descriptions are
 * truncated. Every record from one run shares the same `lastSynced` instant.
 */
const transformStep: PipelineStep<RawProduct[], TransformedProduct[]> = {
  name: "Transform product data",
  execute: async (products, context) => {
    const syncedAt = Date.now();
    const transformed: TransformedProduct[] = [];

    for (const p of products) {
      // Trim before validating so whitespace-only names are rejected too.
      const name = p.name ? p.name.trim() : "";
      if (!name || !(p.price_cents > 0)) continue;

      transformed.push({
        externalId: p.id,
        name,
        price: p.price_cents / 100,
        category: p.category_id,
        description:
          p.description == null ? "" : truncateText(p.description, MAX_DESCRIPTION_LENGTH),
        available: p.in_stock,
        lastSynced: new Date(syncedAt),
      });
    }

    const removed = products.length - transformed.length;
    if (removed > 0) {
      context.log(`Filtered out ${removed} invalid products`);
    }
    return transformed;
  },
};
/** Number of products handled per batch in the load step. */
const LOAD_BATCH_SIZE = 50;

/**
 * Load: walks the transformed products in batches of LOAD_BATCH_SIZE and
 * reports how many were inserted vs. updated.
 *
 * NOTE: the database write is still a placeholder. Every product is counted
 * as `inserted`, and `updated` stays 0 until the upsert below is wired up.
 * This step has `retries: 2`, and a retry re-runs it from the first batch,
 * so the real write must be idempotent (an upsert keyed on externalId).
 */
const loadStep: PipelineStep<TransformedProduct[], { inserted: number; updated: number }> = {
  name: "Load into database",
  retries: 2,
  execute: async (products, context) => {
    let inserted = 0;
    let updated = 0;

    for (
      let start = 0, batchNumber = 1;
      start < products.length;
      start += LOAD_BATCH_SIZE, batchNumber++
    ) {
      const batch = products.slice(start, start + LOAD_BATCH_SIZE);
      for (const product of batch) {
        // Upsert pattern:
        // const existing = await db.select().from(productsTable).where(eq(productsTable.externalId, product.externalId));
        // if (existing.length > 0) {
        // await db.update(productsTable).set(product).where(eq(productsTable.externalId, product.externalId));
        // updated++;
        // } else {
        // await db.insert(productsTable).values(product);
        // inserted++;
        // }
        // NOTE(review): select-then-write can race with a concurrent run. A single
        // INSERT ... ON CONFLICT (externalId) DO UPDATE (Drizzle: `onConflictDoUpdate`)
        // is atomic, assuming a unique index on externalId.
        inserted++; // Placeholder
      }
      context.log(`Processed batch ${batchNumber}`);
    }

    return { inserted, updated };
  },
};
/**
 * Assembles the product sync pipeline named "product-sync":
 * extract from the API, then transform, then load.
 */
export function createProductSyncPipeline() {
  const pipeline = new Pipeline("product-sync");
  pipeline.addStep(extractStep);
  pipeline.addStep(transformStep);
  pipeline.addStep(loadStep);
  return pipeline;
}
Cron Job API Route
// app/api/cron/sync-products/route.ts
import { NextRequest, NextResponse } from "next/server";
import { createProductSyncPipeline } from "@/pipelines/sync-products";
/**
 * Cron entry point for the product sync. Requires the header
 * `Authorization: Bearer <CRON_SECRET>`. Vercel Cron sends it automatically
 * when CRON_SECRET is set; other schedulers must send it themselves.
 *
 * Responses:
 * - 401 when the request is unauthorized or CRON_SECRET is not configured.
 * - 500 with the per-step results when the pipeline fails.
 * - 200 with the per-step results on success.
 */
export async function POST(request: NextRequest) {
  const secret = process.env.CRON_SECRET;
  if (!secret) {
    // Fail closed. Without a secret the expected header would be the literal
    // "Bearer undefined", which any caller could send.
    console.error("CRON_SECRET is not configured; rejecting cron request");
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }
  if (request.headers.get("authorization") !== `Bearer ${secret}`) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  const result = await createProductSyncPipeline().execute();
  if (!result.success) {
    console.error("Pipeline failed:", result.results);
  }
  return NextResponse.json(
    { success: result.success, results: result.results, duration: result.duration },
    { status: result.success ? 200 : 500 }
  );
}

/**
 * Vercel Cron invokes the configured path with an HTTP GET, so GET runs the
 * same authenticated handler as POST.
 */
export async function GET(request: NextRequest) {
  return POST(request);
}
Vercel Cron Config
// vercel.json
{
"crons": [
{
"path": "/api/cron/sync-products",
"schedule": "0 */6 * * *"
}
]
}
Need Data Integration Solutions?
We build ETL pipelines, API integrations, and data synchronization systems. Contact us to discuss your data needs.