Skip to main content
Back to Blog
Tutorials
4 min read
January 30, 2025

How to Build a Notification Queue System in Next.js

Build a notification queue with priority levels, retry logic, batching, and delivery tracking for email, push, and in-app channels.

Ryel Banfield

Founder & Lead Developer

Notification systems need to handle multiple channels, retry failures, respect user preferences, and process messages reliably. A queue-based architecture handles this well.

Notification Types

export type Channel = "email" | "push" | "in-app" | "sms";
export type Priority = "critical" | "high" | "normal" | "low";
export type NotificationStatus = "pending" | "processing" | "delivered" | "failed" | "retrying";

export interface Notification {
  id: string;
  userId: string;
  channel: Channel;
  priority: Priority;
  status: NotificationStatus;
  subject: string;
  body: string;
  metadata?: Record<string, unknown>;
  attempts: number;
  maxAttempts: number;
  lastAttemptAt?: Date;
  scheduledFor?: Date;
  deliveredAt?: Date;
  createdAt: Date;
}

export interface UserPreferences {
  userId: string;
  channels: Partial<Record<Channel, boolean>>;
  quietHoursStart?: string; // "22:00"
  quietHoursEnd?: string; // "08:00"
  timezone: string;
}

Queue Implementation

// lib/notifications/queue.ts
import { redis } from "@/lib/redis";
import type { Notification, Priority } from "./types";

const PRIORITY_SCORES: Record<Priority, number> = {
  critical: 4,
  high: 3,
  normal: 2,
  low: 1,
};

const QUEUE_KEY = "notifications:queue";
const PROCESSING_KEY = "notifications:processing";
const DATA_PREFIX = "notifications:data:";

export async function enqueue(notification: Notification): Promise<void> {
  const pipeline = redis.pipeline();

  // Store notification data
  pipeline.set(
    `${DATA_PREFIX}${notification.id}`,
    JSON.stringify(notification),
    "EX",
    86400 * 7, // 7-day TTL
  );

  // Add to priority sorted set
  const score =
    PRIORITY_SCORES[notification.priority] * 1e12 +
    (1e12 - notification.createdAt.getTime());
  pipeline.zadd(QUEUE_KEY, score, notification.id);

  await pipeline.exec();
}

export async function dequeue(batchSize = 10): Promise<Notification[]> {
  // Atomically move items from queue to processing set
  const script = `
    local queue_key = KEYS[1]
    local processing_key = KEYS[2]
    local batch_size = tonumber(ARGV[1])
    local now = ARGV[2]

    local items = redis.call('ZREVRANGEBYSCORE', queue_key, '+inf', '-inf', 'LIMIT', 0, batch_size)
    
    if #items == 0 then
      return {}
    end

    for _, id in ipairs(items) do
      redis.call('ZREM', queue_key, id)
      redis.call('ZADD', processing_key, now, id)
    end

    return items
  `;

  const ids = (await redis.eval(
    script,
    2,
    QUEUE_KEY,
    PROCESSING_KEY,
    batchSize,
    Date.now(),
  )) as string[];

  if (ids.length === 0) return [];

  // Fetch notification data
  const pipeline = redis.pipeline();
  for (const id of ids) {
    pipeline.get(`${DATA_PREFIX}${id}`);
  }
  const results = await pipeline.exec();

  return (results || [])
    .map(([err, data]) => {
      if (err || !data) return null;
      return JSON.parse(data as string) as Notification;
    })
    .filter(Boolean) as Notification[];
}

export async function markDelivered(id: string): Promise<void> {
  const key = `${DATA_PREFIX}${id}`;
  const data = await redis.get(key);
  if (!data) return;

  const notification = JSON.parse(data) as Notification;
  notification.status = "delivered";
  notification.deliveredAt = new Date();

  await redis.pipeline()
    .set(key, JSON.stringify(notification), "EX", 86400 * 7)
    .zrem(PROCESSING_KEY, id)
    .exec();
}

export async function markFailed(id: string): Promise<void> {
  const key = `${DATA_PREFIX}${id}`;
  const data = await redis.get(key);
  if (!data) return;

  const notification = JSON.parse(data) as Notification;
  notification.attempts += 1;
  notification.lastAttemptAt = new Date();

  if (notification.attempts < notification.maxAttempts) {
    // Re-enqueue with lower priority for retry
    notification.status = "retrying";
    await redis.set(key, JSON.stringify(notification), "EX", 86400 * 7);
    await redis.zrem(PROCESSING_KEY, id);

    // Exponential backoff: re-enqueue after delay
    const delay = Math.pow(2, notification.attempts) * 1000;
    const score = Date.now() + delay;
    await redis.zadd("notifications:retry", score, id);
  } else {
    notification.status = "failed";
    await redis.pipeline()
      .set(key, JSON.stringify(notification), "EX", 86400 * 7)
      .zrem(PROCESSING_KEY, id)
      .sadd("notifications:dead-letter", id)
      .exec();
  }
}

