Event sourcing stores every state change as an immutable event instead of overwriting the current state. This gives you a complete audit trail and lets you rebuild an aggregate's state at any time by replaying its events.
Event and Command Types
// lib/es/types.ts
/** Bookkeeping stored next to an event's payload. */
interface DomainEventMetadata {
  /** User that triggered the change, when known. */
  userId?: string;
  /** ISO-8601 timestamp string. */
  timestamp: string;
  /** Position of the event within its aggregate's stream (the first event is 1). */
  version: number;
  /** Optional id used to tie related commands and events together. */
  correlationId?: string;
}

/** A single recorded state change of one aggregate. */
export interface DomainEvent {
  /** Unique id of this event. */
  id: string;
  /** Id of the aggregate the event belongs to. */
  aggregateId: string;
  /** Name of the aggregate kind the event belongs to. */
  aggregateType: string;
  /** Event name, e.g. "OrderCreated". */
  type: string;
  /** Event-specific payload. */
  data: Record<string, unknown>;
  metadata: DomainEventMetadata;
}
/** A request to change the state of one aggregate. */
export interface Command {
  /** Command name, e.g. "CreateOrder". */
  type: string;
  /** Id of the aggregate the command targets. */
  aggregateId: string;
  /** Command-specific payload. */
  data: Record<string, unknown>;
  /** Who issued the command and, optionally, which flow it belongs to. */
  metadata: {
    userId: string;
    correlationId?: string;
  };
}
Event Store
// lib/es/event-store.ts
import { db } from "@/lib/db";
import { events } from "@/lib/db/schema";
import { eq, and, gt } from "drizzle-orm";
import type { DomainEvent } from "./types";
/**
 * Reads and writes domain events through the project's `db` handle and the
 * `events` schema object.
 */
