perf: Redis pub/sub, PgBouncer, optimistic UI for high concurrency

- Add Redis 7 for pub/sub (lobby + chat real-time), rate limiting, caching
- Replace SSE DB polling with Redis pub/sub (lobby: instant approval, chat: instant delivery)
- Add PgBouncer (transaction mode, 500 client → 25 pool connections)
- Chat SSE stream via Redis pub/sub instead of 3s polling
- Optimistic UI in ChatPanel (messages appear before server confirms)
- Redis-based rate limiter (works across multiple app replicas)
- Prisma query optimization (select only needed fields)
- Chat message cache in Redis (10s TTL)
- Docker Compose: add redis, pgbouncer services with healthchecks
- Production: resource limits, 2 app replicas behind Traefik
- Update CLAUDE.md, README.md, .env.example, setup.sh

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-22 14:17:25 +03:00
parent 3e50c57ee0
commit a42ec96965
21 changed files with 568 additions and 122 deletions

View File

@@ -2,6 +2,8 @@ import { NextResponse } from "next/server";
import { z } from "zod";
import { prisma } from "@/lib/prisma";
import { getSessionFromRequest } from "@/lib/auth-helpers";
import { redis } from "@/lib/redis";
import { publishChatMessage } from "@/lib/chat-pubsub";
const sendMessageSchema = z.object({
sessionId: z.string().min(1),
@@ -49,6 +51,13 @@ export async function GET(
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
// Check Redis cache first (10s TTL to reduce DB hits during active chat)
const cacheKey = `chat:${roomId}:${cursor || "latest"}:${limit}`;
const cached = await redis.get(cacheKey);
if (cached) {
return NextResponse.json(JSON.parse(cached));
}
const messages = await prisma.chatMessage.findMany({
where: {
roomId,
@@ -56,13 +65,25 @@ export async function GET(
},
orderBy: { createdAt: "desc" },
take: limit,
select: {
id: true,
sessionId: true,
senderName: true,
content: true,
createdAt: true,
},
});
const nextCursor = messages.length === limit
? messages[messages.length - 1].createdAt.toISOString()
: null;
return NextResponse.json({ messages: messages.reverse(), nextCursor });
const result = { messages: messages.reverse(), nextCursor };
// Cache for 10 seconds
await redis.set(cacheKey, JSON.stringify(result), "EX", 10);
return NextResponse.json(result);
}
export async function POST(
@@ -100,5 +121,13 @@ export async function POST(
},
});
// Publish to Redis for real-time delivery via SSE
await publishChatMessage(roomId, {
id: message.id,
senderName: message.senderName,
content: message.content,
createdAt: message.createdAt.toISOString(),
});
return NextResponse.json(message, { status: 201 });
}

View File

@@ -0,0 +1,74 @@
import { prisma } from "@/lib/prisma";
import { subscribeChatMessages } from "@/lib/chat-pubsub";
export async function GET(
req: Request,
{ params }: { params: Promise<{ roomId: string }> }
) {
const { roomId } = await params;
const url = new URL(req.url);
const sessionId = url.searchParams.get("sessionId");
if (!sessionId) {
return new Response(JSON.stringify({ error: "sessionId required" }), {
status: 400,
headers: { "Content-Type": "application/json" },
});
}
// Verify participant
const participant = await prisma.participantHistory.findFirst({
where: { roomId, sessionId, leftAt: null },
});
if (!participant) {
return new Response(JSON.stringify({ error: "Not a participant" }), {
status: 403,
headers: { "Content-Type": "application/json" },
});
}
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const send = (data: Record<string, unknown>) => {
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
} catch { /* stream closed */ }
};
const { unsubscribe } = subscribeChatMessages(roomId, (msg) => {
send(msg);
});
// Keep-alive every 30s
const keepAlive = setInterval(() => {
try {
controller.enqueue(encoder.encode(": keepalive\n\n"));
} catch { /* stream closed */ }
}, 30000);
// Timeout after 1 hour
const timeout = setTimeout(() => cleanup(), 60 * 60 * 1000);
let cleaned = false;
function cleanup() {
if (cleaned) return;
cleaned = true;
clearInterval(keepAlive);
clearTimeout(timeout);
unsubscribe().catch(() => {});
try { controller.close(); } catch { /* already closed */ }
}
req.signal.addEventListener("abort", () => cleanup());
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}

View File

