""" LiveServer-M1 AI Agent ====================== LiveKit agent that provides real-time transcription via Deepgram STT, publishes captions to participants, and on room end generates an AI summary (GPT-4o-mini) stored alongside the full transcript in PostgreSQL. """ from __future__ import annotations import json import logging import os import time from datetime import datetime, timezone import psycopg2 from dotenv import load_dotenv from livekit import rtc from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli from livekit.plugins import deepgram, openai load_dotenv() logger = logging.getLogger("liveserver-agent") logger.setLevel(logging.INFO) # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- def _get_db_connection(): """Create a fresh PostgreSQL connection from DATABASE_URL.""" dsn = os.environ.get("DATABASE_URL") if not dsn: raise RuntimeError("DATABASE_URL environment variable is not set") return psycopg2.connect(dsn) def _ensure_table(): """Create the lecture_artifacts table if it doesn't already exist. This is a safety net — normally Prisma migrations handle schema creation. The column names match the Prisma model exactly (snake_case mapped via @@map). """ ddl = """ CREATE TABLE IF NOT EXISTS lecture_artifacts ( id TEXT PRIMARY KEY, "roomId" TEXT UNIQUE NOT NULL, "transcriptText" TEXT, "transcriptUrl" TEXT, summary TEXT, "createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(), "updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now() ); """ conn = None try: conn = _get_db_connection() with conn.cursor() as cur: cur.execute(ddl) conn.commit() logger.info("lecture_artifacts table verified") except Exception: logger.exception("Failed to ensure lecture_artifacts table") if conn: conn.rollback() finally: if conn: conn.close() def save_artifact(room_name: str, transcript: str, summary: str | None): """Upsert a lecture artifact for the given room. Uses the LiveKit room name to look up the room row first (rooms.livekitRoomName), then inserts or updates lecture_artifacts. """ conn = None try: conn = _get_db_connection() with conn.cursor() as cur: # Resolve internal room id from livekitRoomName cur.execute( 'SELECT id FROM rooms WHERE "livekitRoomName" = %s', (room_name,), ) row = cur.fetchone() if not row: logger.error("Room not found in DB for livekitRoomName=%s", room_name) return room_id = row[0] now = datetime.now(timezone.utc) # Upsert via ON CONFLICT on roomId (unique) cur.execute( """ INSERT INTO lecture_artifacts (id, "roomId", "transcriptText", summary, "createdAt", "updatedAt") VALUES (gen_random_uuid()::text, %s, %s, %s, %s, %s) ON CONFLICT ("roomId") DO UPDATE SET "transcriptText" = EXCLUDED."transcriptText", summary = EXCLUDED.summary, "updatedAt" = EXCLUDED."updatedAt" """, (room_id, transcript, summary, now, now), ) conn.commit() logger.info("Saved artifact for room %s (room_id=%s)", room_name, room_id) except Exception: logger.exception("Failed to save artifact for room %s", room_name) if conn: conn.rollback() finally: if conn: conn.close() # --------------------------------------------------------------------------- # Summarisation # --------------------------------------------------------------------------- async def summarise_transcript(transcript: str) -> str | None: """Send the full transcript to GPT-4o-mini and return a markdown summary.""" if not transcript.strip(): return None try: llm = openai.LLM(model="gpt-4o-mini") chat_ctx = openai.ChatContext() chat_ctx.append( role="system", text=( "You are a helpful assistant that summarises educational lectures. " "Produce a concise summary in Markdown with: key topics covered, " "main takeaways, and any action items mentioned. " "Keep it under 500 words. Respond in the same language as the transcript." ), ) chat_ctx.append(role="user", text=transcript) response = await llm.chat(chat_ctx=chat_ctx) return response.choices[0].message.content except Exception: logger.exception("Summarisation failed") return None # --------------------------------------------------------------------------- # Agent entrypoint # --------------------------------------------------------------------------- async def entrypoint(ctx: JobContext): """Called by the livekit-agents framework for every room job.""" logger.info("Agent joining room: %s", ctx.room.name) # Connect to the room — subscribe to audio only (for STT) await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) # Accumulated transcript segments: list of {"speaker": str, "text": str, "ts": float} segments: list[dict] = [] # ----- Deepgram STT setup ----- # stt = deepgram.STT() # We'll maintain one recognition stream per audio track active_streams: dict[str, deepgram.SpeechStream] = {} async def _process_stt_stream( stream: deepgram.SpeechStream, participant_identity: str, ): """Read STT events from a Deepgram stream, publish captions, accumulate transcript.""" async for event in stream: if not event.is_final or not event.alternatives: continue text = event.alternatives[0].text.strip() if not text: continue # Accumulate segment = { "speaker": participant_identity, "text": text, "ts": time.time(), } segments.append(segment) # Log to console logger.info("[%s] %s", participant_identity, text) # Publish caption to all participants via data channel caption_payload = json.dumps( { "type": "transcription", "speaker": participant_identity, "text": text, }, ensure_ascii=False, ).encode("utf-8") await ctx.room.local_participant.publish_data( caption_payload, reliable=True, topic="transcription", ) async def on_track_subscribed( track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, ): """Start STT for every subscribed audio track.""" if track.kind != rtc.TrackKind.KIND_AUDIO: return logger.info("Subscribed to audio from %s", participant.identity) audio_stream = rtc.AudioStream(track) stt_stream = stt.stream() active_streams[participant.identity] = stt_stream # Forward audio frames to the STT stream async def _forward(): async for frame_event in audio_stream: stt_stream.push_frame(frame_event.frame) # Run forwarding and recognition concurrently import asyncio asyncio.ensure_future(_forward()) asyncio.ensure_future(_process_stt_stream(stt_stream, participant.identity)) ctx.room.on("track_subscribed", on_track_subscribed) # ----- Handle room disconnect / end ----- # async def on_disconnected(): """When the agent disconnects from the room, finalise the transcript.""" logger.info("Agent disconnected from room %s", ctx.room.name) # Close all active STT streams for identity, stream in active_streams.items(): try: await stream.aclose() except Exception: logger.warning("Error closing STT stream for %s", identity) if not segments: logger.info("No transcript segments collected — nothing to save") return # Build full transcript text full_transcript = "\n".join( f"[{seg['speaker']}]: {seg['text']}" for seg in segments ) logger.info( "Full transcript (%d segments, %d chars):\n%s", len(segments), len(full_transcript), full_transcript, ) # Summarise logger.info("Generating summary via GPT-4o-mini...") summary = await summarise_transcript(full_transcript) if summary: logger.info("Summary:\n%s", summary) else: logger.warning("Summary generation returned empty result") # Persist to PostgreSQL save_artifact(ctx.room.name, full_transcript, summary) ctx.room.on("disconnected", on_disconnected) # --------------------------------------------------------------------------- # Worker bootstrap # --------------------------------------------------------------------------- if __name__ == "__main__": _ensure_table() cli.run_app( WorkerOptions( entrypoint_fnc=entrypoint, ) )