Notification systems need to handle multiple channels, retry failures, respect user preferences, and process messages reliably. A queue-based architecture handles this well.
Notification Types
export type Channel = "email" | "push" | "in-app" | "sms";
export type Priority = "critical" | "high" | "normal" | "low";
export type NotificationStatus = "pending" | "processing" | "delivered" | "failed" | "retrying";
export interface Notification {
id: string;
userId: string;
channel: Channel;
priority: Priority;
status: NotificationStatus;
subject: string;
body: string;
metadata?: Record<string, unknown>;
attempts: number;
maxAttempts: number;
lastAttemptAt?: Date;
scheduledFor?: Date;
deliveredAt?: Date;
createdAt: Date;
}
export interface UserPreferences {
userId: string;
channels: Partial<Record<Channel, boolean>>;
quietHoursStart?: string; // "22:00"
quietHoursEnd?: string; // "08:00"
timezone: string;
}
Queue Implementation
// lib/notifications/queue.ts
import { redis } from "@/lib/redis";
import type { Notification, Priority } from "./types";
const PRIORITY_SCORES: Record<Priority, number> = {
critical: 4,
high: 3,
normal: 2,
low: 1,
};
const QUEUE_KEY = "notifications:queue";
const PROCESSING_KEY = "notifications:processing";
const DATA_PREFIX = "notifications:data:";
export async function enqueue(notification: Notification): Promise<void> {
const pipeline = redis.pipeline();
// Store notification data
pipeline.set(
`${DATA_PREFIX}${notification.id}`,
JSON.stringify(notification),
"EX",
86400 * 7, // 7-day TTL
);
// Add to priority sorted set
const score =
PRIORITY_SCORES[notification.priority] * 1e12 +
(1e12 - notification.createdAt.getTime());
pipeline.zadd(QUEUE_KEY, score, notification.id);
await pipeline.exec();
}
export async function dequeue(batchSize = 10): Promise<Notification[]> {
// Atomically move items from queue to processing set
const script = `
local queue_key = KEYS[1]
local processing_key = KEYS[2]
local batch_size = tonumber(ARGV[1])
local now = ARGV[2]
local items = redis.call('ZREVRANGEBYSCORE', queue_key, '+inf', '-inf', 'LIMIT', 0, batch_size)
if #items == 0 then
return {}
end
for _, id in ipairs(items) do
redis.call('ZREM', queue_key, id)
redis.call('ZADD', processing_key, now, id)
end
return items
`;
const ids = (await redis.eval(
script,
2,
QUEUE_KEY,
PROCESSING_KEY,
batchSize,
Date.now(),
)) as string[];
if (ids.length === 0) return [];
// Fetch notification data
const pipeline = redis.pipeline();
for (const id of ids) {
pipeline.get(`${DATA_PREFIX}${id}`);
}
const results = await pipeline.exec();
return (results || [])
.map(([err, data]) => {
if (err || !data) return null;
return JSON.parse(data as string) as Notification;
})
.filter(Boolean) as Notification[];
}
export async function markDelivered(id: string): Promise<void> {
const key = `${DATA_PREFIX}${id}`;
const data = await redis.get(key);
if (!data) return;
const notification = JSON.parse(data) as Notification;
notification.status = "delivered";
notification.deliveredAt = new Date();
await redis.pipeline()
.set(key, JSON.stringify(notification), "EX", 86400 * 7)
.zrem(PROCESSING_KEY, id)
.exec();
}
export async function markFailed(id: string): Promise<void> {
const key = `${DATA_PREFIX}${id}`;
const data = await redis.get(key);
if (!data) return;
const notification = JSON.parse(data) as Notification;
notification.attempts += 1;
notification.lastAttemptAt = new Date();
if (notification.attempts < notification.maxAttempts) {
// Re-enqueue with lower priority for retry
notification.status = "retrying";
await redis.set(key, JSON.stringify(notification), "EX", 86400 * 7);
await redis.zrem(PROCESSING_KEY, id);
// Exponential backoff: re-enqueue after delay
const delay = Math.pow(2, notification.attempts) * 1000;
const score = Date.now() + delay;
await redis.zadd("notifications:retry", score, id);
} else {
notification.status = "failed";
await redis.pipeline()
.set(key, JSON.stringify(notification), "EX", 86400 * 7)
.zrem(PROCESSING_KEY, id)
.sadd("notifications:dead-letter", id)
.exec();
}
}
Channel Dispatchers
// lib/notifications/dispatchers.ts
import type { Notification, Channel } from "./types";
type Dispatcher = (notification: Notification) => Promise<void>;
const dispatchers: Record<Channel, Dispatcher> = {
email: async (notification) => {
await fetch(`${process.env.EMAIL_API_URL}/send`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.EMAIL_API_KEY}`,
},
body: JSON.stringify({
to: notification.metadata?.email,
subject: notification.subject,
html: notification.body,
}),
});
},
push: async (notification) => {
const token = notification.metadata?.pushToken as string;
if (!token) throw new Error("No push token");
await fetch("https://fcm.googleapis.com/fcm/send", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `key=${process.env.FCM_SERVER_KEY}`,
},
body: JSON.stringify({
to: token,
notification: {
title: notification.subject,
body: notification.body.slice(0, 200),
},
}),
});
},
"in-app": async (notification) => {
// Store in database for retrieval via API
const { db } = await import("@/lib/db");
await db.insert({
userId: notification.userId,
title: notification.subject,
body: notification.body,
read: false,
createdAt: new Date(),
});
},
sms: async (notification) => {
const phone = notification.metadata?.phone as string;
if (!phone) throw new Error("No phone number");
await fetch(`${process.env.SMS_API_URL}/messages`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.SMS_API_KEY}`,
},
body: JSON.stringify({
to: phone,
body: `${notification.subject}: ${notification.body.slice(0, 140)}`,
}),
});
},
};
export async function dispatch(notification: Notification): Promise<void> {
const dispatcher = dispatchers[notification.channel];
await dispatcher(notification);
}
Worker Processor
// lib/notifications/worker.ts
import { dequeue, markDelivered, markFailed } from "./queue";
import { dispatch } from "./dispatchers";
import { checkPreferences } from "./preferences";
import type { Notification } from "./types";
async function processNotification(notification: Notification): Promise<void> {
// Check user preferences
const allowed = await checkPreferences(notification.userId, notification.channel);
if (!allowed && notification.priority !== "critical") {
await markDelivered(notification.id); // Skip silently
return;
}
try {
await dispatch(notification);
await markDelivered(notification.id);
} catch (error) {
console.error(`Failed to deliver ${notification.id}:`, error);
await markFailed(notification.id);
}
}
export async function processQueue(): Promise<number> {
const batch = await dequeue(10);
if (batch.length === 0) return 0;
await Promise.allSettled(batch.map(processNotification));
return batch.length;
}
API Route for Processing
// app/api/notifications/process/route.ts
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { processQueue } from "@/lib/notifications/worker";
export async function POST(request: NextRequest) {
// Verify cron secret
const authHeader = request.headers.get("authorization");
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
let total = 0;
let processed = 1;
// Process until queue is empty or time limit
const start = Date.now();
const maxDuration = 25_000; // 25 seconds (Vercel limit padding)
while (processed > 0 && Date.now() - start < maxDuration) {
processed = await processQueue();
total += processed;
}
return NextResponse.json({ processed: total });
}
Sending Notifications
// lib/notifications/send.ts
import { enqueue } from "./queue";
import type { Channel, Priority, Notification } from "./types";
interface SendOptions {
userId: string;
channels: Channel[];
subject: string;
body: string;
priority?: Priority;
metadata?: Record<string, unknown>;
scheduledFor?: Date;
}
export async function sendNotification(options: SendOptions): Promise<string[]> {
const ids: string[] = [];
for (const channel of options.channels) {
const notification: Notification = {
id: crypto.randomUUID(),
userId: options.userId,
channel,
priority: options.priority ?? "normal",
status: "pending",
subject: options.subject,
body: options.body,
metadata: options.metadata,
attempts: 0,
maxAttempts: channel === "email" ? 3 : 2,
scheduledFor: options.scheduledFor,
createdAt: new Date(),
};
await enqueue(notification);
ids.push(notification.id);
}
return ids;
}
// Usage
await sendNotification({
userId: "user-123",
channels: ["email", "in-app"],
subject: "New comment on your post",
body: "<p>Someone commented on your article.</p>",
metadata: { email: "user@example.com" },
});
Need a Notification System?
We build robust notification infrastructure for SaaS products. Contact us for a consultation.