@@ -3,6 +3,7 @@ import { z } from "zod";
import { compare } from "bcryptjs";
import { prisma } from "@/lib/prisma";
import { createToken } from "@/lib/livekit";
import { checkRateLimit } from "@/lib/rate-limit";
const joinSchema = z.object({
displayName: z.string().min(1).max(100),
@@ -11,11 +12,6 @@ const joinSchema = z.object({
sessionFingerprint: z.string().min(1),
});
// CRITICAL-10: In-memory rate limiter for PIN attempts
const pinAttempts = new Map<string, { count: number; resetTime: number }>();
const PIN_MAX_ATTEMPTS = 5;
const PIN_WINDOW_MS = 60_000; // 1 minute
export async function POST(
req: Request,
{ params }: { params: Promise<{ roomId: string }> }
@@ -63,22 +59,14 @@ export async function POST(
return NextResponse.json({ error: "PIN required" }, { status: 401 });
}
// Rate limit PIN attempts by roomId + IP
const ip = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() || "unknown";
const rateLimitKey = `${roomId}:${ip}`;
const now = Date.now();
const attempt = pinAttempts.get(rateLimitKey);
if (attempt && now < attempt.resetTime) {
if (attempt.count >= PIN_MAX_ATTEMPTS) {
return NextResponse.json(
{ error: "Too many PIN attempts. Try again later." },
{ status: 429 }
);
}
attempt.count++;
} else {
pinAttempts.set(rateLimitKey, { count: 1, resetTime: now + PIN_WINDOW_MS });
// Rate limit PIN attempts by roomId + IP (Redis-based)
const clientIp = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() || "unknown";
const { allowed } = await checkRateLimit(`pin:${roomId}:${clientIp}`, 5, 60);
if (!allowed) {
return NextResponse.json(
{ error: "Too many PIN attempts. Try again later." },
{ status: 429 }
);
}
const valid = await compare(pin, room.pinHash);

View File

@@ -3,6 +3,7 @@ import { z } from "zod";
import { prisma } from "@/lib/prisma";
import { getSessionFromRequest } from "@/lib/auth-helpers";
import { createToken } from "@/lib/livekit";
import { publishLobbyEvent } from "@/lib/lobby-pubsub";
const lobbyActionSchema = z.object({
lobbyEntryId: z.string().min(1),
@@ -74,6 +75,7 @@ export async function POST(
where: { id: lobbyEntryId },
data: { status: "REJECTED" },
});
await publishLobbyEvent(roomId, entry.sessionId, { status: "REJECTED" });
return NextResponse.json({ status: "rejected" });
}
@@ -105,9 +107,11 @@ export async function POST(
}),
]);
// Store token temporarily in lobbyEntry metadata — we use the updatedAt as a signal
// The SSE endpoint will query the entry status and generate a fresh token
// We store it via a simple in-memory approach or re-generate in SSE
// For simplicity, we return it here too (host can relay it)
// Publish to Redis so the SSE stream picks it up instantly
await publishLobbyEvent(roomId, entry.sessionId, {
status: "APPROVED",
token,
});
return NextResponse.json({ status: "approved", token });
}

View File