export class EventStore {
  /**
   * Appends one event to the stream of its aggregate.
   *
   * The event's `metadata.version` must be exactly one greater than the
   * highest version already stored for the same aggregate id and type
   * (or 1 when the stream is empty).
   *
   * @param event - The event to persist.
   * @throws Error when the version does not match the expected next version,
   *   or when `metadata.timestamp` cannot be parsed as a date.
   */
  async append(event: DomainEvent): Promise<void> {
    const { aggregateId, aggregateType, metadata } = event;

    // Optimistic concurrency check.
    // NOTE(review): this read-then-insert is not atomic, so two concurrent
    // writers can both pass the check. Confirm the schema enforces a unique
    // constraint on (aggregateId, aggregateType, version) so the second
    // insert fails instead of forking the stream.
    const lastEvent = await db.query.events.findFirst({
      where: and(
        eq(events.aggregateId, aggregateId),
        eq(events.aggregateType, aggregateType),
      ),
      orderBy: (e, { desc }) => desc(e.version),
    });

    const expectedVersion = (lastEvent?.version ?? 0) + 1;
    if (metadata.version !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected version ${expectedVersion}, got ${metadata.version}`,
      );
    }

    // Reject unparseable timestamps here rather than handing an Invalid Date
    // to the database driver.
    const timestamp = new Date(metadata.timestamp);
    if (Number.isNaN(timestamp.getTime())) {
      throw new Error(`Invalid event timestamp: ${metadata.timestamp}`);
    }

    await db.insert(events).values({
      id: event.id,
      aggregateId,
      aggregateType,
      type: event.type,
      data: event.data,
      userId: metadata.userId,
      version: metadata.version,
      timestamp,
      correlationId: metadata.correlationId,
    });
  }

  /**
   * Loads the events of one aggregate in ascending version order.
   *
   * @param aggregateId - Id of the aggregate whose events are wanted.
   * @param afterVersion - Only events with a version strictly greater than
   *   this are returned. Defaults to 0, i.e. the whole stream.
   * @param aggregateType - Optional aggregate type. When given, only events
   *   of that type are returned, matching the scoping `append` uses for its
   *   version check. When omitted, events are selected by id alone.
   * @returns The matching events mapped back to `DomainEvent` objects.
   */
  async getEvents(
    aggregateId: string,
    afterVersion = 0,
    aggregateType?: string,
  ): Promise<DomainEvent[]> {
    const rows = await db.query.events.findMany({
      where: and(
        eq(events.aggregateId, aggregateId),
        // `and` skips undefined conditions, so the type filter is optional.
        aggregateType === undefined
          ? undefined
          : eq(events.aggregateType, aggregateType),
        gt(events.version, afterVersion),
      ),
      orderBy: (e, { asc }) => asc(e.version),
    });

    return rows.map((row) => ({
      id: row.id,
      aggregateId: row.aggregateId,
      aggregateType: row.aggregateType,
      type: row.type,
      data: row.data as Record<string, unknown>,
      metadata: {
        // Null columns are surfaced as absent optional fields.
        userId: row.userId ?? undefined,
        timestamp: row.timestamp.toISOString(),
        version: row.version,
        correlationId: row.correlationId ?? undefined,
      },
    }));
  }
}
Aggregate Base Class
// lib/es/aggregate.ts
import type { DomainEvent, Command } from "./types";
/**
 * Base class for event-sourced aggregates.
 *
 * Subclasses keep their state in fields, mutate it only inside `apply`, and
 * turn commands into events inside `handle` by calling `raise`.
 */
export abstract class Aggregate {
  /** Id of this aggregate instance. */
  id: string;
  /** Version of the last event applied; 0 when no event has been applied. */
  version = 0;
  /** Events raised on this instance since it was constructed. */
  changes: DomainEvent[] = [];

  constructor(id: string) {
    this.id = id;
  }

  /** Mutates the aggregate's state to reflect `event`. */
  abstract apply(event: DomainEvent): void;

  /** Validates `command` against current state and returns the resulting events. */
  abstract handle(command: Command): DomainEvent[];

  /**
   * Creates the next event for this aggregate, applies it, and records it in
   * `changes`.
   *
   * @param type - Event name.
   * @param data - Event payload.
   * @param metadata - Extra metadata (e.g. `userId`, `correlationId`, or an
   *   explicit `timestamp`). A supplied `version` is ignored: the version is
   *   always the aggregate's current version plus one, so the event can never
   *   drift from the aggregate's own counter.
   * @returns The newly created event.
   * @throws Whatever `apply` throws; in that case neither `version` nor
   *   `changes` is modified.
   */
  protected raise(
    type: string,
    data: Record<string, unknown>,
    metadata: Partial<DomainEvent["metadata"]> = {},
  ): DomainEvent {
    const nextVersion = this.version + 1;
    const event: DomainEvent = {
      id: crypto.randomUUID(),
      aggregateId: this.id,
      // NOTE(review): this is the runtime class name; if the build renames
      // classes (minification), stored aggregate types would change — confirm.
      aggregateType: this.constructor.name,
      type,
      data,
      metadata: {
        timestamp: new Date().toISOString(),
        ...metadata,
        // Placed after the spread so callers cannot override it.
        version: nextVersion,
      },
    };

    // Apply first, commit the version afterwards: `apply` sees the pre-event
    // version exactly as it does in `loadFromHistory`, and a throwing `apply`
    // leaves the aggregate's bookkeeping untouched.
    this.apply(event);
    this.version = nextVersion;
    this.changes.push(event);
    return event;
  }

  /**
   * Rebuilds state by applying previously stored events in the given order.
   * After each event the aggregate's version is set to that event's version.
   * Replayed events are not added to `changes`.
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.apply(event);
      this.version = event.metadata.version;
    }
  }
}
Order Aggregate Example
// lib/es/aggregates/order.ts
import { Aggregate } from "../aggregate";
import type { DomainEvent, Command } from "../types";
/** One line of an order. */
interface OrderItem {
  /** Id of the product; used to locate the line when it is removed. */
  productId: string;
  /** Product name. */
  name: string;
  /** Number of units; multiplied by `price` for the order total. */
  quantity: number;
  /** Price of a single unit. */
  price: number;
}
/** Lifecycle states an order can be in. */
type OrderStatus =
  | "created"
  | "confirmed"
  | "shipped"
  | "delivered"
  | "cancelled";
/**
 * Event-sourced order: tracks its items, running total, customer and status.
 *
 * State changes only in `apply`; `handle` validates a command against the
 * current state and raises the corresponding event.
 */
export class OrderAggregate extends Aggregate {
  items: OrderItem[] = [];
  status: OrderStatus = "created";
  total = 0;
  customerId = "";

  /**
   * Updates order state from one event. Event types not listed here are
   * ignored.
   */
  apply(event: DomainEvent): void {
    switch (event.type) {
      case "OrderCreated":
        this.customerId = event.data.customerId as string;
        this.status = "created";
        break;
      case "ItemAdded": {
        // The payload is trusted here; `handle` validates it before raising.
        const item = event.data as unknown as OrderItem;
        this.items.push(item);
        this.total += item.price * item.quantity;
        break;
      }
      case "ItemRemoved": {
        const idx = this.items.findIndex(
          (i) => i.productId === event.data.productId,
        );
        // Removing a product that is not in the order is a no-op.
        if (idx !== -1) {
          this.total -= this.items[idx].price * this.items[idx].quantity;
          this.items.splice(idx, 1);
        }
        break;
      }
      case "OrderConfirmed":
        this.status = "confirmed";
        break;
      case "OrderShipped":
        this.status = "shipped";
        break;
      case "OrderCancelled":
        this.status = "cancelled";
        break;
    }
  }

  /**
   * Validates `command` and raises the matching event.
   *
   * Every raised event carries the command's `userId` (and `correlationId`
   * when present) so each change can be attributed.
   *
   * @param command - The command to execute against this order.
   * @returns The events produced (always exactly one).
   * @throws Error when the command is unknown, targets an order that does not
   *   exist yet (or, for "CreateOrder", one that already exists), carries an
   *   invalid payload, or is not allowed in the order's current status.
   */
  handle(command: Command): DomainEvent[] {
    const { userId, correlationId } = command.metadata;
    // Only include correlationId when it was supplied, so events do not gain
    // an explicit `undefined` key.
    const metadata: Partial<DomainEvent["metadata"]> =
      correlationId === undefined ? { userId } : { userId, correlationId };

    switch (command.type) {
      case "CreateOrder": {
        // version is 0 only while no event has been applied to this aggregate.
        if (this.version !== 0) {
          throw new Error("Order already exists");
        }
        const { customerId } = command.data;
        if (typeof customerId !== "string" || customerId === "") {
          throw new Error("customerId must be a non-empty string");
        }
        return [this.raise("OrderCreated", { customerId }, metadata)];
      }
      case "AddItem":
        this.assertExists();
        if (this.status !== "created") {
          throw new Error("Cannot add items to a confirmed order");
        }
        OrderAggregate.assertValidItem(command.data);
        return [this.raise("ItemAdded", command.data, metadata)];
      case "RemoveItem":
        this.assertExists();
        if (this.status !== "created") {
          throw new Error("Cannot remove items from a confirmed order");
        }
        return [this.raise("ItemRemoved", command.data, metadata)];
      case "ConfirmOrder":
        this.assertExists();
        // Only an open order may be confirmed; without this check a
        // cancelled or shipped order could be moved back to "confirmed".
        if (this.status !== "created") {
          throw new Error(`Cannot confirm an order that is ${this.status}`);
        }
        if (this.items.length === 0) {
          throw new Error("Cannot confirm an empty order");
        }
        return [this.raise("OrderConfirmed", {}, metadata)];
      case "CancelOrder":
        this.assertExists();
        if (this.status === "shipped" || this.status === "delivered") {
          throw new Error("Cannot cancel a shipped order");
        }
        return [
          this.raise(
            "OrderCancelled",
            { reason: command.data.reason },
            metadata,
          ),
        ];
      default:
        throw new Error(`Unknown command: ${command.type}`);
    }
  }

  /** Throws unless at least one event has been applied to this order. */
  private assertExists(): void {
    if (this.version === 0) {
      throw new Error("Order does not exist");
    }
  }

  /**
   * Throws unless `data` has the fields `apply` reads for "ItemAdded", with
   * values that keep `total` a finite number.
   */
  private static assertValidItem(data: Record<string, unknown>): void {
    const { productId, name, quantity, price } = data;
    if (typeof productId !== "string" || productId === "") {
      throw new Error("Item productId must be a non-empty string");
    }
    if (typeof name !== "string") {
      throw new Error("Item name must be a string");
    }
    if (typeof quantity !== "number" || !Number.isFinite(quantity) || quantity <= 0) {
      throw new Error("Item quantity must be a positive number");
    }
    if (typeof price !== "number" || !Number.isFinite(price) || price < 0) {
      throw new Error("Item price must be a non-negative number");
    }
  }
}
Command Handler
// lib/es/command-handler.ts
import { EventStore } from "./event-store";
import { OrderAggregate } from "./aggregates/order";
import type { Command, DomainEvent } from "./types";
/** Store instance shared by all invocations of `handleCommand`. */
const eventStore = new EventStore();

/**
 * Executes one command against an order.
 *
 * Rehydrates the order from its stored events, lets it handle the command,
 * then appends the resulting events to the store one at a time, in order.
 *
 * @param command - The command to execute; `aggregateId` selects the order.
 * @returns The events the command produced.
 */
export async function handleCommand(command: Command): Promise<DomainEvent[]> {
  const { aggregateId } = command;

  // Rebuild current state from history.
  const order = new OrderAggregate(aggregateId);
  const pastEvents = await eventStore.getEvents(aggregateId);
  order.loadFromHistory(pastEvents);

  // Let the aggregate decide what happens.
  const produced = order.handle(command);

  // Persist sequentially so versions are written in ascending order.
  for (const pending of produced) {
    await eventStore.append(pending);
  }

  return produced;
}
API Route
// app/api/orders/[id]/commands/route.ts
import { NextResponse } from "next/server";
import { handleCommand } from "@/lib/es/command-handler";
/**
 * Route handler: executes a command against the order identified by the
 * `id` route parameter.
 *
 * Expects a JSON object body with a string `type`, a string `userId`, and an
 * optional object `data` (defaults to `{}`).
 *
 * @returns `{ events }` on success, or `{ error }` with status 400 when the
 *   body is malformed or the command is rejected.
 */
export async function POST(
  request: Request,
  { params }: { params: Promise<{ id: string }> },
) {
  const { id } = await params;

  const badRequest = (message: string) =>
    NextResponse.json({ error: message }, { status: 400 });

  // Parse inside a try so a malformed body yields a 400 instead of an
  // unhandled rejection.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return badRequest("Request body must be valid JSON");
  }

  if (typeof body !== "object" || body === null || Array.isArray(body)) {
    return badRequest("Request body must be a JSON object");
  }
  const { type, data, userId } = body as Record<string, unknown>;

  if (typeof type !== "string" || type === "") {
    return badRequest("Command type must be a non-empty string");
  }
  // NOTE(review): userId is taken from the client-supplied body; presumably
  // it should come from the authenticated session — confirm.
  if (typeof userId !== "string" || userId === "") {
    return badRequest("userId must be a non-empty string");
  }
  if (data != null && (typeof data !== "object" || Array.isArray(data))) {
    return badRequest("Command data must be a JSON object");
  }

  try {
    const events = await handleCommand({
      type,
      aggregateId: id,
      data: (data ?? {}) as Record<string, unknown>,
      metadata: { userId },
    });
    return NextResponse.json({ events });
  } catch (error) {
    // Anything can be thrown, so narrow before reading `.message`.
    const message = error instanceof Error ? error.message : "Unknown error";
    return badRequest(message);
  }
}
Need Event-Driven Architecture?
We build event-sourced systems with full audit trails. Contact us to discuss your project.