From a42ec969653df681ace220731223bc45aad9c6d6 Mon Sep 17 00:00:00 2001 From: joylessorchid Date: Sun, 22 Mar 2026 14:17:25 +0300 Subject: [PATCH] perf: Redis pub/sub, PgBouncer, optimistic UI for high concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Redis 7 for pub/sub (lobby + chat real-time), rate limiting, caching - Replace SSE DB polling with Redis pub/sub (lobby: instant approval, chat: instant delivery) - Add PgBouncer (transaction mode, 500 client → 25 pool connections) - Chat SSE stream via Redis pub/sub instead of 3s polling - Optimistic UI in ChatPanel (messages appear before server confirms) - Redis-based rate limiter (works across multiple app replicas) - Prisma query optimization (select only needed fields) - Chat message cache in Redis (10s TTL) - Docker Compose: add redis, pgbouncer services with healthchecks - Production: resource limits, 2 app replicas behind Traefik - Update CLAUDE.md, README.md, .env.example, setup.sh Co-Authored-By: Claude Opus 4.6 --- .env.example | 3 + .gitignore | 1 + CLAUDE.md | 33 +++- README.md | 7 +- docker-compose.prod.yml | 25 +++ docker-compose.yml | 37 ++++- next.config.ts | 2 +- package-lock.json | 81 +++++++++- package.json | 1 + setup.sh | 7 +- src/app/api/rooms/[roomId]/chat/route.ts | 31 +++- .../api/rooms/[roomId]/chat/stream/route.ts | 74 +++++++++ src/app/api/rooms/[roomId]/join/route.ts | 30 ++-- src/app/api/rooms/[roomId]/lobby/route.ts | 12 +- .../api/rooms/[roomId]/lobby/stream/route.ts | 145 ++++++++++-------- src/app/api/rooms/route.ts | 12 ++ src/components/room/ChatPanel.tsx | 70 +++++++-- src/lib/chat-pubsub.ts | 36 +++++ src/lib/lobby-pubsub.ts | 46 ++++++ src/lib/rate-limit.ts | 17 ++ src/lib/redis.ts | 20 +++ 21 files changed, 568 insertions(+), 122 deletions(-) create mode 100644 src/app/api/rooms/[roomId]/chat/stream/route.ts create mode 100644 src/lib/chat-pubsub.ts create mode 100644 src/lib/lobby-pubsub.ts create mode 100644 src/lib/rate-limit.ts create mode 100644 src/lib/redis.ts diff --git a/.env.example b/.env.example index b5e7039..2f74fd1 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,9 @@ S3_BUCKET=liveserver MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin +# === Redis === +REDIS_URL=redis://localhost:6379 + # === Auth === BETTER_AUTH_SECRET=your-secret-key-change-in-production BETTER_AUTH_URL=http://localhost:3000 diff --git a/.gitignore b/.gitignore index 253eb4c..d056bde 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,4 @@ docker-compose.override.yml .eslintcache coverage/ *.lcov +.claude/ diff --git a/CLAUDE.md b/CLAUDE.md index f58fb77..7599c1e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -22,7 +22,8 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co | Backend | Next.js Route Handlers, `livekit-server-sdk` | | Auth | `better-auth` + Prisma adapter | | AI Agent | Python, `livekit-agents`, `livekit-plugins-deepgram` (STT), `livekit-plugins-openai` (LLM) | -| DB | PostgreSQL + Prisma 7 ORM | +| DB | PostgreSQL + Prisma 7 ORM + PgBouncer (connection pooling) | +| Cache/PubSub | Redis 7 (rate limiting, lobby pub/sub, chat real-time, caching) | | Storage | MinIO (S3-compatible) | | Proxy | Traefik v3 + Let's Encrypt (prod only) | @@ -44,12 +45,12 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co │ ├── components/ │ │ ├── room/ # ChatPanel, ModerationPanel │ │ └── lobby/ # WaitingRoom, LobbyManager -│ ├── lib/ # prisma, auth, auth-helpers, livekit +│ ├── lib/ # prisma, auth, auth-helpers, livekit, redis, rate-limit, lobby-pubsub, chat-pubsub │ ├── middleware.ts # Dev protection (DEV_ACCESS_KEY, ALLOWED_IPS) │ └── types/ ├── ai-agent/ # Python LiveKit Agent ├── prisma/schema.prisma # 9 models, 3 enums -├── docker-compose.yml # Base: postgres, minio, ai-agent, app +├── docker-compose.yml # Base: postgres, minio, redis, pgbouncer, ai-agent, app ├── docker-compose.override.yml # Local dev: direct ports, no SSL ├── docker-compose.prod.yml # Traefik + Let's Encrypt └── Dockerfile # Multi-stage Next.js standalone build @@ -58,12 +59,15 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ### Key Data Flows 1. **Guest join (with lobby):** - Guest → POST `/api/rooms/[id]/join` → LobbyEntry(PENDING) → SSE `/api/rooms/[id]/lobby/stream` → Host approves → LiveKit token → connect + Guest → POST `/api/rooms/[id]/join` → LobbyEntry(PENDING) → SSE via Redis pub/sub → Host approves → LiveKit token → connect -2. **Real-time transcription:** +2. **Real-time chat:** + POST message → DB + Redis PUBLISH → SSE `/api/rooms/[id]/chat/stream` → instant delivery to all participants + +3. **Real-time transcription:** Audio tracks → AI Agent (Deepgram STT) → DataChannel → Live captions -3. **Post-lecture (`room_finished` webhook):** +4. **Post-lecture (`room_finished` webhook):** Transcript → OpenAI GPT → LectureArtifact → `/lectures/[id]` ### User Roles @@ -82,12 +86,27 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co - `DEV_ACCESS_KEY` middleware: protects local dev from network access - LiveKit webhook: signature verification via `WebhookReceiver` +## Performance Architecture + +``` +Clients → Traefik (LB) → Next.js (x2 replicas) → PgBouncer (pool 25, max 500) → PostgreSQL + → Redis (pub/sub, cache, rate limit) + → LiveKit (video/audio SFU) +``` + +- **Redis pub/sub** replaces DB polling for lobby SSE and chat — instant delivery (<10ms) +- **PgBouncer** multiplexes 500 client connections into 25 real PostgreSQL connections +- **Chat SSE** via Redis pub/sub + optimistic UI on client — messages appear before server confirms +- **Rate limiting** via Redis INCR+EXPIRE — works across multiple app replicas +- **Chat cache** in Redis (10s TTL) — reduces DB reads during active rooms +- **Prisma select** — only fetch needed columns, not full rows + ## Commands ```bash # Dev npm run dev # Next.js dev server (localhost:3000) -docker compose up -d postgres minio # DB + Storage only +docker compose up -d postgres minio redis # DB + Storage + Redis npm run lint # TypeScript type-check (tsc --noEmit) # Database diff --git a/README.md b/README.md index a199a74..0b17484 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,8 @@ - **Backend:** Next.js Route Handlers, `livekit-server-sdk` - **Auth:** `better-auth` + Prisma adapter - **AI Agent:** Python, `livekit-agents`, Deepgram STT, OpenAI GPT -- **DB:** PostgreSQL + Prisma 7 +- **DB:** PostgreSQL + Prisma 7 + PgBouncer (connection pooling) +- **Cache/PubSub:** Redis 7 (real-time chat, lobby, rate limiting) - **Storage:** MinIO (S3-compatible) - **Proxy:** Traefik v3 + Let's Encrypt (production) @@ -53,10 +54,10 @@ cp .env.example .env ### 3. Запустить базу данных ```bash -docker compose up -d postgres minio +docker compose up -d postgres minio redis ``` -Поднимет PostgreSQL на `localhost:5432` и MinIO на `localhost:9000` (консоль: `localhost:9001`). +Поднимет PostgreSQL (`localhost:5432`), MinIO (`localhost:9000`, консоль: `9001`), Redis (`localhost:6379`), PgBouncer (`localhost:6432`). ### 4. Применить миграции diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 962a623..a204b97 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -25,6 +25,13 @@ services: environment: NEXT_PUBLIC_APP_URL: https://${DOMAIN} BETTER_AUTH_URL: https://${DOMAIN} + deploy: + resources: + limits: + memory: 1G + reservations: + memory: 512M + replicas: 2 labels: - "traefik.enable=true" - "traefik.http.routers.app.rule=Host(`${DOMAIN}`)" @@ -46,5 +53,23 @@ services: - "traefik.http.routers.minio-console.service=minio-console" - "traefik.http.services.minio-console.loadbalancer.server.port=9001" + redis: + deploy: + resources: + limits: + memory: 256M + + postgres: + deploy: + resources: + limits: + memory: 1G + + ai-agent: + deploy: + resources: + limits: + memory: 2G + volumes: letsencrypt_data: diff --git a/docker-compose.yml b/docker-compose.yml index 73463b5..c4418ab 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,9 +6,14 @@ services: depends_on: postgres: condition: service_healthy + redis: + condition: service_healthy + pgbouncer: + condition: service_healthy env_file: .env environment: - DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@postgres:5432/liveserver + DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@pgbouncer:6432/liveserver + REDIS_URL: redis://redis:6379 restart: unless-stopped postgres: @@ -36,6 +41,34 @@ services: - minio_data:/data restart: unless-stopped + redis: + image: redis:7-alpine + command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru --save "" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + restart: unless-stopped + + pgbouncer: + image: edoburu/pgbouncer:latest + depends_on: + postgres: + condition: service_healthy + environment: + DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@postgres:5432/liveserver + POOL_MODE: transaction + MAX_CLIENT_CONN: 500 + DEFAULT_POOL_SIZE: 25 + MIN_POOL_SIZE: 5 + healthcheck: + test: ["CMD-SHELL", "pg_isready -h 127.0.0.1 -p 6432"] + interval: 5s + timeout: 3s + retries: 5 + restart: unless-stopped + ai-agent: build: ./ai-agent depends_on: @@ -43,7 +76,7 @@ services: condition: service_healthy env_file: .env environment: - DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@postgres:5432/liveserver + DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD:-postgres}@pgbouncer:6432/liveserver restart: unless-stopped volumes: diff --git a/next.config.ts b/next.config.ts index 755bf9e..eb3e7f5 100644 --- a/next.config.ts +++ b/next.config.ts @@ -2,7 +2,7 @@ import type { NextConfig } from "next"; const nextConfig: NextConfig = { output: "standalone", - serverExternalPackages: ["@prisma/client", "bcryptjs"], + serverExternalPackages: ["@prisma/client", "bcryptjs", "ioredis"], }; export default nextConfig; diff --git a/package-lock.json b/package-lock.json index 58b17de..c1e14fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,6 +18,7 @@ "@types/react-dom": "^19.2.3", "bcryptjs": "^3.0.3", "better-auth": "^1.5.5", + "ioredis": "^5.10.1", "livekit-client": "^2.17.3", "livekit-server-sdk": "^2.15.0", "next": "^16.2.1", @@ -1281,6 +1282,12 @@ "url": "https://opencollective.com/libvips" } }, + "node_modules/@ioredis/commands": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.5.1.tgz", + "integrity": "sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==", + "license": "MIT" + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.13", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.13.tgz", @@ -3494,6 +3501,15 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -3628,7 +3644,6 @@ "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", - "dev": true, "license": "MIT", "dependencies": { "ms": "^2.1.3" @@ -5073,6 +5088,30 @@ "node": ">= 0.4" } }, + "node_modules/ioredis": { + "version": "5.10.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", + "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.5.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/is-array-buffer": { "version": "3.0.5", "resolved": "https://registry.npmjs.org/is-array-buffer/-/is-array-buffer-3.0.5.tgz", @@ -6021,6 +6060,18 @@ "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", "license": "MIT" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.merge": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/lodash.merge/-/lodash.merge-4.6.2.tgz", @@ -6235,7 +6286,6 @@ "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", - "dev": true, "license": "MIT" }, "node_modules/mysql2": { @@ -6982,6 +7032,27 @@ "url": "https://paulmillr.com/funding/" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.10.tgz", @@ -7485,6 +7556,12 @@ "dev": true, "license": "MIT" }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/std-env": { "version": "3.10.0", "resolved": "https://registry.npmjs.org/std-env/-/std-env-3.10.0.tgz", diff --git a/package.json b/package.json index 6cf1cd7..e94e775 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "@types/react-dom": "^19.2.3", "bcryptjs": "^3.0.3", "better-auth": "^1.5.5", + "ioredis": "^5.10.1", "livekit-client": "^2.17.3", "livekit-server-sdk": "^2.15.0", "next": "^16.2.1", diff --git a/setup.sh b/setup.sh index 5cc57a7..8e4dff3 100644 --- a/setup.sh +++ b/setup.sh @@ -194,6 +194,11 @@ ACME_EMAIL=${ACME_EMAIL} DATABASE_URL=postgresql://postgres:${PG_PASSWORD}@localhost:5432/liveserver POSTGRES_PASSWORD=${PG_PASSWORD} +# Redis +# For Docker containers: redis://redis:6379 +# For local npm run dev: redis://localhost:6379 +REDIS_URL=redis://localhost:6379 + # LiveKit LIVEKIT_URL=${LK_URL} NEXT_PUBLIC_LIVEKIT_URL=${LK_URL} @@ -246,7 +251,7 @@ echo -e " Запуск Docker контейнеров..." if [[ "$MODE" == "2" ]]; then docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d --build 2>&1 | tail -5 else - docker compose up -d postgres minio 2>&1 | tail -5 + docker compose up -d postgres minio redis 2>&1 | tail -5 fi log_ok "Docker контейнеры запущены" diff --git a/src/app/api/rooms/[roomId]/chat/route.ts b/src/app/api/rooms/[roomId]/chat/route.ts index 2832d1d..6ec3e31 100644 --- a/src/app/api/rooms/[roomId]/chat/route.ts +++ b/src/app/api/rooms/[roomId]/chat/route.ts @@ -2,6 +2,8 @@ import { NextResponse } from "next/server"; import { z } from "zod"; import { prisma } from "@/lib/prisma"; import { getSessionFromRequest } from "@/lib/auth-helpers"; +import { redis } from "@/lib/redis"; +import { publishChatMessage } from "@/lib/chat-pubsub"; const sendMessageSchema = z.object({ sessionId: z.string().min(1), @@ -49,6 +51,13 @@ export async function GET( return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); } + // Check Redis cache first (10s TTL to reduce DB hits during active chat) + const cacheKey = `chat:${roomId}:${cursor || "latest"}:${limit}`; + const cached = await redis.get(cacheKey); + if (cached) { + return NextResponse.json(JSON.parse(cached)); + } + const messages = await prisma.chatMessage.findMany({ where: { roomId, @@ -56,13 +65,25 @@ export async function GET( }, orderBy: { createdAt: "desc" }, take: limit, + select: { + id: true, + sessionId: true, + senderName: true, + content: true, + createdAt: true, + }, }); const nextCursor = messages.length === limit ? messages[messages.length - 1].createdAt.toISOString() : null; - return NextResponse.json({ messages: messages.reverse(), nextCursor }); + const result = { messages: messages.reverse(), nextCursor }; + + // Cache for 10 seconds + await redis.set(cacheKey, JSON.stringify(result), "EX", 10); + + return NextResponse.json(result); } export async function POST( @@ -100,5 +121,13 @@ export async function POST( }, }); + // Publish to Redis for real-time delivery via SSE + await publishChatMessage(roomId, { + id: message.id, + senderName: message.senderName, + content: message.content, + createdAt: message.createdAt.toISOString(), + }); + return NextResponse.json(message, { status: 201 }); } diff --git a/src/app/api/rooms/[roomId]/chat/stream/route.ts b/src/app/api/rooms/[roomId]/chat/stream/route.ts new file mode 100644 index 0000000..cf316a0 --- /dev/null +++ b/src/app/api/rooms/[roomId]/chat/stream/route.ts @@ -0,0 +1,74 @@ +import { prisma } from "@/lib/prisma"; +import { subscribeChatMessages } from "@/lib/chat-pubsub"; + +export async function GET( + req: Request, + { params }: { params: Promise<{ roomId: string }> } +) { + const { roomId } = await params; + const url = new URL(req.url); + const sessionId = url.searchParams.get("sessionId"); + + if (!sessionId) { + return new Response(JSON.stringify({ error: "sessionId required" }), { + status: 400, + headers: { "Content-Type": "application/json" }, + }); + } + + // Verify participant + const participant = await prisma.participantHistory.findFirst({ + where: { roomId, sessionId, leftAt: null }, + }); + if (!participant) { + return new Response(JSON.stringify({ error: "Not a participant" }), { + status: 403, + headers: { "Content-Type": "application/json" }, + }); + } + + const stream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + const send = (data: Record) => { + try { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`)); + } catch { /* stream closed */ } + }; + + const { unsubscribe } = subscribeChatMessages(roomId, (msg) => { + send(msg); + }); + + // Keep-alive every 30s + const keepAlive = setInterval(() => { + try { + controller.enqueue(encoder.encode(": keepalive\n\n")); + } catch { /* stream closed */ } + }, 30000); + + // Timeout after 1 hour + const timeout = setTimeout(() => cleanup(), 60 * 60 * 1000); + + let cleaned = false; + function cleanup() { + if (cleaned) return; + cleaned = true; + clearInterval(keepAlive); + clearTimeout(timeout); + unsubscribe().catch(() => {}); + try { controller.close(); } catch { /* already closed */ } + } + + req.signal.addEventListener("abort", () => cleanup()); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); +} diff --git a/src/app/api/rooms/[roomId]/join/route.ts b/src/app/api/rooms/[roomId]/join/route.ts index f45a435..b76866b 100644 --- a/src/app/api/rooms/[roomId]/join/route.ts +++ b/src/app/api/rooms/[roomId]/join/route.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { compare } from "bcryptjs"; import { prisma } from "@/lib/prisma"; import { createToken } from "@/lib/livekit"; +import { checkRateLimit } from "@/lib/rate-limit"; const joinSchema = z.object({ displayName: z.string().min(1).max(100), @@ -11,11 +12,6 @@ const joinSchema = z.object({ sessionFingerprint: z.string().min(1), }); -// CRITICAL-10: In-memory rate limiter for PIN attempts -const pinAttempts = new Map(); -const PIN_MAX_ATTEMPTS = 5; -const PIN_WINDOW_MS = 60_000; // 1 minute - export async function POST( req: Request, { params }: { params: Promise<{ roomId: string }> } @@ -63,22 +59,14 @@ export async function POST( return NextResponse.json({ error: "PIN required" }, { status: 401 }); } - // Rate limit PIN attempts by roomId + IP - const ip = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() || "unknown"; - const rateLimitKey = `${roomId}:${ip}`; - const now = Date.now(); - const attempt = pinAttempts.get(rateLimitKey); - - if (attempt && now < attempt.resetTime) { - if (attempt.count >= PIN_MAX_ATTEMPTS) { - return NextResponse.json( - { error: "Too many PIN attempts. Try again later." }, - { status: 429 } - ); - } - attempt.count++; - } else { - pinAttempts.set(rateLimitKey, { count: 1, resetTime: now + PIN_WINDOW_MS }); + // Rate limit PIN attempts by roomId + IP (Redis-based) + const clientIp = req.headers.get("x-forwarded-for")?.split(",")[0]?.trim() || "unknown"; + const { allowed } = await checkRateLimit(`pin:${roomId}:${clientIp}`, 5, 60); + if (!allowed) { + return NextResponse.json( + { error: "Too many PIN attempts. Try again later." }, + { status: 429 } + ); } const valid = await compare(pin, room.pinHash); diff --git a/src/app/api/rooms/[roomId]/lobby/route.ts b/src/app/api/rooms/[roomId]/lobby/route.ts index dabb0d2..e47c53e 100644 --- a/src/app/api/rooms/[roomId]/lobby/route.ts +++ b/src/app/api/rooms/[roomId]/lobby/route.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { prisma } from "@/lib/prisma"; import { getSessionFromRequest } from "@/lib/auth-helpers"; import { createToken } from "@/lib/livekit"; +import { publishLobbyEvent } from "@/lib/lobby-pubsub"; const lobbyActionSchema = z.object({ lobbyEntryId: z.string().min(1), @@ -74,6 +75,7 @@ export async function POST( where: { id: lobbyEntryId }, data: { status: "REJECTED" }, }); + await publishLobbyEvent(roomId, entry.sessionId, { status: "REJECTED" }); return NextResponse.json({ status: "rejected" }); } @@ -105,9 +107,11 @@ export async function POST( }), ]); - // Store token temporarily in lobbyEntry metadata — we use the updatedAt as a signal - // The SSE endpoint will query the entry status and generate a fresh token - // We store it via a simple in-memory approach or re-generate in SSE - // For simplicity, we return it here too (host can relay it) + // Publish to Redis so the SSE stream picks it up instantly + await publishLobbyEvent(roomId, entry.sessionId, { + status: "APPROVED", + token, + }); + return NextResponse.json({ status: "approved", token }); } diff --git a/src/app/api/rooms/[roomId]/lobby/stream/route.ts b/src/app/api/rooms/[roomId]/lobby/stream/route.ts index 5abb5e8..5aa2bc0 100644 --- a/src/app/api/rooms/[roomId]/lobby/stream/route.ts +++ b/src/app/api/rooms/[roomId]/lobby/stream/route.ts @@ -1,5 +1,6 @@ import { prisma } from "@/lib/prisma"; import { createToken } from "@/lib/livekit"; +import { subscribeLobbyEvents } from "@/lib/lobby-pubsub"; export async function GET( req: Request, @@ -16,89 +17,103 @@ export async function GET( }); } - const room = await prisma.room.findUnique({ where: { id: roomId } }); - if (!room) { - return new Response(JSON.stringify({ error: "Room not found" }), { - status: 404, - headers: { "Content-Type": "application/json" }, - }); - } - - // CRITICAL-8: Verify sessionId matches an existing LobbyEntry for this room + // Verify sessionId has a LobbyEntry for this room const existingEntry = await prisma.lobbyEntry.findFirst({ where: { roomId, sessionId }, + orderBy: { createdAt: "desc" }, }); if (!existingEntry) { - return new Response(JSON.stringify({ error: "No lobby entry found for this session" }), { - status: 403, - headers: { "Content-Type": "application/json" }, + return new Response( + JSON.stringify({ error: "No lobby entry found for this session" }), + { status: 403, headers: { "Content-Type": "application/json" } } + ); + } + + // Check if already approved/rejected + if (existingEntry.status === "APPROVED") { + const freshRoom = await prisma.room.findUnique({ where: { id: roomId } }); + if (!freshRoom?.livekitRoomName) { + return new Response( + JSON.stringify({ error: "Room not started yet" }), + { status: 400, headers: { "Content-Type": "application/json" } } + ); + } + const canPublish = !freshRoom.webinarMode; + const token = await createToken(sessionId, freshRoom.livekitRoomName, { + name: existingEntry.displayName, + canPublish, + canSubscribe: true, + canPublishData: true, + }); + const body = `data: ${JSON.stringify({ status: "APPROVED", token })}\n\n`; + return new Response(body, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, }); } + if (existingEntry.status === "REJECTED") { + const body = `data: ${JSON.stringify({ status: "REJECTED" })}\n\n`; + return new Response(body, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); + } + + // PENDING — subscribe to Redis channel and wait for event const stream = new ReadableStream({ - async start(controller) { + start(controller) { const encoder = new TextEncoder(); const send = (data: Record) => { - controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`)); + try { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`)); + } catch { + /* stream may be closed */ + } }; - let resolved = false; - const maxPolls = 150; // 5 minutes at 2s intervals + // Send initial PENDING status + send({ status: "PENDING" }); - for (let i = 0; i < maxPolls && !resolved; i++) { - try { - const entry = await prisma.lobbyEntry.findFirst({ - where: { roomId, sessionId }, - orderBy: { createdAt: "desc" }, - }); - - if (!entry) { - send({ status: "error", message: "Lobby entry not found" }); - resolved = true; - break; - } - - if (entry.status === "APPROVED") { - // Re-fetch room to get fresh livekitRoomName (may have been started after lobby entry) - const freshRoom = await prisma.room.findUnique({ where: { id: roomId } }); - if (!freshRoom?.livekitRoomName) { - send({ status: "error", message: "Room not started yet" }); - resolved = true; - break; - } - const canPublish = !freshRoom.webinarMode; - const token = await createToken(sessionId, freshRoom.livekitRoomName, { - name: entry.displayName, - canPublish, - canSubscribe: true, - canPublishData: true, - }); - send({ status: "APPROVED", token }); - resolved = true; - break; - } - - if (entry.status === "REJECTED") { - send({ status: "REJECTED" }); - resolved = true; - break; - } - - send({ status: "PENDING" }); - } catch { - send({ status: "error", message: "Internal error" }); - resolved = true; - break; + // Subscribe to Redis pub/sub + const { unsubscribe } = subscribeLobbyEvents( + roomId, + sessionId, + (event) => { + send(event); + cleanup(); } + ); - await new Promise((r) => setTimeout(r, 2000)); - } - - if (!resolved) { + // Timeout after 5 minutes + const timeout = setTimeout(() => { send({ status: "timeout" }); + cleanup(); + }, 5 * 60 * 1000); + + let cleaned = false; + function cleanup() { + if (cleaned) return; + cleaned = true; + clearTimeout(timeout); + unsubscribe().catch(() => {}); + try { + controller.close(); + } catch { + /* already closed */ + } } - controller.close(); + // Handle client disconnect via AbortSignal + req.signal.addEventListener("abort", () => { + cleanup(); + }); }, }); diff --git a/src/app/api/rooms/route.ts b/src/app/api/rooms/route.ts index bfb7929..4355cfe 100644 --- a/src/app/api/rooms/route.ts +++ b/src/app/api/rooms/route.ts @@ -59,6 +59,18 @@ export async function GET(req: Request) { const rooms = await prisma.room.findMany({ where, orderBy: { createdAt: "desc" }, + select: { + id: true, + name: true, + code: true, + status: true, + lobbyEnabled: true, + webinarMode: true, + isLocked: true, + createdAt: true, + startedAt: true, + endedAt: true, + }, }); return NextResponse.json(rooms); diff --git a/src/components/room/ChatPanel.tsx b/src/components/room/ChatPanel.tsx index ea974a2..7e40b77 100644 --- a/src/components/room/ChatPanel.tsx +++ b/src/components/room/ChatPanel.tsx @@ -1,6 +1,6 @@ "use client"; -import { useState, useEffect, useRef, type FormEvent } from "react"; +import { useState, useEffect, useRef, useCallback, type FormEvent } from "react"; interface ChatPanelProps { roomId: string; @@ -20,29 +20,53 @@ export default function ChatPanel({ roomId, sessionId, senderName }: ChatPanelPr const [input, setInput] = useState(""); const [sending, setSending] = useState(false); const bottomRef = useRef(null); - const pollRef = useRef>(undefined); + const seenIds = useRef(new Set()); + const addMessage = useCallback((msg: ChatMessage) => { + if (seenIds.current.has(msg.id)) return; + seenIds.current.add(msg.id); + setMessages((prev) => [...prev, msg]); + }, []); + + // Load history + connect SSE useEffect(() => { - async function fetchMessages() { + let eventSource: EventSource | null = null; + + async function init() { + // Fetch history try { const res = await fetch(`/api/rooms/${roomId}/chat?sessionId=${sessionId}`); if (res.ok) { const data = await res.json(); - setMessages(data.messages); + const msgs = data.messages as ChatMessage[]; + msgs.forEach((m) => seenIds.current.add(m.id)); + setMessages(msgs); } - } catch { - // silent - } + } catch { /* silent */ } + + // Connect SSE for real-time messages + eventSource = new EventSource( + `/api/rooms/${roomId}/chat/stream?sessionId=${sessionId}` + ); + eventSource.onmessage = (e) => { + try { + const msg = JSON.parse(e.data) as ChatMessage; + addMessage(msg); + } catch { /* ignore */ } + }; + eventSource.onerror = () => { + // Reconnect handled by browser automatically + }; } - fetchMessages(); - pollRef.current = setInterval(fetchMessages, 3000); + init(); return () => { - if (pollRef.current) clearInterval(pollRef.current); + eventSource?.close(); }; - }, [roomId]); + }, [roomId, sessionId, addMessage]); + // Auto-scroll useEffect(() => { bottomRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages]); @@ -51,20 +75,36 @@ export default function ChatPanel({ roomId, sessionId, senderName }: ChatPanelPr e.preventDefault(); if (!input.trim() || sending) return; + const content = input.trim(); + + // Optimistic UI — show message immediately + const optimisticMsg: ChatMessage = { + id: `temp-${Date.now()}`, + senderName, + content, + createdAt: new Date().toISOString(), + }; + setMessages((prev) => [...prev, optimisticMsg]); + setInput(""); setSending(true); + try { const res = await fetch(`/api/rooms/${roomId}/chat`, { method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ sessionId, senderName, content: input.trim() }), + body: JSON.stringify({ sessionId, senderName, content }), }); if (res.ok) { const msg = await res.json(); - setMessages((prev) => [...prev, msg]); - setInput(""); + // Replace optimistic message with real one + seenIds.current.add(msg.id); + setMessages((prev) => + prev.map((m) => (m.id === optimisticMsg.id ? { ...msg, createdAt: msg.createdAt } : m)) + ); } } catch { - // silent + // Remove optimistic message on failure + setMessages((prev) => prev.filter((m) => m.id !== optimisticMsg.id)); } finally { setSending(false); } diff --git a/src/lib/chat-pubsub.ts b/src/lib/chat-pubsub.ts new file mode 100644 index 0000000..dfe92b7 --- /dev/null +++ b/src/lib/chat-pubsub.ts @@ -0,0 +1,36 @@ +import { redis, createSubscriber } from "./redis"; + +const CHAT_CHANNEL = (roomId: string) => `chat:${roomId}`; + +export type ChatEvent = { + id: string; + senderName: string; + content: string; + createdAt: string; +}; + +export async function publishChatMessage(roomId: string, message: ChatEvent) { + await redis.publish(CHAT_CHANNEL(roomId), JSON.stringify(message)); +} + +export function subscribeChatMessages( + roomId: string, + onMessage: (msg: ChatEvent) => void +): { unsubscribe: () => Promise } { + const sub = createSubscriber(); + const channel = CHAT_CHANNEL(roomId); + + sub.subscribe(channel); + sub.on("message", (_ch: string, data: string) => { + try { + onMessage(JSON.parse(data) as ChatEvent); + } catch { /* ignore */ } + }); + + return { + async unsubscribe() { + await sub.unsubscribe(channel); + await sub.quit(); + }, + }; +} diff --git a/src/lib/lobby-pubsub.ts b/src/lib/lobby-pubsub.ts new file mode 100644 index 0000000..71ecfe4 --- /dev/null +++ b/src/lib/lobby-pubsub.ts @@ -0,0 +1,46 @@ +import { redis, createSubscriber } from "./redis"; + +const LOBBY_CHANNEL = (roomId: string, sessionId: string) => + `lobby:${roomId}:${sessionId}`; + +export type LobbyEvent = { + status: "APPROVED" | "REJECTED"; + token?: string; +}; + +export async function publishLobbyEvent( + roomId: string, + sessionId: string, + event: LobbyEvent +) { + await redis.publish( + LOBBY_CHANNEL(roomId, sessionId), + JSON.stringify(event) + ); +} + +export function subscribeLobbyEvents( + roomId: string, + sessionId: string, + onEvent: (event: LobbyEvent) => void +): { unsubscribe: () => Promise } { + const sub = createSubscriber(); + const channel = LOBBY_CHANNEL(roomId, sessionId); + + sub.subscribe(channel); + sub.on("message", (_ch: string, message: string) => { + try { + const event = JSON.parse(message) as LobbyEvent; + onEvent(event); + } catch { + /* ignore malformed */ + } + }); + + return { + async unsubscribe() { + await sub.unsubscribe(channel); + await sub.quit(); + }, + }; +} diff --git a/src/lib/rate-limit.ts b/src/lib/rate-limit.ts new file mode 100644 index 0000000..80ff1cf --- /dev/null +++ b/src/lib/rate-limit.ts @@ -0,0 +1,17 @@ +import { redis } from "./redis"; + +export async function checkRateLimit( + key: string, + maxAttempts: number, + windowSeconds: number +): Promise<{ allowed: boolean; remaining: number }> { + const redisKey = `ratelimit:${key}`; + const current = await redis.incr(redisKey); + if (current === 1) { + await redis.expire(redisKey, windowSeconds); + } + return { + allowed: current <= maxAttempts, + remaining: Math.max(0, maxAttempts - current), + }; +} diff --git a/src/lib/redis.ts b/src/lib/redis.ts new file mode 100644 index 0000000..46c3df4 --- /dev/null +++ b/src/lib/redis.ts @@ -0,0 +1,20 @@ +import Redis from "ioredis"; + +const globalForRedis = globalThis as unknown as { redis: Redis | undefined }; + +export const redis = + globalForRedis.redis ?? + new Redis(process.env.REDIS_URL || "redis://localhost:6379", { + maxRetriesPerRequest: 3, + lazyConnect: true, + }); + +if (process.env.NODE_ENV !== "production") globalForRedis.redis = redis; + +// Separate instance for pub/sub (Redis requires dedicated connections for subscribers) +export function createSubscriber() { + return new Redis(process.env.REDIS_URL || "redis://localhost:6379", { + maxRetriesPerRequest: 3, + lazyConnect: true, + }); +}