@@ -1,5 +1,6 @@
import { prisma } from "@/lib/prisma";
import { createToken } from "@/lib/livekit";
import { subscribeLobbyEvents } from "@/lib/lobby-pubsub";
export async function GET(
req: Request,
@@ -16,89 +17,103 @@ export async function GET(
});
}
const room = await prisma.room.findUnique({ where: { id: roomId } });
if (!room) {
return new Response(JSON.stringify({ error: "Room not found" }), {
status: 404,
headers: { "Content-Type": "application/json" },
});
}
// CRITICAL-8: Verify sessionId matches an existing LobbyEntry for this room
// Verify sessionId has a LobbyEntry for this room
const existingEntry = await prisma.lobbyEntry.findFirst({
where: { roomId, sessionId },
orderBy: { createdAt: "desc" },
});
if (!existingEntry) {
return new Response(JSON.stringify({ error: "No lobby entry found for this session" }), {
status: 403,
headers: { "Content-Type": "application/json" },
return new Response(
JSON.stringify({ error: "No lobby entry found for this session" }),
{ status: 403, headers: { "Content-Type": "application/json" } }
);
}
// Check if already approved/rejected
if (existingEntry.status === "APPROVED") {
const freshRoom = await prisma.room.findUnique({ where: { id: roomId } });
if (!freshRoom?.livekitRoomName) {
return new Response(
JSON.stringify({ error: "Room not started yet" }),
{ status: 400, headers: { "Content-Type": "application/json" } }
);
}
const canPublish = !freshRoom.webinarMode;
const token = await createToken(sessionId, freshRoom.livekitRoomName, {
name: existingEntry.displayName,
canPublish,
canSubscribe: true,
canPublishData: true,
});
const body = `data: ${JSON.stringify({ status: "APPROVED", token })}\n\n`;
return new Response(body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
if (existingEntry.status === "REJECTED") {
const body = `data: ${JSON.stringify({ status: "REJECTED" })}\n\n`;
return new Response(body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
// PENDING — subscribe to Redis channel and wait for event
const stream = new ReadableStream({
async start(controller) {
start(controller) {
const encoder = new TextEncoder();
const send = (data: Record<string, unknown>) => {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
try {
controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
} catch {
/* stream may be closed */
}
};
let resolved = false;
const maxPolls = 150; // 5 minutes at 2s intervals
// Send initial PENDING status
send({ status: "PENDING" });
for (let i = 0; i < maxPolls && !resolved; i++) {
try {
const entry = await prisma.lobbyEntry.findFirst({
where: { roomId, sessionId },
orderBy: { createdAt: "desc" },
});
if (!entry) {
send({ status: "error", message: "Lobby entry not found" });
resolved = true;
break;
}
if (entry.status === "APPROVED") {
// Re-fetch room to get fresh livekitRoomName (may have been started after lobby entry)
const freshRoom = await prisma.room.findUnique({ where: { id: roomId } });
if (!freshRoom?.livekitRoomName) {
send({ status: "error", message: "Room not started yet" });
resolved = true;
break;
}
const canPublish = !freshRoom.webinarMode;
const token = await createToken(sessionId, freshRoom.livekitRoomName, {
name: entry.displayName,
canPublish,
canSubscribe: true,
canPublishData: true,
});
send({ status: "APPROVED", token });
resolved = true;
break;
}
if (entry.status === "REJECTED") {
send({ status: "REJECTED" });
resolved = true;
break;
}
send({ status: "PENDING" });
} catch {
send({ status: "error", message: "Internal error" });
resolved = true;
break;
// Subscribe to Redis pub/sub
const { unsubscribe } = subscribeLobbyEvents(
roomId,
sessionId,
(event) => {
send(event);
cleanup();
}
);
await new Promise((r) => setTimeout(r, 2000));
}
if (!resolved) {
// Timeout after 5 minutes
const timeout = setTimeout(() => {
send({ status: "timeout" });
cleanup();
}, 5 * 60 * 1000);
let cleaned = false;
function cleanup() {
if (cleaned) return;
cleaned = true;
clearTimeout(timeout);
unsubscribe().catch(() => {});
try {
controller.close();
} catch {
/* already closed */
}
}
controller.close();
// Handle client disconnect via AbortSignal
req.signal.addEventListener("abort", () => {
cleanup();
});
},
});

View File

@@ -59,6 +59,18 @@ export async function GET(req: Request) {
const rooms = await prisma.room.findMany({
where,
orderBy: { createdAt: "desc" },
select: {
id: true,
name: true,
code: true,
status: true,
lobbyEnabled: true,
webinarMode: true,
isLocked: true,
createdAt: true,
startedAt: true,
endedAt: true,
},
});
return NextResponse.json(rooms);

View File

@@ -1,6 +1,6 @@
"use client";
import { useState, useEffect, useRef, type FormEvent } from "react";
import { useState, useEffect, useRef, useCallback, type FormEvent } from "react";
interface ChatPanelProps {
roomId: string;
@@ -20,29 +20,53 @@ export default function ChatPanel({ roomId, sessionId, senderName }: ChatPanelPr
const [input, setInput] = useState("");
const [sending, setSending] = useState(false);
const bottomRef = useRef<HTMLDivElement>(null);
const pollRef = useRef<ReturnType<typeof setInterval>>(undefined);
const seenIds = useRef(new Set<string>());
const addMessage = useCallback((msg: ChatMessage) => {
if (seenIds.current.has(msg.id)) return;
seenIds.current.add(msg.id);
setMessages((prev) => [...prev, msg]);
}, []);
// Load history + connect SSE
useEffect(() => {
async function fetchMessages() {
let eventSource: EventSource | null = null;
async function init() {
// Fetch history
try {
const res = await fetch(`/api/rooms/${roomId}/chat?sessionId=${sessionId}`);
if (res.ok) {
const data = await res.json();
setMessages(data.messages);
const msgs = data.messages as ChatMessage[];
msgs.forEach((m) => seenIds.current.add(m.id));
setMessages(msgs);
}
} catch {
// silent
}
} catch { /* silent */ }
// Connect SSE for real-time messages
eventSource = new EventSource(
`/api/rooms/${roomId}/chat/stream?sessionId=${sessionId}`
);
eventSource.onmessage = (e) => {
try {
const msg = JSON.parse(e.data) as ChatMessage;
addMessage(msg);
} catch { /* ignore */ }
};
eventSource.onerror = () => {
// Reconnect handled by browser automatically
};
}
fetchMessages();
pollRef.current = setInterval(fetchMessages, 3000);
init();
return () => {
if (pollRef.current) clearInterval(pollRef.current);
eventSource?.close();
};
}, [roomId]);
}, [roomId, sessionId, addMessage]);
// Auto-scroll
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [messages]);
@@ -51,20 +75,36 @@ export default function ChatPanel({ roomId, sessionId, senderName }: ChatPanelPr
e.preventDefault();
if (!input.trim() || sending) return;
const content = input.trim();
// Optimistic UI — show message immediately
const optimisticMsg: ChatMessage = {
id: `temp-${Date.now()}`,
senderName,
content,
createdAt: new Date().toISOString(),
};
setMessages((prev) => [...prev, optimisticMsg]);
setInput("");
setSending(true);
try {
const res = await fetch(`/api/rooms/${roomId}/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ sessionId, senderName, content: input.trim() }),
body: JSON.stringify({ sessionId, senderName, content }),
});
if (res.ok) {
const msg = await res.json();
setMessages((prev) => [...prev, msg]);
setInput("");
// Replace optimistic message with real one
seenIds.current.add(msg.id);
setMessages((prev) =>
prev.map((m) => (m.id === optimisticMsg.id ? { ...msg, createdAt: msg.createdAt } : m))
);
}
} catch {
// silent
// Remove optimistic message on failure
setMessages((prev) => prev.filter((m) => m.id !== optimisticMsg.id));
} finally {
setSending(false);
}

36
src/lib/chat-pubsub.ts Normal file
View File

@@ -0,0 +1,36 @@
import { redis, createSubscriber } from "./redis";
const CHAT_CHANNEL = (roomId: string) => `chat:${roomId}`;
export type ChatEvent = {
id: string;
senderName: string;
content: string;
createdAt: string;
};
export async function publishChatMessage(roomId: string, message: ChatEvent) {
await redis.publish(CHAT_CHANNEL(roomId), JSON.stringify(message));
}
export function subscribeChatMessages(
roomId: string,
onMessage: (msg: ChatEvent) => void
): { unsubscribe: () => Promise<void> } {
const sub = createSubscriber();
const channel = CHAT_CHANNEL(roomId);
sub.subscribe(channel);
sub.on("message", (_ch: string, data: string) => {
try {
onMessage(JSON.parse(data) as ChatEvent);
} catch { /* ignore */ }
});
return {
async unsubscribe() {
await sub.unsubscribe(channel);
await sub.quit();
},
};
}

46
src/lib/lobby-pubsub.ts Normal file
View File

@@ -0,0 +1,46 @@
import { redis, createSubscriber } from "./redis";
const LOBBY_CHANNEL = (roomId: string, sessionId: string) =>
`lobby:${roomId}:${sessionId}`;
export type LobbyEvent = {
status: "APPROVED" | "REJECTED";
token?: string;
};
export async function publishLobbyEvent(
roomId: string,
sessionId: string,
event: LobbyEvent
) {
await redis.publish(
LOBBY_CHANNEL(roomId, sessionId),
JSON.stringify(event)
);
}
export function subscribeLobbyEvents(
roomId: string,
sessionId: string,
onEvent: (event: LobbyEvent) => void
): { unsubscribe: () => Promise<void> } {
const sub = createSubscriber();
const channel = LOBBY_CHANNEL(roomId, sessionId);
sub.subscribe(channel);
sub.on("message", (_ch: string, message: string) => {
try {
const event = JSON.parse(message) as LobbyEvent;
onEvent(event);
} catch {
/* ignore malformed */
}
});
return {
async unsubscribe() {
await sub.unsubscribe(channel);
await sub.quit();
},
};
}

17
src/lib/rate-limit.ts Normal file
View File

@@ -0,0 +1,17 @@
import { redis } from "./redis";
export async function checkRateLimit(
key: string,
maxAttempts: number,
windowSeconds: number
): Promise<{ allowed: boolean; remaining: number }> {
const redisKey = `ratelimit:${key}`;
const current = await redis.incr(redisKey);
if (current === 1) {
await redis.expire(redisKey, windowSeconds);
}
return {
allowed: current <= maxAttempts,
remaining: Math.max(0, maxAttempts - current),
};
}

20
src/lib/redis.ts Normal file
View File

@@ -0,0 +1,20 @@
import Redis from "ioredis";
const globalForRedis = globalThis as unknown as { redis: Redis | undefined };
export const redis =
globalForRedis.redis ??
new Redis(process.env.REDIS_URL || "redis://localhost:6379", {
maxRetriesPerRequest: 3,
lazyConnect: true,
});
if (process.env.NODE_ENV !== "production") globalForRedis.redis = redis;
// Separate instance for pub/sub (Redis requires dedicated connections for subscribers)
export function createSubscriber() {
return new Redis(process.env.REDIS_URL || "redis://localhost:6379", {
maxRetriesPerRequest: 3,
lazyConnect: true,
});
}