Files
LiveServer-M1/ai-agent/main.py
joylessorchid 3846e3e00d feat: LiveServer-M1 v1 — educational video conferencing platform
Full MVP implementation:
- Next.js 16 + React 19 + TypeScript + Tailwind CSS v4
- LiveKit integration (rooms, tokens, webhooks, moderation)
- better-auth (email/password, sessions, roles)
- Prisma 7 + PostgreSQL (9 models, 3 enums)
- 15 API routes (auth, rooms, lobby, chat, files, moderation, hand-raise)
- 7 pages (landing, auth, dashboard, join, video room)
- SSE-based waiting room with host approval flow
- Security: PIN rate limiting, session fingerprint bans, chat/files auth
- Python AI Agent (Deepgram STT + OpenAI summarization)
- Docker Compose (local + production with Traefik + Let's Encrypt)
- Interactive setup script (setup.sh)
- Dev protection middleware (DEV_ACCESS_KEY, ALLOWED_IPS)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 13:57:53 +03:00

296 lines
9.5 KiB
Python

"""
LiveServer-M1 AI Agent
======================
LiveKit agent that provides real-time transcription via Deepgram STT,
publishes captions to participants, and on room end generates an
AI summary (GPT-4o-mini) stored alongside the full transcript in PostgreSQL.
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from datetime import datetime, timezone

import psycopg2
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.plugins import deepgram, openai
load_dotenv()
logger = logging.getLogger("liveserver-agent")
logger.setLevel(logging.INFO)
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def _get_db_connection():
"""Create a fresh PostgreSQL connection from DATABASE_URL."""
dsn = os.environ.get("DATABASE_URL")
if not dsn:
raise RuntimeError("DATABASE_URL environment variable is not set")
return psycopg2.connect(dsn)
def _ensure_table():
    """Create the lecture_artifacts table if missing (idempotent).

    This is only a safety net — Prisma migrations normally own the
    schema. Column names mirror the Prisma model exactly (camelCase
    preserved via quoted identifiers, snake_case table via @@map).
    Any failure is logged and swallowed: the agent should still start
    even when the DB is temporarily unreachable.
    """
    create_sql = """
        CREATE TABLE IF NOT EXISTS lecture_artifacts (
            id TEXT PRIMARY KEY,
            "roomId" TEXT UNIQUE NOT NULL,
            "transcriptText" TEXT,
            "transcriptUrl" TEXT,
            summary TEXT,
            "createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
            "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now()
        );
    """
    connection = None
    try:
        connection = _get_db_connection()
        with connection.cursor() as cursor:
            cursor.execute(create_sql)
        connection.commit()
        logger.info("lecture_artifacts table verified")
    except Exception:
        logger.exception("Failed to ensure lecture_artifacts table")
        if connection:
            connection.rollback()
    finally:
        if connection:
            connection.close()
def save_artifact(room_name: str, transcript: str, summary: str | None):
    """Insert or update the lecture artifact for *room_name*.

    The LiveKit room name is first resolved to the internal rooms.id
    (via rooms."livekitRoomName"), then the artifact row is upserted,
    keyed on the unique "roomId" column. All failures are logged and
    swallowed — persistence is best-effort at room teardown.

    Args:
        room_name: LiveKit room name as seen by the agent.
        transcript: full plain-text transcript.
        summary: optional Markdown summary (None when generation failed).
    """
    connection = None
    try:
        connection = _get_db_connection()
        with connection.cursor() as cursor:
            # Map the LiveKit room name onto our internal room id.
            cursor.execute(
                'SELECT id FROM rooms WHERE "livekitRoomName" = %s',
                (room_name,),
            )
            matched = cursor.fetchone()
            if not matched:
                logger.error("Room not found in DB for livekitRoomName=%s", room_name)
                return
            room_id = matched[0]
            timestamp = datetime.now(timezone.utc)
            # "roomId" is unique, so ON CONFLICT turns a re-run into an update.
            upsert_sql = """
                INSERT INTO lecture_artifacts (id, "roomId", "transcriptText", summary, "createdAt", "updatedAt")
                VALUES (gen_random_uuid()::text, %s, %s, %s, %s, %s)
                ON CONFLICT ("roomId")
                DO UPDATE SET
                    "transcriptText" = EXCLUDED."transcriptText",
                    summary = EXCLUDED.summary,
                    "updatedAt" = EXCLUDED."updatedAt"
            """
            cursor.execute(upsert_sql, (room_id, transcript, summary, timestamp, timestamp))
        connection.commit()
        logger.info("Saved artifact for room %s (room_id=%s)", room_name, room_id)
    except Exception:
        logger.exception("Failed to save artifact for room %s", room_name)
        if connection:
            connection.rollback()
    finally:
        if connection:
            connection.close()
# ---------------------------------------------------------------------------
# Summarisation
# ---------------------------------------------------------------------------
async def summarise_transcript(transcript: str) -> str | None:
    """Summarise a full lecture transcript with GPT-4o-mini.

    Args:
        transcript: plain-text transcript (one "[speaker]: text" line
            per segment, as built in the agent entrypoint).

    Returns:
        A Markdown summary string, or None when the transcript is blank
        or the LLM call fails (failures are logged, never raised).
    """
    # Empty / whitespace-only transcript: nothing to summarise.
    if not transcript.strip():
        return None
    try:
        llm = openai.LLM(model="gpt-4o-mini")
        # NOTE(review): ChatContext normally lives in livekit.agents.llm,
        # not in the openai plugin module — confirm this attribute exists
        # in the pinned livekit-agents / plugin versions.
        chat_ctx = openai.ChatContext()
        chat_ctx.append(
            role="system",
            text=(
                "You are a helpful assistant that summarises educational lectures. "
                "Produce a concise summary in Markdown with: key topics covered, "
                "main takeaways, and any action items mentioned. "
                "Keep it under 500 words. Respond in the same language as the transcript."
            ),
        )
        chat_ctx.append(role="user", text=transcript)
        # NOTE(review): in livekit-agents, LLM.chat() typically returns a
        # stream to iterate rather than an OpenAI-style response object —
        # verify this await + .choices[0].message.content access pattern
        # against the installed API before relying on it.
        response = await llm.chat(chat_ctx=chat_ctx)
        return response.choices[0].message.content
    except Exception:
        # Summarisation is best-effort: log and degrade to "no summary".
        logger.exception("Summarisation failed")
        return None
# ---------------------------------------------------------------------------
# Agent entrypoint
# ---------------------------------------------------------------------------
async def entrypoint(ctx: JobContext):
    """Per-room agent job: live STT captions plus an end-of-room summary.

    Flow:
      1. Join the room subscribed to audio only (STT needs no video).
      2. For every subscribed audio track, pipe frames into a Deepgram
         STT stream and broadcast final transcripts over the data
         channel (topic "transcription").
      3. On disconnect, close the STT streams, assemble the transcript,
         generate a summary and persist both to PostgreSQL.

    Fixes over the previous revision:
      * ``rtc`` event emitters invoke callbacks synchronously and never
        await coroutines, so async functions registered via ``room.on``
        silently did nothing. Handlers are now sync wrappers that
        schedule the async work with ``asyncio.create_task``.
      * Background tasks are kept in a set — the event loop holds only
        weak references, so unreferenced tasks can be garbage-collected
        mid-flight.
      * ``import asyncio`` hoisted out of the nested closure to the
        module import block.
    """
    logger.info("Agent joining room: %s", ctx.room.name)
    # Connect to the room — subscribe to audio only (for STT).
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Accumulated transcript segments: {"speaker": str, "text": str, "ts": float}
    segments: list[dict] = []

    # ----- Deepgram STT setup ----- #
    stt = deepgram.STT()
    # One recognition stream per participant identity.
    active_streams: dict[str, deepgram.SpeechStream] = {}

    # Strong references to in-flight tasks (the loop keeps only weak refs).
    background_tasks: set[asyncio.Task] = set()

    def _spawn(coro) -> None:
        """Schedule *coro* and retain the task until it completes."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async def _process_stt_stream(
        stream: deepgram.SpeechStream,
        participant_identity: str,
    ):
        """Read final STT events: log, accumulate, broadcast captions."""
        async for event in stream:
            # Only keep finalised, non-empty hypotheses.
            if not event.is_final or not event.alternatives:
                continue
            text = event.alternatives[0].text.strip()
            if not text:
                continue
            segments.append(
                {
                    "speaker": participant_identity,
                    "text": text,
                    "ts": time.time(),
                }
            )
            logger.info("[%s] %s", participant_identity, text)
            # Publish the caption to all participants via the data channel.
            caption_payload = json.dumps(
                {
                    "type": "transcription",
                    "speaker": participant_identity,
                    "text": text,
                },
                ensure_ascii=False,
            ).encode("utf-8")
            await ctx.room.local_participant.publish_data(
                caption_payload,
                reliable=True,
                topic="transcription",
            )

    async def _start_stt_for_track(
        track: rtc.Track,
        participant: rtc.RemoteParticipant,
    ):
        """Wire one subscribed audio track into a fresh Deepgram stream."""
        logger.info("Subscribed to audio from %s", participant.identity)
        audio_stream = rtc.AudioStream(track)
        stt_stream = stt.stream()
        active_streams[participant.identity] = stt_stream

        async def _forward():
            # Pump raw audio frames into the recognition stream.
            async for frame_event in audio_stream:
                stt_stream.push_frame(frame_event.frame)

        # Forwarding and recognition run concurrently.
        _spawn(_forward())
        _spawn(_process_stt_stream(stt_stream, participant.identity))

    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        """Sync event handler: start STT for every subscribed audio track."""
        if track.kind != rtc.TrackKind.KIND_AUDIO:
            return
        _spawn(_start_stt_for_track(track, participant))

    ctx.room.on("track_subscribed", on_track_subscribed)

    # ----- Handle room disconnect / end ----- #
    async def _finalise():
        """Close STT streams, build the transcript, summarise and persist."""
        logger.info("Agent disconnected from room %s", ctx.room.name)
        # Close all active STT streams.
        for identity, stream in active_streams.items():
            try:
                await stream.aclose()
            except Exception:
                logger.warning("Error closing STT stream for %s", identity)
        if not segments:
            logger.info("No transcript segments collected — nothing to save")
            return
        # Build full transcript text.
        full_transcript = "\n".join(
            f"[{seg['speaker']}]: {seg['text']}" for seg in segments
        )
        logger.info(
            "Full transcript (%d segments, %d chars):\n%s",
            len(segments),
            len(full_transcript),
            full_transcript,
        )
        # Summarise.
        logger.info("Generating summary via GPT-4o-mini...")
        summary = await summarise_transcript(full_transcript)
        if summary:
            logger.info("Summary:\n%s", summary)
        else:
            logger.warning("Summary generation returned empty result")
        # Persist to PostgreSQL (best-effort; errors logged inside).
        save_artifact(ctx.room.name, full_transcript, summary)

    def on_disconnected(*args):
        """Sync event handler: schedule transcript finalisation.

        Accepts and ignores any positional arguments the emitter may
        pass (e.g. a disconnect reason).
        """
        _spawn(_finalise())

    ctx.room.on("disconnected", on_disconnected)
# ---------------------------------------------------------------------------
# Worker bootstrap
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Make sure the artifacts table exists before accepting jobs,
    # then hand control to the livekit-agents worker CLI.
    _ensure_table()
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))