Channel Dispatchers

// lib/notifications/dispatchers.ts
import type { Notification, Channel } from "./types";

type Dispatcher = (notification: Notification) => Promise<void>;

const dispatchers: Record<Channel, Dispatcher> = {
  email: async (notification) => {
    await fetch(`${process.env.EMAIL_API_URL}/send`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${process.env.EMAIL_API_KEY}`,
      },
      body: JSON.stringify({
        to: notification.metadata?.email,
        subject: notification.subject,
        html: notification.body,
      }),
    });
  },

  push: async (notification) => {
    const token = notification.metadata?.pushToken as string;
    if (!token) throw new Error("No push token");

    await fetch("https://fcm.googleapis.com/fcm/send", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `key=${process.env.FCM_SERVER_KEY}`,
      },
      body: JSON.stringify({
        to: token,
        notification: {
          title: notification.subject,
          body: notification.body.slice(0, 200),
        },
      }),
    });
  },

  "in-app": async (notification) => {
    // Store in database for retrieval via API
    const { db } = await import("@/lib/db");
    await db.insert({
      userId: notification.userId,
      title: notification.subject,
      body: notification.body,
      read: false,
      createdAt: new Date(),
    });
  },

  sms: async (notification) => {
    const phone = notification.metadata?.phone as string;
    if (!phone) throw new Error("No phone number");

    await fetch(`${process.env.SMS_API_URL}/messages`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${process.env.SMS_API_KEY}`,
      },
      body: JSON.stringify({
        to: phone,
        body: `${notification.subject}: ${notification.body.slice(0, 140)}`,
      }),
    });
  },
};

export async function dispatch(notification: Notification): Promise<void> {
  const dispatcher = dispatchers[notification.channel];
  await dispatcher(notification);
}

Worker Processor

// lib/notifications/worker.ts
import { dequeue, markDelivered, markFailed } from "./queue";
import { dispatch } from "./dispatchers";
import { checkPreferences } from "./preferences";
import type { Notification } from "./types";

async function processNotification(notification: Notification): Promise<void> {
  // Check user preferences
  const allowed = await checkPreferences(notification.userId, notification.channel);
  if (!allowed && notification.priority !== "critical") {
    await markDelivered(notification.id); // Skip silently
    return;
  }

  try {
    await dispatch(notification);
    await markDelivered(notification.id);
  } catch (error) {
    console.error(`Failed to deliver ${notification.id}:`, error);
    await markFailed(notification.id);
  }
}

export async function processQueue(): Promise<number> {
  const batch = await dequeue(10);
  if (batch.length === 0) return 0;

  await Promise.allSettled(batch.map(processNotification));
  return batch.length;
}

API Route for Processing

// app/api/notifications/process/route.ts
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { processQueue } from "@/lib/notifications/worker";

export async function POST(request: NextRequest) {
  // Verify cron secret
  const authHeader = request.headers.get("authorization");
  if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
    return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
  }

  let total = 0;
  let processed = 1;

  // Process until queue is empty or time limit
  const start = Date.now();
  const maxDuration = 25_000; // 25 seconds (Vercel limit padding)

  while (processed > 0 && Date.now() - start < maxDuration) {
    processed = await processQueue();
    total += processed;
  }

  return NextResponse.json({ processed: total });
}

Sending Notifications

// lib/notifications/send.ts
import { enqueue } from "./queue";
import type { Channel, Priority, Notification } from "./types";

interface SendOptions {
  userId: string;
  channels: Channel[];
  subject: string;
  body: string;
  priority?: Priority;
  metadata?: Record<string, unknown>;
  scheduledFor?: Date;
}

export async function sendNotification(options: SendOptions): Promise<string[]> {
  const ids: string[] = [];

  for (const channel of options.channels) {
    const notification: Notification = {
      id: crypto.randomUUID(),
      userId: options.userId,
      channel,
      priority: options.priority ?? "normal",
      status: "pending",
      subject: options.subject,
      body: options.body,
      metadata: options.metadata,
      attempts: 0,
      maxAttempts: channel === "email" ? 3 : 2,
      scheduledFor: options.scheduledFor,
      createdAt: new Date(),
    };

    await enqueue(notification);
    ids.push(notification.id);
  }

  return ids;
}

// Usage
await sendNotification({
  userId: "user-123",
  channels: ["email", "in-app"],
  subject: "New comment on your post",
  body: "<p>Someone commented on your article.</p>",
  metadata: { email: "user@example.com" },
});

Need a Notification System?

We build robust notification infrastructure for SaaS products. Contact us for a consultation.

notificationsqueuebackground jobsNext.jsRedistutorial

Ready to Start Your Project?

RCB Software builds world-class websites and applications for businesses worldwide.

Get in Touch

Related Articles