From 708d63d9095d4042e4c404ade19a14134f0cfe50 Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 20 Apr 2026 21:10:13 +0300 Subject: [PATCH 1/6] Refactor transcript pipeline: Vexa WS passthrough + TranscriptManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #135. Replaces the re-hash-and-flatten transcript pipeline with a passthrough over Vexa's new WS contract (`{type:"transcript", confirmed, pending}`). Backend: - models/stored_segment.py: carry Vexa's stable `segment_id`; add `start_time`, `end_time`, `completed`; remove hashed `generate_segment_id`. - transcription_service.on_transcription_updated: upsert `confirmed` by Vexa's `segment_id`; broadcast the flat `{type:"transcript", speaker, confirmed, pending, playlist_id, version_id, ts}` shape verbatim. - transcription_providers/vexa.py::_handle_ws_message: accept the new `type:"transcript"` frame; forward `confirmed/pending/speaker/ts` raw. - storage_providers/mongodb.py::upsert_segment: drop duplicate `segment_id` from `$setOnInsert` — it already lives in `$set` via the model dump, and MongoDB rejects the same field in both operators (ConflictingUpdateOperators). - main.py: add `/test/broadcast-transcript` endpoint gated by env var `DNA_TESTING_ENABLED` for end-to-end WS shape assertions (404 otherwise). Frontend: - Add `@vexaai/transcript-rendering@^0.4.0`; it is now the single dedup authority for transcript rendering. - `@dna/core` gains `TranscriptEventPayload` + a `'transcript'` EventType; `DNAEventClient.handleMessage` forwards raw transcript messages without the `{type, payload}` envelope so `TranscriptManager.handleMessage` consumes them directly. - useSegments: `createTranscriptManager()` per (playlist, version); REST bootstrap and WS ticks both flow through it. - useDNAEvents: new `useTranscriptEvents` hook. - useAISuggestion: switched off `useSegmentEvents`. - StoredSegment: gains `start_time`, `end_time`, `completed`. Follow-ups (out of scope for this PR): - backend/tests/test_transcription_service.py + test_legacy_models.py still reference the removed `generate_segment_id` and the old `segments:[...]` payload shape; they will need to be rewritten against the new contract. Signed-off-by: DmitryG228 <2280905@gmail.com> --- backend/src/dna/models/__init__.py | 2 - backend/src/dna/models/stored_segment.py | 43 ++++--- backend/src/dna/storage_providers/mongodb.py | 5 +- .../src/dna/transcription_providers/vexa.py | 14 ++- backend/src/dna/transcription_service.py | 114 ++++++++---------- backend/src/main.py | 19 +++ frontend/packages/app/package.json | 1 + frontend/packages/app/src/hooks/index.ts | 3 +- .../packages/app/src/hooks/useAISuggestion.ts | 10 +- .../packages/app/src/hooks/useDNAEvents.ts | 31 +++++ .../packages/app/src/hooks/useSegments.ts | 108 +++++++++-------- frontend/packages/core/src/eventClient.ts | 30 ++++- frontend/packages/core/src/interfaces.ts | 3 + 13 files changed, 231 insertions(+), 152 deletions(-) diff --git a/backend/src/dna/models/__init__.py b/backend/src/dna/models/__init__.py index e2b7a3dc..5aa1c398 100644 --- a/backend/src/dna/models/__init__.py +++ b/backend/src/dna/models/__init__.py @@ -43,7 +43,6 @@ from dna.models.stored_segment import ( StoredSegment, StoredSegmentCreate, - generate_segment_id, ) from dna.models.transcription import ( BotSession, @@ -92,7 +91,6 @@ "PlaylistMetadataUpdate", "StoredSegment", "StoredSegmentCreate", - "generate_segment_id", "BotSession", "BotStatus", "BotStatusEnum", diff --git a/backend/src/dna/models/stored_segment.py b/backend/src/dna/models/stored_segment.py index 7d1388aa..716db2a2 100644 --- a/backend/src/dna/models/stored_segment.py +++ b/backend/src/dna/models/stored_segment.py @@ -1,37 +1,37 @@ """Stored Segment Models. Pydantic models for transcription segments stored in MongoDB. + +Backend operates as a passthrough for Vexa's transcript stream: +- `segment_id` is Vexa's stable id (e.g. "9b914779:speaker-1:72"), not a hash. +- Upsert key in MongoDB is `{segment_id, playlist_id, version_id}`. +- All Vexa fields (start_time, end_time, completed, language, ...) are preserved. """ -import hashlib from datetime import datetime, timezone from typing import Optional from pydantic import BaseModel, ConfigDict, Field -def generate_segment_id( - playlist_id: int, - version_id: int, - absolute_start_time: str, -) -> str: - """Generate a unique segment ID based on version and start time. - - Note: Speaker is intentionally excluded from the key because Vexa's mutable - transcription can reassign speakers as it refines the transcript. Using only - the start time ensures updates to the same moment are treated as updates - rather than new segments. - """ - key = f"{playlist_id}:{version_id}:{absolute_start_time}" - return hashlib.sha256(key.encode()).hexdigest()[:16] - - class StoredSegmentCreate(BaseModel): - """Model for creating a stored segment.""" + """Model for creating/upserting a stored segment (raw Vexa passthrough).""" + segment_id: str = Field( + ..., description="Vexa's stable segment id (e.g. '9b914779:speaker-1:72')" + ) text: str = Field(..., description="Transcript text content") speaker: Optional[str] = Field(default=None, description="Speaker identifier") language: Optional[str] = Field(default=None, description="Language code") + start_time: Optional[float] = Field( + default=None, description="Relative start time in seconds" + ) + end_time: Optional[float] = Field( + default=None, description="Relative end time in seconds" + ) + completed: Optional[bool] = Field( + default=True, description="Whether the segment is confirmed (vs draft)" + ) absolute_start_time: str = Field( ..., description="UTC timestamp (ISO 8601) of segment start" ) @@ -39,7 +39,7 @@ class StoredSegmentCreate(BaseModel): ..., description="UTC timestamp (ISO 8601) of segment end" ) vexa_updated_at: Optional[str] = Field( - default=None, description="Vexa's updated_at timestamp for deduplication" + default=None, description="Vexa's updated_at timestamp" ) @@ -49,12 +49,15 @@ class StoredSegment(BaseModel): model_config = ConfigDict(populate_by_name=True) id: str = Field(alias="_id") - segment_id: str = Field(..., description="Unique segment ID") + segment_id: str playlist_id: int version_id: int text: str speaker: Optional[str] = None language: Optional[str] = None + start_time: Optional[float] = None + end_time: Optional[float] = None + completed: Optional[bool] = True absolute_start_time: str absolute_end_time: str vexa_updated_at: Optional[str] = None diff --git a/backend/src/dna/storage_providers/mongodb.py b/backend/src/dna/storage_providers/mongodb.py index 71107ac3..74f0baba 100644 --- a/backend/src/dna/storage_providers/mongodb.py +++ b/backend/src/dna/storage_providers/mongodb.py @@ -239,6 +239,10 @@ async def upsert_segment( existing = await self.segments_collection.find_one(query) is_new = existing is None + # `segment_id` is already in `data.model_dump()` — MongoDB rejects an + # update that lists the same field in both `$set` and `$setOnInsert`. + # `playlist_id`/`version_id` stay in `$setOnInsert` because they aren't + # part of `StoredSegmentCreate` (they come from the enclosing context). update: dict[str, Any] = { "$set": { **data.model_dump(), @@ -246,7 +250,6 @@ async def upsert_segment( }, "$setOnInsert": { "created_at": now, - "segment_id": segment_id, "playlist_id": playlist_id, "version_id": version_id, }, diff --git a/backend/src/dna/transcription_providers/vexa.py b/backend/src/dna/transcription_providers/vexa.py index afc9c9ce..6dc34f6d 100644 --- a/backend/src/dna/transcription_providers/vexa.py +++ b/backend/src/dna/transcription_providers/vexa.py @@ -299,7 +299,13 @@ async def _handle_ws_message(self, data: dict[str, Any]) -> None: if msg_type == "pong": return - if msg_type == "transcript.mutable": + if msg_type == "transcript": + # New Vexa WS contract (>= 2026-04): + # {type: "transcript", speaker, confirmed: [...], pending: [...], + # meeting: {id}, ts} + # Forward the raw confirmed/pending arrays to the service. The + # service persists confirmed segments to MongoDB and broadcasts + # the whole shape (plus playlist_id/version_id) to DNA clients. internal_id = meeting_info.get("id") meeting_key = self._meeting_id_to_key.get(internal_id) if not meeting_key: @@ -319,8 +325,10 @@ async def _handle_ws_message(self, data: dict[str, Any]) -> None: { "platform": platform, "meeting_id": native_id, - "segments": data.get("payload", {}).get("segments", []), - "payload": data.get("payload", {}), + "speaker": data.get("speaker"), + "confirmed": data.get("confirmed", []) or [], + "pending": data.get("pending", []) or [], + "ts": data.get("ts"), }, ) return diff --git a/backend/src/dna/transcription_service.py b/backend/src/dna/transcription_service.py index d4e19192..0ff794bf 100644 --- a/backend/src/dna/transcription_service.py +++ b/backend/src/dna/transcription_service.py @@ -5,7 +5,7 @@ from typing import Any from dna.events import EventPublisher, EventType, get_event_publisher -from dna.models.stored_segment import StoredSegmentCreate, generate_segment_id +from dna.models.stored_segment import StoredSegmentCreate from dna.storage_providers.storage_provider_base import ( StorageProviderBase, get_storage_provider, @@ -212,25 +212,28 @@ async def subscribe_to_meeting( logger.exception("Failed to subscribe to meeting %s: %s", meeting_key, e) async def on_transcription_updated(self, payload: dict[str, Any]) -> None: - """Process transcription segments and save to storage.""" + """Passthrough: upsert Vexa's confirmed segments by their stable + `segment_id`, then forward the raw `{type:"transcript", confirmed, + pending, speaker, playlist_id, version_id, ts}` message to DNA WS + clients — the frontend TranscriptManager consumes it directly. + """ if self.storage_provider is None or self.event_publisher is None: logger.error("Providers not initialized") return platform = payload.get("platform", "") meeting_id = payload.get("meeting_id", "") - segments = payload.get("segments", []) - - if not segments: - logger.debug("No segments in transcription update") - return + speaker = payload.get("speaker") + confirmed: list[dict[str, Any]] = payload.get("confirmed", []) or [] + pending: list[dict[str, Any]] = payload.get("pending", []) or [] + ts = payload.get("ts") meeting_key = f"{platform}:{meeting_id}" playlist_id = self._meeting_to_playlist.get(meeting_key) - if playlist_id is None: logger.warning( - "No playlist_id found for meeting %s, cannot save segments", meeting_key + "No playlist_id found for meeting %s, cannot save segments", + meeting_key, ) return @@ -252,13 +255,11 @@ async def on_transcription_updated(self, payload: dict[str, Any]) -> None: version_id = metadata.in_review resumed_at = metadata.transcription_resumed_at - for segment_data in segments: - text = segment_data.get("text", "").strip() - if not text: - continue - - absolute_start_time = segment_data.get("absolute_start_time") - if not absolute_start_time: + for seg in confirmed: + segment_id = seg.get("segment_id") + absolute_start_time = seg.get("absolute_start_time") + text = (seg.get("text") or "").strip() + if not segment_id or not absolute_start_time or not text: continue if resumed_at is not None: @@ -266,73 +267,52 @@ async def on_transcription_updated(self, payload: dict[str, Any]) -> None: segment_time = datetime.fromisoformat( absolute_start_time.replace("Z", "+00:00") ) - resumed_at_aware = resumed_at - if resumed_at.tzinfo is None: - resumed_at_aware = resumed_at.replace(tzinfo=timezone.utc) + resumed_at_aware = ( + resumed_at + if resumed_at.tzinfo is not None + else resumed_at.replace(tzinfo=timezone.utc) + ) if segment_time < resumed_at_aware: - logger.debug( - "Skipping segment from before resume: %s < %s", - absolute_start_time, - resumed_at_aware.isoformat(), - ) continue except ValueError: pass - speaker = segment_data.get("speaker", "Unknown") - segment_id = generate_segment_id( - playlist_id, version_id, absolute_start_time - ) - segment_create = StoredSegmentCreate( + segment_id=segment_id, text=text, - speaker=speaker, - language=segment_data.get("language"), + speaker=seg.get("speaker") or speaker, + language=seg.get("language"), + start_time=seg.get("start_time"), + end_time=seg.get("end_time"), + completed=True, absolute_start_time=absolute_start_time, - absolute_end_time=segment_data.get("absolute_end_time", ""), - vexa_updated_at=segment_data.get("updated_at"), + absolute_end_time=seg.get("absolute_end_time", ""), + vexa_updated_at=seg.get("updated_at"), ) try: - stored_segment, is_new = await self.storage_provider.upsert_segment( + await self.storage_provider.upsert_segment( playlist_id=playlist_id, version_id=version_id, segment_id=segment_id, data=segment_create, ) - - event_type = ( - EventType.SEGMENT_CREATED if is_new else EventType.SEGMENT_UPDATED - ) - await self.event_publisher.publish( - event_type, - { - "segment_id": segment_id, - "playlist_id": playlist_id, - "version_id": version_id, - "text": text, - "speaker": speaker, - "absolute_start_time": absolute_start_time, - "absolute_end_time": segment_data.get("absolute_end_time", ""), - }, - ) - - logger.info( - "Saved segment %s (%s) for version %s - text: '%s...', end_time: %s", - segment_id, - "new" if is_new else "updated", - version_id, - text[:30] if len(text) > 30 else text, - segment_data.get("absolute_end_time", ""), - ) - logger.debug( - "Full segment %s (%s) for version %s", - segment_id, - "new" if is_new else "updated", - version_id, - ) - except Exception as e: - logger.exception("Failed to save segment: %s", e) + except Exception: + logger.exception("Failed to upsert segment %s", segment_id) + + # Broadcast the raw Vexa shape with DNA envelope fields. + # Frontend TranscriptManager.handleMessage() consumes this directly. + await self.event_publisher.ws_manager.broadcast( + { + "type": "transcript", + "speaker": speaker, + "confirmed": confirmed, + "pending": pending, + "playlist_id": playlist_id, + "version_id": version_id, + "ts": ts, + } + ) async def on_transcription_completed(self, payload: dict[str, Any]) -> None: """Handle transcription completion.""" diff --git a/backend/src/main.py b/backend/src/main.py index 8249789c..04f17f08 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -362,6 +362,25 @@ async def health(): return {"status": "healthy"} +@app.post( + "/test/broadcast-transcript", + tags=["Testing"], + summary="Broadcast a synthetic transcript (dev-only).", + include_in_schema=False, +) +async def test_broadcast_transcript(payload: dict) -> dict: + """Dev-only endpoint for tests-vm/. Gated by DNA_TESTING_ENABLED=true. + + Forwards the JSON body verbatim to every WebSocket client — lets us + assert the broadcast shape end-to-end without needing a real meeting. + """ + if os.getenv("DNA_TESTING_ENABLED", "false").lower() not in ("1", "true", "yes"): + raise HTTPException(status_code=404, detail="Not found") + publisher = get_event_publisher() + await publisher.ws_manager.broadcast(payload) + return {"broadcasted": True, "clients": publisher.ws_manager.connection_count} + + MOCK_THUMBNAILS_DIR = ( Path(__file__).parent / "dna" / "prodtrack_providers" / "mock_data" / "thumbnails" ) diff --git a/frontend/packages/app/package.json b/frontend/packages/app/package.json index 2f433264..88b9b0af 100644 --- a/frontend/packages/app/package.json +++ b/frontend/packages/app/package.json @@ -28,6 +28,7 @@ "@tiptap/starter-kit": "^3.15.3", "@types/turndown": "^5.0.6", "@uiw/react-md-editor": "^4.0.11", + "@vexaai/transcript-rendering": "^0.4.0", "lucide-react": "^0.562.0", "react": "^19.1.1", "react-dom": "^19.1.1", diff --git a/frontend/packages/app/src/hooks/index.ts b/frontend/packages/app/src/hooks/index.ts index 0be8533b..b9846a60 100644 --- a/frontend/packages/app/src/hooks/index.ts +++ b/frontend/packages/app/src/hooks/index.ts @@ -29,8 +29,9 @@ export { useMultipleEventSubscriptions, useConnectionStatus, useSegmentEvents, + useTranscriptEvents, } from './useDNAEvents'; -export type { SegmentEvent } from './useDNAEvents'; +export type { SegmentEvent, TranscriptEventPayload } from './useDNAEvents'; export { useSegments } from './useSegments'; export type { UseSegmentsOptions, UseSegmentsResult } from './useSegments'; diff --git a/frontend/packages/app/src/hooks/useAISuggestion.ts b/frontend/packages/app/src/hooks/useAISuggestion.ts index c6a7f38b..44fa56d2 100644 --- a/frontend/packages/app/src/hooks/useAISuggestion.ts +++ b/frontend/packages/app/src/hooks/useAISuggestion.ts @@ -5,10 +5,10 @@ import { type AISuggestionState, type UserSettings, type DNAEvent, - type SegmentEventPayload, + type TranscriptEventPayload, } from '@dna/core'; import { apiHandler } from '../api'; -import { useSegmentEvents } from './useDNAEvents'; +import { useTranscriptEvents } from './useDNAEvents'; export interface UseAISuggestionOptions { playlistId: number | null; @@ -109,9 +109,9 @@ export function useAISuggestion({ prevVersionRef.current = versionId; }, [versionId, playlistId, userEmail, userSettings, isEnabled]); - const handleSegmentEvent = useCallback( + const handleTranscriptEvent = useCallback( // eslint-disable-next-line @typescript-eslint/no-unused-vars - (_event: DNAEvent) => { + (_event: DNAEvent) => { if (!isEnabled || !userSettings?.regenerate_on_transcript_update) { return; } @@ -121,7 +121,7 @@ export function useAISuggestion({ [playlistId, versionId, userEmail, userSettings, isEnabled] ); - useSegmentEvents(handleSegmentEvent, { + useTranscriptEvents(handleTranscriptEvent, { playlistId, versionId, enabled: isEnabled && !!userSettings?.regenerate_on_transcript_update, diff --git a/frontend/packages/app/src/hooks/useDNAEvents.ts b/frontend/packages/app/src/hooks/useDNAEvents.ts index e9e53492..817956b8 100644 --- a/frontend/packages/app/src/hooks/useDNAEvents.ts +++ b/frontend/packages/app/src/hooks/useDNAEvents.ts @@ -4,10 +4,12 @@ import { type DNAEvent, type EventCallback, type SegmentEventPayload, + type TranscriptEventPayload, } from '@dna/core'; import { useEventContext, useEventClient } from '../contexts'; export type { SegmentEventPayload as SegmentEvent }; +export type { TranscriptEventPayload }; interface UseDNAEventsOptions { enabled?: boolean; @@ -86,3 +88,32 @@ export function useSegmentEvents( return unsubscribe; }, [client, filteredCallback, enabled]); } + +export function useTranscriptEvents( + callback: EventCallback, + options: UseDNAEventsOptions & { + playlistId?: number | null; + versionId?: number | null; + } = {} +): void { + const client = useEventClient(); + const { playlistId, versionId, enabled = true } = options; + + const filteredCallback = useCallback( + (event: DNAEvent) => { + if (playlistId != null && event.payload.playlist_id !== playlistId) return; + if (versionId != null && event.payload.version_id !== versionId) return; + callback(event); + }, + [callback, playlistId, versionId] + ); + + useEffect(() => { + if (!enabled || !client) return; + const unsubscribe = client.subscribe( + 'transcript', + filteredCallback + ); + return unsubscribe; + }, [client, filteredCallback, enabled]); +} diff --git a/frontend/packages/app/src/hooks/useSegments.ts b/frontend/packages/app/src/hooks/useSegments.ts index c19d96d5..ee2cf46d 100644 --- a/frontend/packages/app/src/hooks/useSegments.ts +++ b/frontend/packages/app/src/hooks/useSegments.ts @@ -1,12 +1,17 @@ -import { useCallback } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { useQuery, useQueryClient } from '@tanstack/react-query'; import { StoredSegment, type DNAEvent, - type SegmentEventPayload, + type TranscriptEventPayload, } from '@dna/core'; +import { + createTranscriptManager, + type TranscriptManager, + type TranscriptMessage, +} from '@vexaai/transcript-rendering'; import { apiHandler } from '../api'; -import { useSegmentEvents } from './useDNAEvents'; +import { useEventSubscription } from './useDNAEvents'; export interface UseSegmentsOptions { playlistId: number | null; @@ -21,6 +26,14 @@ export interface UseSegmentsResult { error: Error | null; } +/** + * React hook exposing deduplicated transcript segments for a playlist/version. + * + * Single dedup authority: `@vexaai/transcript-rendering`'s `TranscriptManager`. + * - REST bootstrap populates the manager's `confirmed` map. + * - WS `transcript` events (raw Vexa shape forwarded by DNA backend) feed + * `manager.handleMessage()` directly — draft/confirmed distinction preserved. + */ export function useSegments({ playlistId, versionId, @@ -28,71 +41,64 @@ export function useSegments({ }: UseSegmentsOptions): UseSegmentsResult { const queryClient = useQueryClient(); const isEnabled = enabled && playlistId != null && versionId != null; + const queryKey = useMemo( + () => ['segments', playlistId, versionId], + [playlistId, versionId] + ); + + // One manager per (playlist, version); reset on change. + const managerRef = useRef | null>(null); + if (managerRef.current === null) { + managerRef.current = createTranscriptManager(); + } + useEffect(() => { + managerRef.current?.clear(); + }, [playlistId, versionId]); - const queryKey = ['segments', playlistId, versionId]; + const [liveSegments, setLiveSegments] = useState(null); const { data, isLoading, isError, error } = useQuery({ queryKey, - queryFn: () => - apiHandler.getSegmentsForVersion({ + queryFn: async () => { + const rest = await apiHandler.getSegmentsForVersion({ playlistId: playlistId!, versionId: versionId!, - }), + }); + const bootstrapped = managerRef.current!.bootstrap(rest); + setLiveSegments(bootstrapped); + return bootstrapped; + }, enabled: isEnabled, staleTime: 30000, }); - const handleSegmentEvent = useCallback( - (event: DNAEvent) => { - const segmentData = event.payload; - - queryClient.setQueryData(queryKey, (oldData) => { - if (!oldData) return oldData; - - const existingIndex = oldData.findIndex( - (s) => s.segment_id === segmentData.segment_id - ); - - const updatedSegment: StoredSegment = { - id: segmentData.segment_id, - segment_id: segmentData.segment_id, - playlist_id: segmentData.playlist_id, - version_id: segmentData.version_id, - text: segmentData.text, - speaker: segmentData.speaker, - absolute_start_time: segmentData.absolute_start_time, - absolute_end_time: segmentData.absolute_end_time || '', - created_at: new Date().toISOString(), - updated_at: new Date().toISOString(), - }; - - if (existingIndex >= 0) { - const newData = [...oldData]; - newData[existingIndex] = { - ...oldData[existingIndex], - ...updatedSegment, - updated_at: new Date().toISOString(), - }; - return newData; - } else { - const newData = [...oldData, updatedSegment]; - return newData.sort((a, b) => - a.absolute_start_time.localeCompare(b.absolute_start_time) - ); - } - }); + const handleTranscript = useCallback( + (event: DNAEvent) => { + const payload = event.payload; + if (playlistId != null && payload.playlist_id !== playlistId) return; + if (versionId != null && payload.version_id !== versionId) return; + const message: TranscriptMessage = { + type: 'transcript', + speaker: payload.speaker, + confirmed: (payload.confirmed ?? []) as StoredSegment[], + pending: (payload.pending ?? []) as StoredSegment[], + ts: payload.ts, + }; + const next = managerRef.current!.handleMessage(message); + if (next) { + setLiveSegments(next); + queryClient.setQueryData(queryKey, next); + } }, - [queryClient, queryKey] + [queryClient, queryKey, playlistId, versionId] ); - useSegmentEvents(handleSegmentEvent, { - playlistId, - versionId, + useEventSubscription('transcript', handleTranscript, { enabled: isEnabled, }); return { - segments: data ?? [], + segments: liveSegments ?? data ?? [], isLoading, isError, error: error ?? null, diff --git a/frontend/packages/core/src/eventClient.ts b/frontend/packages/core/src/eventClient.ts index 7ed1e6f5..668d1325 100644 --- a/frontend/packages/core/src/eventClient.ts +++ b/frontend/packages/core/src/eventClient.ts @@ -1,4 +1,5 @@ export type EventType = + | 'transcript' | 'segment.created' | 'segment.updated' | 'playlist.updated' @@ -22,6 +23,23 @@ export interface SegmentEventPayload { absolute_end_time?: string; } +/** + * Raw transcript message forwarded by the DNA backend, verbatim from + * Vexa's WS contract plus DNA envelope fields (`playlist_id`, `version_id`). + * + * Shape: `{ type: "transcript", speaker, confirmed: [...], pending: [...], + * playlist_id, version_id, ts }` — matches the `TranscriptMessage` + * interface of `@vexaai/transcript-rendering`. + */ +export interface TranscriptEventPayload { + speaker?: string; + confirmed?: Array>; + pending?: Array>; + playlist_id: number; + version_id: number; + ts?: string; +} + export interface BotStatusEventPayload { platform: string; meeting_id: string; @@ -169,9 +187,17 @@ export class DNAEventClient { private handleMessage(data: string): void { try { - const message = JSON.parse(data) as { type: string; payload: unknown }; + const message = JSON.parse(data) as Record & { + type: string; + }; const eventType = message.type as EventType; - const payload = message.payload; + + // Transcript messages are forwarded verbatim from Vexa — the whole + // message object (including confirmed/pending/speaker/…) IS the payload + // so `TranscriptManager.handleMessage()` can consume it directly. + // All other events follow the classic `{type, payload}` envelope. + const payload = + eventType === 'transcript' ? message : (message as { payload: unknown }).payload; const event: DNAEvent = { type: eventType, payload }; diff --git a/frontend/packages/core/src/interfaces.ts b/frontend/packages/core/src/interfaces.ts index 0e46f278..c8bbbdd7 100644 --- a/frontend/packages/core/src/interfaces.ts +++ b/frontend/packages/core/src/interfaces.ts @@ -314,6 +314,9 @@ export interface StoredSegment { text: string; speaker?: string; language?: string; + start_time?: number; + end_time?: number; + completed?: boolean; absolute_start_time: string; absolute_end_time: string; vexa_updated_at?: string; From 3b3409e5ef386b72931cc314861b057adfdcb03e Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 20 Apr 2026 21:22:29 +0300 Subject: [PATCH 2/6] Remove dead segment-event surface + fix/update tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up cleanup to the transcript passthrough: Dead-code removal - EventType.SEGMENT_CREATED / SEGMENT_UPDATED — never emitted after the passthrough lands; drop from the enum. Update /ws docstring in main.py. - SegmentEventPayload, useSegmentEvents, subscribeToSegmentEvents — the frontend reads segments via the new flat `transcript` event only; drop the dead wrappers. Tests - Delete TestOnTranscriptionUpdated + TestSegmentIdGeneration classes in test_transcription_service.py (they referenced the old segments:[...] payload shape and the removed generate_segment_id). - Update `test_forwards_transcript_updated` to use the new confirmed/pending payload shape. - Update test_storage_providers.py: StoredSegmentCreate now requires segment_id. - Swap SEGMENT_CREATED/UPDATED sample event types for TRANSCRIPTION_UPDATED/COMPLETED in test_event_publisher.py and test_websocket.py (sample types used to verify subscribe/publish wiring — behavior unchanged, just picked live EventType values). Full pytest: 256 passing. tests-local Gate still GREEN (7/7). Signed-off-by: DmitryG228 <2280905@gmail.com> --- backend/src/dna/events/event_types.py | 2 - backend/src/main.py | 9 +- backend/tests/test_event_publisher.py | 46 +- backend/tests/test_storage_providers.py | 3 + backend/tests/test_transcription_service.py | 457 +----------------- backend/tests/test_websocket.py | 10 +- frontend/packages/app/src/hooks/index.ts | 3 +- .../packages/app/src/hooks/useDNAEvents.ts | 36 -- frontend/packages/core/src/eventClient.ts | 41 +- 9 files changed, 44 insertions(+), 563 deletions(-) diff --git a/backend/src/dna/events/event_types.py b/backend/src/dna/events/event_types.py index 3d9c7424..2ef77bbf 100644 --- a/backend/src/dna/events/event_types.py +++ b/backend/src/dna/events/event_types.py @@ -9,8 +9,6 @@ class EventType(str, Enum): TRANSCRIPTION_UPDATED = "transcription.updated" TRANSCRIPTION_COMPLETED = "transcription.completed" TRANSCRIPTION_ERROR = "transcription.error" - SEGMENT_CREATED = "segment.created" - SEGMENT_UPDATED = "segment.updated" BOT_STATUS_CHANGED = "bot.status_changed" PLAYLIST_UPDATED = "playlist.updated" VERSION_UPDATED = "version.updated" diff --git a/backend/src/main.py b/backend/src/main.py index 04f17f08..445bdfb1 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -450,12 +450,15 @@ async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time event streaming. Clients connect to this endpoint to receive real-time events such as: - - segment.created / segment.updated: Transcript segment changes + - transcript: Raw Vexa-shaped transcript ticks (flat envelope with + `speaker`, `confirmed`, `pending`, `playlist_id`, `version_id`, `ts`). + Consumed by the frontend `TranscriptManager`. - bot.status_changed: Bot status updates - transcription.completed / transcription.error: Transcription lifecycle events - Events are sent as JSON messages with the format: - {"type": "event.type", "payload": {...}} + Most events use `{"type": "event.type", "payload": {...}}`. The + `transcript` event is flat — the whole message IS the payload so it can + be fed to `TranscriptManager.handleMessage()` without reshaping. """ event_publisher = get_event_publisher() ws_manager = event_publisher.ws_manager diff --git a/backend/tests/test_event_publisher.py b/backend/tests/test_event_publisher.py index c73961c6..265243ab 100644 --- a/backend/tests/test_event_publisher.py +++ b/backend/tests/test_event_publisher.py @@ -126,11 +126,11 @@ async def test_publish_calls_type_subscribers(self): async def callback(event_type, payload): received_events.append((event_type, payload)) - publisher.subscribe(EventType.SEGMENT_CREATED, callback) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data"}) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) assert len(received_events) == 1 - assert received_events[0] == (EventType.SEGMENT_CREATED, {"test": "data"}) + assert received_events[0] == (EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) @pytest.mark.asyncio async def test_publish_does_not_call_other_type_subscribers(self): @@ -141,8 +141,8 @@ async def test_publish_does_not_call_other_type_subscribers(self): async def callback(event_type, payload): received_events.append((event_type, payload)) - publisher.subscribe(EventType.SEGMENT_CREATED, callback) - await publisher.publish(EventType.SEGMENT_UPDATED, {"test": "data"}) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) + await publisher.publish(EventType.TRANSCRIPTION_COMPLETED, {"test": "data"}) assert len(received_events) == 0 @@ -156,12 +156,12 @@ async def callback(event_type, payload): received_events.append((event_type, payload)) publisher.subscribe_all(callback) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data"}) - await publisher.publish(EventType.SEGMENT_UPDATED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_COMPLETED, {"test": "data2"}) assert len(received_events) == 2 - assert received_events[0] == (EventType.SEGMENT_CREATED, {"test": "data"}) - assert received_events[1] == (EventType.SEGMENT_UPDATED, {"test": "data2"}) + assert received_events[0] == (EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + assert received_events[1] == (EventType.TRANSCRIPTION_COMPLETED, {"test": "data2"}) @pytest.mark.asyncio async def test_publish_calls_multiple_subscribers(self): @@ -176,9 +176,9 @@ async def callback1(event_type, payload): async def callback2(event_type, payload): received_events_2.append(payload) - publisher.subscribe(EventType.SEGMENT_CREATED, callback1) - publisher.subscribe(EventType.SEGMENT_CREATED, callback2) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data"}) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback1) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback2) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) assert len(received_events_1) == 1 assert len(received_events_2) == 1 @@ -192,10 +192,10 @@ async def test_subscribe_returns_unsubscribe_function(self): async def callback(event_type, payload): received_events.append(payload) - unsubscribe = publisher.subscribe(EventType.SEGMENT_CREATED, callback) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data1"}) + unsubscribe = publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data1"}) unsubscribe() - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data2"}) assert len(received_events) == 1 assert received_events[0] == {"test": "data1"} @@ -210,9 +210,9 @@ async def callback(event_type, payload): received_events.append(payload) unsubscribe = publisher.subscribe_all(callback) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data1"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data1"}) unsubscribe() - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data2"}) assert len(received_events) == 1 @@ -228,10 +228,10 @@ async def failing_callback(event_type, payload): async def working_callback(event_type, payload): received_events.append(payload) - publisher.subscribe(EventType.SEGMENT_CREATED, failing_callback) - publisher.subscribe(EventType.SEGMENT_CREATED, working_callback) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, failing_callback) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, working_callback) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) assert len(received_events) == 1 @@ -242,11 +242,11 @@ async def test_publish_broadcasts_to_websocket_clients(self): mock_ws = AsyncMock() await publisher.ws_manager.connect(mock_ws) - await publisher.publish(EventType.SEGMENT_CREATED, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) mock_ws.send_text.assert_called_once() sent_message = json.loads(mock_ws.send_text.call_args[0][0]) - assert sent_message["type"] == "segment.created" + assert sent_message["type"] == "transcription.updated" assert sent_message["payload"] == {"test": "data"} @pytest.mark.asyncio @@ -257,7 +257,7 @@ async def test_close_clears_all_subscribers(self): async def callback(event_type, payload): pass - publisher.subscribe(EventType.SEGMENT_CREATED, callback) + publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) publisher.subscribe_all(callback) assert len(publisher._subscribers) > 0 diff --git a/backend/tests/test_storage_providers.py b/backend/tests/test_storage_providers.py index 55ef7199..32b4d22c 100644 --- a/backend/tests/test_storage_providers.py +++ b/backend/tests/test_storage_providers.py @@ -81,6 +81,7 @@ async def test_upsert_segment_raises_not_implemented(self): """Test that upsert_segment raises NotImplementedError.""" provider = StorageProviderBase() data = StoredSegmentCreate( + segment_id="seg-1", text="Hello", speaker="John", absolute_start_time="2024-01-01T00:00:00Z", @@ -661,6 +662,7 @@ async def test_upsert_segment_new(self, provider): provider._client = mock_client data = StoredSegmentCreate( + segment_id="seg-1", text="Hello", speaker="John", absolute_start_time="2024-01-01T00:00:00Z", @@ -707,6 +709,7 @@ async def test_upsert_segment_existing(self, provider): provider._client = mock_client data = StoredSegmentCreate( + segment_id="seg-1", text="Updated text", speaker="John", absolute_start_time="2024-01-01T00:00:00Z", diff --git a/backend/tests/test_transcription_service.py b/backend/tests/test_transcription_service.py index 8183c0f0..5bd23797 100644 --- a/backend/tests/test_transcription_service.py +++ b/backend/tests/test_transcription_service.py @@ -6,7 +6,7 @@ from dna.events import EventType from dna.models.playlist_metadata import PlaylistMetadata -from dna.models.stored_segment import StoredSegment, generate_segment_id +from dna.models.stored_segment import StoredSegment from dna.transcription_service import TranscriptionService @@ -125,419 +125,6 @@ async def test_handles_provider_not_initialized(self, service, caplog): assert "Transcription provider not initialized" in caplog.text -class TestOnTranscriptionUpdated: - """Tests for transcript segment processing.""" - - @pytest.fixture - def sample_vexa_segments(self): - """Sample Vexa transcript.mutable segments.""" - return [ - { - "text": "Hello, this is a test.", - "speaker": "John Doe", - "language": "en", - "absolute_start_time": "2026-01-23T04:00:00.000Z", - "absolute_end_time": "2026-01-23T04:00:05.000Z", - "updated_at": "2026-01-23T04:00:05.000Z", - }, - { - "text": "This is another segment.", - "speaker": "Jane Smith", - "language": "en", - "absolute_start_time": "2026-01-23T04:00:05.000Z", - "absolute_end_time": "2026-01-23T04:00:10.000Z", - "updated_at": "2026-01-23T04:00:10.000Z", - }, - ] - - @pytest.fixture - def sample_metadata(self): - """Sample playlist metadata with in_review version.""" - return PlaylistMetadata( - _id="meta123", - playlist_id=42, - in_review=5, - meeting_id="abc-def-ghi", - platform="google_meet", - vexa_meeting_id=123, - ) - - @pytest.mark.asyncio - async def test_saves_segments_to_storage( - self, service, mock_storage_provider, sample_vexa_segments, sample_metadata - ): - """Test that segments are saved to storage.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - assert mock_storage_provider.upsert_segment.call_count == 2 - - @pytest.mark.asyncio - async def test_publishes_segment_created_event( - self, - service, - mock_storage_provider, - mock_event_publisher, - sample_vexa_segments, - sample_metadata, - ): - """Test that SEGMENT_CREATED event is published for new segments.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [sample_vexa_segments[0]], - } - - await service.on_transcription_updated(payload) - - mock_event_publisher.publish.assert_called() - call_args = mock_event_publisher.publish.call_args_list[0] - assert call_args[0][0] == EventType.SEGMENT_CREATED - assert call_args[0][1]["text"] == "Hello, this is a test." - assert call_args[0][1]["speaker"] == "John Doe" - - @pytest.mark.asyncio - async def test_publishes_segment_updated_event( - self, - service, - mock_storage_provider, - mock_event_publisher, - sample_vexa_segments, - sample_metadata, - ): - """Test that SEGMENT_UPDATED event is published for existing segments.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - False, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [sample_vexa_segments[0]], - } - - await service.on_transcription_updated(payload) - - mock_event_publisher.publish.assert_called() - call_args = mock_event_publisher.publish.call_args_list[0] - assert call_args[0][0] == EventType.SEGMENT_UPDATED - - @pytest.mark.asyncio - async def test_generates_correct_segment_id( - self, service, mock_storage_provider, sample_vexa_segments, sample_metadata - ): - """Test that segment ID is generated correctly.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [sample_vexa_segments[0]], - } - - await service.on_transcription_updated(payload) - - expected_segment_id = generate_segment_id( - playlist_id=42, - version_id=5, - absolute_start_time="2026-01-23T04:00:00.000Z", - ) - - call_kwargs = mock_storage_provider.upsert_segment.call_args.kwargs - assert call_kwargs["segment_id"] == expected_segment_id - - @pytest.mark.asyncio - async def test_skips_empty_text_segments( - self, service, mock_storage_provider, sample_metadata - ): - """Test that segments with empty text are skipped.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [ - { - "text": "", - "speaker": "John Doe", - "absolute_start_time": "2026-01-23T04:00:00.000Z", - }, - { - "text": " ", - "speaker": "Jane Smith", - "absolute_start_time": "2026-01-23T04:00:05.000Z", - }, - ], - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.upsert_segment.assert_not_called() - - @pytest.mark.asyncio - async def test_skips_segments_without_start_time( - self, service, mock_storage_provider, sample_metadata - ): - """Test that segments without absolute_start_time are skipped.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [ - { - "text": "Hello world", - "speaker": "John Doe", - }, - ], - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.upsert_segment.assert_not_called() - - @pytest.mark.asyncio - async def test_handles_missing_playlist_mapping( - self, service, mock_storage_provider, sample_vexa_segments, caplog - ): - """Test handling when playlist mapping is not found.""" - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.upsert_segment.assert_not_called() - assert "No playlist_id found for meeting" in caplog.text - - @pytest.mark.asyncio - async def test_handles_missing_in_review_version( - self, service, mock_storage_provider, sample_vexa_segments, caplog - ): - """Test handling when in_review version is not set.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( - _id="meta123", - playlist_id=42, - in_review=None, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.upsert_segment.assert_not_called() - assert "No in_review version found" in caplog.text - - @pytest.mark.asyncio - async def test_handles_empty_segments_list(self, service, mock_storage_provider): - """Test handling when segments list is empty.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [], - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.get_playlist_metadata.assert_not_called() - - @pytest.mark.asyncio - async def test_uses_default_speaker_when_missing( - self, service, mock_storage_provider, mock_event_publisher, sample_metadata - ): - """Test that 'Unknown' is used as default speaker.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": [ - { - "text": "Hello world", - "absolute_start_time": "2026-01-23T04:00:00.000Z", - "absolute_end_time": "2026-01-23T04:00:05.000Z", - }, - ], - } - - await service.on_transcription_updated(payload) - - call_kwargs = mock_storage_provider.upsert_segment.call_args.kwargs - assert call_kwargs["data"].speaker == "Unknown" - - @pytest.mark.asyncio - async def test_skips_segments_when_transcription_paused( - self, service, mock_storage_provider, sample_vexa_segments, caplog - ): - """Test that segments are not saved when transcription is paused.""" - import logging - - caplog.set_level(logging.DEBUG) - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - paused_metadata = PlaylistMetadata( - _id="meta123", - playlist_id=42, - in_review=5, - meeting_id="abc-def-ghi", - platform="google_meet", - vexa_meeting_id=123, - transcription_paused=True, - ) - mock_storage_provider.get_playlist_metadata.return_value = paused_metadata - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - mock_storage_provider.upsert_segment.assert_not_called() - assert "Transcription paused for playlist" in caplog.text - - @pytest.mark.asyncio - async def test_saves_segments_when_transcription_not_paused( - self, service, mock_storage_provider, sample_vexa_segments, sample_metadata - ): - """Test that segments are saved when transcription is not paused.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - assert mock_storage_provider.upsert_segment.call_count == 2 - - @pytest.mark.asyncio - async def test_skips_segments_before_resume_time( - self, service, mock_storage_provider, caplog - ): - """Test that segments from before the resume time are skipped.""" - import logging - from datetime import datetime, timezone - - caplog.set_level(logging.DEBUG) - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - - resumed_at = datetime(2026, 1, 23, 4, 0, 10, tzinfo=timezone.utc) - resumed_metadata = PlaylistMetadata( - _id="meta123", - playlist_id=42, - in_review=5, - meeting_id="abc-def-ghi", - platform="google_meet", - transcription_paused=False, - transcription_resumed_at=resumed_at, - ) - mock_storage_provider.get_playlist_metadata.return_value = resumed_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - segments = [ - { - "text": "Before pause - should be skipped", - "speaker": "John", - "absolute_start_time": "2026-01-23T04:00:05.000Z", - "absolute_end_time": "2026-01-23T04:00:08.000Z", - }, - { - "text": "After resume - should be saved", - "speaker": "Jane", - "absolute_start_time": "2026-01-23T04:00:15.000Z", - "absolute_end_time": "2026-01-23T04:00:20.000Z", - }, - ] - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": segments, - } - - await service.on_transcription_updated(payload) - - assert mock_storage_provider.upsert_segment.call_count == 1 - call_kwargs = mock_storage_provider.upsert_segment.call_args.kwargs - assert call_kwargs["data"].text == "After resume - should be saved" - assert "Skipping segment from before resume" in caplog.text - - @pytest.mark.asyncio - async def test_saves_all_segments_when_no_resume_time( - self, service, mock_storage_provider, sample_vexa_segments, sample_metadata - ): - """Test that all segments are saved when there is no resume time.""" - service._meeting_to_playlist["google_meet:abc-def-ghi"] = 42 - mock_storage_provider.get_playlist_metadata.return_value = sample_metadata - mock_storage_provider.upsert_segment.return_value = ( - MagicMock(spec=StoredSegment), - True, - ) - - payload = { - "platform": "google_meet", - "meeting_id": "abc-def-ghi", - "segments": sample_vexa_segments, - } - - await service.on_transcription_updated(payload) - - assert mock_storage_provider.upsert_segment.call_count == 2 - class TestOnVexaEvent: """Tests for Vexa event forwarding.""" @@ -548,7 +135,10 @@ async def test_forwards_transcript_updated(self, service, mock_event_publisher): payload = { "platform": "google_meet", "meeting_id": "abc-def-ghi", - "segments": [], + "speaker": "Alice", + "confirmed": [], + "pending": [], + "ts": "2026-01-23T04:00:05.000Z", } await service._on_vexa_event("transcript.updated", payload) @@ -964,43 +554,6 @@ async def test_publishes_recovery_status_for_each_active_bot( assert mock_event_publisher.publish.call_count == 2 -class TestSegmentIdGeneration: - """Tests for segment ID generation consistency.""" - - def test_same_inputs_generate_same_id(self): - """Test that identical inputs generate the same segment ID.""" - id1 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - id2 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - assert id1 == id2 - - def test_different_start_time_generates_different_id(self): - """Test that different start times generate different IDs.""" - id1 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - id2 = generate_segment_id(42, 5, "2026-01-23T04:00:05.000Z") - assert id1 != id2 - - def test_different_playlist_generates_different_id(self): - """Test that different playlists generate different IDs.""" - id1 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - id2 = generate_segment_id(43, 5, "2026-01-23T04:00:00.000Z") - assert id1 != id2 - - def test_different_version_generates_different_id(self): - """Test that different versions generate different IDs.""" - id1 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - id2 = generate_segment_id(42, 6, "2026-01-23T04:00:00.000Z") - assert id1 != id2 - - def test_speaker_changes_do_not_affect_id(self): - """Test that the same start time generates the same ID regardless of speaker. - - This is important because Vexa's mutable transcription can reassign - speakers as it refines the transcript. - """ - id1 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - id2 = generate_segment_id(42, 5, "2026-01-23T04:00:00.000Z") - assert id1 == id2 - class TestTranscriptionServiceLifecycle: """Tests for TranscriptionService initialization and cleanup.""" diff --git a/backend/tests/test_websocket.py b/backend/tests/test_websocket.py index 49beabb6..f6c1dd63 100644 --- a/backend/tests/test_websocket.py +++ b/backend/tests/test_websocket.py @@ -48,7 +48,7 @@ def test_websocket_receives_published_events(self): loop = asyncio.new_event_loop() loop.run_until_complete( publisher.publish( - EventType.SEGMENT_CREATED, + EventType.TRANSCRIPTION_UPDATED, { "segment_id": "abc123", "text": "Hello world", @@ -59,7 +59,7 @@ def test_websocket_receives_published_events(self): loop.close() data = websocket.receive_json() - assert data["type"] == "segment.created" + assert data["type"] == "transcription.updated" assert data["payload"]["segment_id"] == "abc123" assert data["payload"]["text"] == "Hello world" @@ -101,7 +101,7 @@ def test_multiple_websocket_clients_receive_events(self): loop = asyncio.new_event_loop() loop.run_until_complete( publisher.publish( - EventType.SEGMENT_UPDATED, + EventType.TRANSCRIPTION_COMPLETED, {"segment_id": "xyz", "text": "Updated text"}, ) ) @@ -110,7 +110,7 @@ def test_multiple_websocket_clients_receive_events(self): data1 = ws1.receive_json() data2 = ws2.receive_json() - assert data1["type"] == "segment.updated" - assert data2["type"] == "segment.updated" + assert data1["type"] == "transcription.completed" + assert data2["type"] == "transcription.completed" assert data1["payload"]["segment_id"] == "xyz" assert data2["payload"]["segment_id"] == "xyz" diff --git a/frontend/packages/app/src/hooks/index.ts b/frontend/packages/app/src/hooks/index.ts index b9846a60..100da5b5 100644 --- a/frontend/packages/app/src/hooks/index.ts +++ b/frontend/packages/app/src/hooks/index.ts @@ -28,10 +28,9 @@ export { useEventSubscription, useMultipleEventSubscriptions, useConnectionStatus, - useSegmentEvents, useTranscriptEvents, } from './useDNAEvents'; -export type { SegmentEvent, TranscriptEventPayload } from './useDNAEvents'; +export type { TranscriptEventPayload } from './useDNAEvents'; export { useSegments } from './useSegments'; export type { UseSegmentsOptions, UseSegmentsResult } from './useSegments'; diff --git a/frontend/packages/app/src/hooks/useDNAEvents.ts b/frontend/packages/app/src/hooks/useDNAEvents.ts index 817956b8..bb035bb0 100644 --- a/frontend/packages/app/src/hooks/useDNAEvents.ts +++ b/frontend/packages/app/src/hooks/useDNAEvents.ts @@ -3,12 +3,10 @@ import { type EventType, type DNAEvent, type EventCallback, - type SegmentEventPayload, type TranscriptEventPayload, } from '@dna/core'; import { useEventContext, useEventClient } from '../contexts'; -export type { SegmentEventPayload as SegmentEvent }; export type { TranscriptEventPayload }; interface UseDNAEventsOptions { @@ -55,40 +53,6 @@ export function useConnectionStatus(): { return { isConnected, connectionError }; } -export function useSegmentEvents( - callback: EventCallback, - options: UseDNAEventsOptions & { - playlistId?: number | null; - versionId?: number | null; - } = {} -): void { - const client = useEventClient(); - const { playlistId, versionId, enabled = true } = options; - - const filteredCallback = useCallback( - (event: DNAEvent) => { - if (playlistId != null && event.payload.playlist_id !== playlistId) { - return; - } - if (versionId != null && event.payload.version_id !== versionId) { - return; - } - callback(event); - }, - [callback, playlistId, versionId] - ); - - useEffect(() => { - if (!enabled || !client) return; - - const unsubscribe = client.subscribeMultiple( - ['segment.created', 'segment.updated'], - filteredCallback - ); - return unsubscribe; - }, [client, filteredCallback, enabled]); -} - export function useTranscriptEvents( callback: EventCallback, options: UseDNAEventsOptions & { diff --git a/frontend/packages/core/src/eventClient.ts b/frontend/packages/core/src/eventClient.ts index 668d1325..2c8a8467 100644 --- a/frontend/packages/core/src/eventClient.ts +++ b/frontend/packages/core/src/eventClient.ts @@ -1,7 +1,5 @@ export type EventType = | 'transcript' - | 'segment.created' - | 'segment.updated' | 'playlist.updated' | 'version.updated' | 'bot.status_changed' @@ -13,16 +11,6 @@ export interface DNAEvent { payload: T; } -export interface SegmentEventPayload { - segment_id: string; - playlist_id: number; - version_id: number; - text: string; - speaker?: string; - absolute_start_time: string; - absolute_end_time?: string; -} - /** * Raw transcript message forwarded by the DNA backend, verbatim from * Vexa's WS contract plus DNA envelope fields (`playlist_id`, `version_id`). @@ -197,7 +185,7 @@ export class DNAEventClient { // so `TranscriptManager.handleMessage()` can consume it directly. // All other events follow the classic `{type, payload}` envelope. const payload = - eventType === 'transcript' ? message : (message as { payload: unknown }).payload; + eventType === 'transcript' ? message : (message as unknown as { payload: unknown }).payload; const event: DNAEvent = { type: eventType, payload }; @@ -272,33 +260,6 @@ export class DNAEventClient { }; } - subscribeToSegmentEvents( - callback: EventCallback, - filter?: { playlistId?: number; versionId?: number } - ): () => void { - const filteredCallback = (event: DNAEvent) => { - const payload = event.payload; - if ( - filter?.playlistId != null && - payload.playlist_id !== filter.playlistId - ) { - return; - } - if ( - filter?.versionId != null && - payload.version_id !== filter.versionId - ) { - return; - } - callback(event); - }; - - return this.subscribeMultiple( - ['segment.created', 'segment.updated'], - filteredCallback - ); - } - subscribeToBotStatusEvents( callback: EventCallback, filter?: { platform?: string; meetingId?: string } From 3131bf5c9dbc6323a44873c69844d0745a7f26d9 Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 20 Apr 2026 21:36:03 +0300 Subject: [PATCH 3/6] Add compound unique MongoDB index on segments upsert key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `upsert_segment` keys on {segment_id, playlist_id, version_id}. Without an index, each upsert does a full collection scan, which only matters once the collection grows — but growth is fast (refine-heavy write rate on a live meeting), so the fix belongs in the same ship as the upsert refactor. - mongodb.py: `ensure_indexes()` creates a unique compound index segments_upsert_key {segment_id, playlist_id, version_id} and a supporting index segments_list_by_version {playlist_id, version_id, absolute_start_time} for the REST bootstrap query. - main.py: call `storage.ensure_indexes()` in the FastAPI startup hook (guarded by hasattr so tests that mock the provider stay happy). Idempotent — safe to call on every container start. Signed-off-by: DmitryG228 <2280905@gmail.com> --- backend/src/dna/storage_providers/mongodb.py | 22 ++++++++++++++++++++ backend/src/main.py | 4 ++++ 2 files changed, 26 insertions(+) diff --git a/backend/src/dna/storage_providers/mongodb.py b/backend/src/dna/storage_providers/mongodb.py index 74f0baba..ddb8bfa7 100644 --- a/backend/src/dna/storage_providers/mongodb.py +++ b/backend/src/dna/storage_providers/mongodb.py @@ -21,6 +21,28 @@ class MongoDBStorageProvider(StorageProviderBase): def __init__(self) -> None: self._client: Optional[AsyncMongoClient[Any]] = None + self._indexes_ensured = False + + async def ensure_indexes(self) -> None: + """Create collection indexes. Idempotent; safe to call on every startup. + + The compound unique index on the `segments` upsert key makes + `upsert_segment` O(log n) instead of a full-collection scan — at + Vexa's refine-heavy write rate, scans become user-visible at ~100k + segments and timeouts at ~1M. + """ + if self._indexes_ensured: + return + await self.segments_collection.create_index( + [("segment_id", 1), ("playlist_id", 1), ("version_id", 1)], + unique=True, + name="segments_upsert_key", + ) + await self.segments_collection.create_index( + [("playlist_id", 1), ("version_id", 1), ("absolute_start_time", 1)], + name="segments_list_by_version", + ) + self._indexes_ensured = True @property def client(self) -> AsyncMongoClient[Any]: diff --git a/backend/src/main.py b/backend/src/main.py index 445bdfb1..8bb5eb1b 100644 --- a/backend/src/main.py +++ b/backend/src/main.py @@ -323,6 +323,10 @@ async def startup_event(): """Initialize services on startup.""" service = get_transcription_service() await service.init_providers() + storage = service.storage_provider + ensure_indexes = getattr(storage, "ensure_indexes", None) + if callable(ensure_indexes): + await ensure_indexes() await service.resubscribe_to_active_meetings() From 688c3a6776ceb561db2a5f420758bb457980c92e Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 20 Apr 2026 21:47:24 +0300 Subject: [PATCH 4/6] TranscriptPanel: render pending segments subtle + dedup speaker labels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the Vexa dashboard's two render-time polish cues now that `useSegments` yields the raw Vexa shape (including `completed` per segment and contiguous same-speaker runs via `TranscriptManager`): - Pending segments (`completed === false`, i.e. draft ticks arriving in the WS `pending[]` array) render with muted color, italic, and 0.75 opacity — same visual semantics as Vexa's `text-muted-foreground/70 italic` in services/dashboard/src/components/transcript/transcript-segment.tsx. - Consecutive segments from the same speaker no longer repeat the name + timestamp header. The first segment of a run carries the header; the rest pad tightly to read as one block. Mirrors `showSpeakerHeader` in Vexa's transcript-viewer. Render-time only — no changes to the manager, backend, schema, or the raw WS envelope. Signed-off-by: DmitryG228 <2280905@gmail.com> --- .../app/src/components/TranscriptPanel.tsx | 59 +++++++++++++------ 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/frontend/packages/app/src/components/TranscriptPanel.tsx b/frontend/packages/app/src/components/TranscriptPanel.tsx index 59993368..9c5bc7c5 100644 --- a/frontend/packages/app/src/components/TranscriptPanel.tsx +++ b/frontend/packages/app/src/components/TranscriptPanel.tsx @@ -1,5 +1,5 @@ import { useEffect, useRef } from 'react'; -import styled from 'styled-components'; +import styled, { css } from 'styled-components'; import { Loader2, MessageSquare, AlertCircle } from 'lucide-react'; import { useSegments } from '../hooks'; import { useConnectionStatus } from '../hooks/useDNAEvents'; @@ -22,20 +22,19 @@ const SegmentList = styled.div` padding: 8px 0; `; -const SegmentItem = styled.div` - padding: 12px 16px; - border-bottom: 1px solid ${({ theme }) => theme.colors.border.subtle}; - - &:last-child { - border-bottom: none; - } +// Speaker-runs are visually grouped: top padding on the first segment of a run +// (showSpeakerHeader=true) is a bit larger; continuation segments have almost +// no vertical padding so they read as one block. +const SegmentItem = styled.div<{ $showSpeakerHeader: boolean }>` + padding: ${({ $showSpeakerHeader }) => + $showSpeakerHeader ? '10px 16px 2px' : '0 16px 2px'}; `; const SegmentHeader = styled.div` display: flex; justify-content: space-between; align-items: center; - margin-bottom: 4px; + margin-bottom: 2px; `; const SpeakerName = styled.span` @@ -49,11 +48,18 @@ const Timestamp = styled.span` color: ${({ theme }) => theme.colors.text.muted}; `; -const SegmentText = styled.p` +const pendingStyle = css` + color: ${({ theme }) => theme.colors.text.muted}; + font-style: italic; + opacity: 0.75; +`; + +const SegmentText = styled.p<{ $pending: boolean }>` margin: 0; font-size: 14px; line-height: 1.5; color: ${({ theme }) => theme.colors.text.secondary}; + ${({ $pending }) => $pending && pendingStyle} `; const StateContainer = styled.div` @@ -167,15 +173,30 @@ export function TranscriptPanel({ {isConnected ? 'Live' : 'Reconnecting...'} • {segments.length} segments - {segments.map((segment) => ( - - - {segment.speaker || 'Unknown'} - {formatTime(segment.absolute_start_time)} - - {segment.text} - - ))} + {segments.map((segment, idx) => { + // Deduplicate speaker labels: only show the speaker header on the + // first segment of a contiguous same-speaker run. Matches Vexa + // dashboard's `showSpeakerHeader` behaviour. + const prev = idx > 0 ? segments[idx - 1] : null; + const showSpeakerHeader = !prev || prev.speaker !== segment.speaker; + const isPending = segment.completed === false; + return ( + + {showSpeakerHeader && ( + + {segment.speaker || 'Unknown'} + + {formatTime(segment.absolute_start_time)} + + + )} + {segment.text} + + ); + })} ); From b68cbcb1bd1ed9121d75a444d87379e31b94fcbd Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 20 Apr 2026 22:21:52 +0300 Subject: [PATCH 5/6] Fix WS-vs-REST race on version switch + remove legacy transcription.updated MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two issues reported from the live deploy: - Sometimes only fresh WS transcripts showed after a version switch; the REST-loaded historical segments appeared to vanish. - Every Vexa tick produced two WS frames (`transcription.updated` wrapped + `transcript` flat); nothing subscribes to the wrapped one. useSegments — fix the race - `manager.bootstrap(rest)` in the queryFn CLEARS confirmed + pending maps; any WS tick that landed during the REST fetch was wiped. Swap it for the additive tick path (`manager.handleMessage({type:'transcript', confirmed: rest, pending: []})`) so REST + WS converge on the same confirmed map keyed by Vexa's `segment_id`. - Seed the manager from React Query's cache during the version-change effect. Without this, a revisit to a version within `staleTime` skipped the queryFn entirely — the cleared manager never got the REST data, so new WS ticks sat alone on an empty base. Now the cached REST list is tick-merged into the manager on mount, before WS ticks start. - Capture the queryKey's playlist/version at queryFn start; if the user switched while we were fetching, return the raw REST (valid cache for the OLD queryKey) instead of contaminating it with the current manager state (which belongs to the NEW version). Dead-code cleanup — `transcription.updated` - `_on_vexa_event` no longer publishes `TRANSCRIPTION_UPDATED`; the flat `{type:"transcript", ...}` broadcast from `on_transcription_updated` carries the full payload and is what the frontend consumes. Nothing subscribed to the wrapped event. - Trim `EventType` enum to the 3 values actually emitted: BOT_STATUS_CHANGED, TRANSCRIPTION_COMPLETED, TRANSCRIPTION_ERROR. Remove unused TRANSCRIPTION_SUBSCRIBE, TRANSCRIPTION_STARTED, TRANSCRIPTION_UPDATED, PLAYLIST_UPDATED, VERSION_UPDATED, DRAFT_NOTE_UPDATED. - Trim frontend `EventType` union to match (removes `'playlist.updated'`, `'version.updated'`). - `test_forwards_transcript_updated` rewritten to verify `on_transcription_updated` is called (the actual behaviour). - Sample event types in test_event_publisher + test_websocket retargeted to live enum values so the subscribe/publish mechanism stays exercised. Full pytest: 256 passing. Local Gate: GREEN. Signed-off-by: DmitryG228 <2280905@gmail.com> --- backend/src/dna/events/event_types.py | 15 ++-- backend/src/dna/transcription_service.py | 9 +-- backend/tests/test_event_publisher.py | 44 ++++++------ backend/tests/test_transcription_service.py | 15 ++-- backend/tests/test_websocket.py | 10 +-- .../packages/app/src/hooks/useSegments.ts | 70 +++++++++++++++---- frontend/packages/core/src/eventClient.ts | 2 - 7 files changed, 106 insertions(+), 59 deletions(-) diff --git a/backend/src/dna/events/event_types.py b/backend/src/dna/events/event_types.py index 2ef77bbf..eb0c2f03 100644 --- a/backend/src/dna/events/event_types.py +++ b/backend/src/dna/events/event_types.py @@ -1,15 +1,16 @@ -"""Event type definitions.""" +"""Event type definitions. + +Only events that are actually emitted AND consumed live here. The flat +`transcript` frame doesn't go through this enum — it's broadcast directly +via `EventPublisher.ws_manager.broadcast(...)` because its envelope is +shaped by the Vexa contract, not by the `{type, payload}` wrapper this +enum drives. +""" from enum import Enum class EventType(str, Enum): - TRANSCRIPTION_SUBSCRIBE = "transcription.subscribe" - TRANSCRIPTION_STARTED = "transcription.started" - TRANSCRIPTION_UPDATED = "transcription.updated" TRANSCRIPTION_COMPLETED = "transcription.completed" TRANSCRIPTION_ERROR = "transcription.error" BOT_STATUS_CHANGED = "bot.status_changed" - PLAYLIST_UPDATED = "playlist.updated" - VERSION_UPDATED = "version.updated" - DRAFT_NOTE_UPDATED = "draft_note.updated" diff --git a/backend/src/dna/transcription_service.py b/backend/src/dna/transcription_service.py index 0ff794bf..c8cfa583 100644 --- a/backend/src/dna/transcription_service.py +++ b/backend/src/dna/transcription_service.py @@ -155,10 +155,11 @@ async def _on_vexa_event(self, event_type: str, payload: dict[str, Any]) -> None return if event_type == "transcript.updated": - await self.event_publisher.publish( - EventType.TRANSCRIPTION_UPDATED, - payload, - ) + # The service persists confirmed segments + broadcasts the flat + # `{type:"transcript", ...}` shape directly from + # `on_transcription_updated`. No need to also emit + # TRANSCRIPTION_UPDATED through the publisher — nothing + # subscribes to it and frontends only consume the flat envelope. await self.on_transcription_updated(payload) elif event_type == "bot.status_changed": await self.event_publisher.publish( diff --git a/backend/tests/test_event_publisher.py b/backend/tests/test_event_publisher.py index 265243ab..893c7b27 100644 --- a/backend/tests/test_event_publisher.py +++ b/backend/tests/test_event_publisher.py @@ -126,11 +126,11 @@ async def test_publish_calls_type_subscribers(self): async def callback(event_type, payload): received_events.append((event_type, payload)) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data"}) assert len(received_events) == 1 - assert received_events[0] == (EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + assert received_events[0] == (EventType.TRANSCRIPTION_ERROR, {"test": "data"}) @pytest.mark.asyncio async def test_publish_does_not_call_other_type_subscribers(self): @@ -141,7 +141,7 @@ async def test_publish_does_not_call_other_type_subscribers(self): async def callback(event_type, payload): received_events.append((event_type, payload)) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback) await publisher.publish(EventType.TRANSCRIPTION_COMPLETED, {"test": "data"}) assert len(received_events) == 0 @@ -156,12 +156,12 @@ async def callback(event_type, payload): received_events.append((event_type, payload)) publisher.subscribe_all(callback) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) - await publisher.publish(EventType.TRANSCRIPTION_COMPLETED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data2"}) assert len(received_events) == 2 - assert received_events[0] == (EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) - assert received_events[1] == (EventType.TRANSCRIPTION_COMPLETED, {"test": "data2"}) + assert received_events[0] == (EventType.TRANSCRIPTION_ERROR, {"test": "data"}) + assert received_events[1] == (EventType.TRANSCRIPTION_ERROR, {"test": "data2"}) @pytest.mark.asyncio async def test_publish_calls_multiple_subscribers(self): @@ -176,9 +176,9 @@ async def callback1(event_type, payload): async def callback2(event_type, payload): received_events_2.append(payload) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback1) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback2) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback1) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback2) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data"}) assert len(received_events_1) == 1 assert len(received_events_2) == 1 @@ -192,10 +192,10 @@ async def test_subscribe_returns_unsubscribe_function(self): async def callback(event_type, payload): received_events.append(payload) - unsubscribe = publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data1"}) + unsubscribe = publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data1"}) unsubscribe() - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data2"}) assert len(received_events) == 1 assert received_events[0] == {"test": "data1"} @@ -210,9 +210,9 @@ async def callback(event_type, payload): received_events.append(payload) unsubscribe = publisher.subscribe_all(callback) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data1"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data1"}) unsubscribe() - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data2"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data2"}) assert len(received_events) == 1 @@ -228,10 +228,10 @@ async def failing_callback(event_type, payload): async def working_callback(event_type, payload): received_events.append(payload) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, failing_callback) - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, working_callback) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, failing_callback) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, working_callback) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data"}) assert len(received_events) == 1 @@ -242,11 +242,11 @@ async def test_publish_broadcasts_to_websocket_clients(self): mock_ws = AsyncMock() await publisher.ws_manager.connect(mock_ws) - await publisher.publish(EventType.TRANSCRIPTION_UPDATED, {"test": "data"}) + await publisher.publish(EventType.TRANSCRIPTION_ERROR, {"test": "data"}) mock_ws.send_text.assert_called_once() sent_message = json.loads(mock_ws.send_text.call_args[0][0]) - assert sent_message["type"] == "transcription.updated" + assert sent_message["type"] == "transcription.error" assert sent_message["payload"] == {"test": "data"} @pytest.mark.asyncio @@ -257,7 +257,7 @@ async def test_close_clears_all_subscribers(self): async def callback(event_type, payload): pass - publisher.subscribe(EventType.TRANSCRIPTION_UPDATED, callback) + publisher.subscribe(EventType.TRANSCRIPTION_ERROR, callback) publisher.subscribe_all(callback) assert len(publisher._subscribers) > 0 diff --git a/backend/tests/test_transcription_service.py b/backend/tests/test_transcription_service.py index 5bd23797..d1b17c12 100644 --- a/backend/tests/test_transcription_service.py +++ b/backend/tests/test_transcription_service.py @@ -130,8 +130,14 @@ class TestOnVexaEvent: """Tests for Vexa event forwarding.""" @pytest.mark.asyncio - async def test_forwards_transcript_updated(self, service, mock_event_publisher): - """Test that transcript.updated is forwarded via event publisher.""" + async def test_forwards_transcript_updated(self, service): + """`transcript.updated` must route to on_transcription_updated so the + flat `{type:"transcript", ...}` broadcast happens. The legacy + `TRANSCRIPTION_UPDATED` publish has been removed — nothing + subscribed and the flat envelope carries the full payload.""" + from unittest.mock import AsyncMock + + service.on_transcription_updated = AsyncMock() payload = { "platform": "google_meet", "meeting_id": "abc-def-ghi", @@ -143,10 +149,7 @@ async def test_forwards_transcript_updated(self, service, mock_event_publisher): await service._on_vexa_event("transcript.updated", payload) - mock_event_publisher.publish.assert_called_once_with( - EventType.TRANSCRIPTION_UPDATED, - payload, - ) + service.on_transcription_updated.assert_called_once_with(payload) @pytest.mark.asyncio async def test_forwards_bot_status_changed(self, service, mock_event_publisher): diff --git a/backend/tests/test_websocket.py b/backend/tests/test_websocket.py index f6c1dd63..4ceea841 100644 --- a/backend/tests/test_websocket.py +++ b/backend/tests/test_websocket.py @@ -48,7 +48,7 @@ def test_websocket_receives_published_events(self): loop = asyncio.new_event_loop() loop.run_until_complete( publisher.publish( - EventType.TRANSCRIPTION_UPDATED, + EventType.TRANSCRIPTION_ERROR, { "segment_id": "abc123", "text": "Hello world", @@ -59,7 +59,7 @@ def test_websocket_receives_published_events(self): loop.close() data = websocket.receive_json() - assert data["type"] == "transcription.updated" + assert data["type"] == "transcription.error" assert data["payload"]["segment_id"] == "abc123" assert data["payload"]["text"] == "Hello world" @@ -101,7 +101,7 @@ def test_multiple_websocket_clients_receive_events(self): loop = asyncio.new_event_loop() loop.run_until_complete( publisher.publish( - EventType.TRANSCRIPTION_COMPLETED, + EventType.TRANSCRIPTION_ERROR, {"segment_id": "xyz", "text": "Updated text"}, ) ) @@ -110,7 +110,7 @@ def test_multiple_websocket_clients_receive_events(self): data1 = ws1.receive_json() data2 = ws2.receive_json() - assert data1["type"] == "transcription.completed" - assert data2["type"] == "transcription.completed" + assert data1["type"] == "transcription.error" + assert data2["type"] == "transcription.error" assert data1["payload"]["segment_id"] == "xyz" assert data2["payload"]["segment_id"] == "xyz" diff --git a/frontend/packages/app/src/hooks/useSegments.ts b/frontend/packages/app/src/hooks/useSegments.ts index ee2cf46d..1b29a609 100644 --- a/frontend/packages/app/src/hooks/useSegments.ts +++ b/frontend/packages/app/src/hooks/useSegments.ts @@ -30,9 +30,20 @@ export interface UseSegmentsResult { * React hook exposing deduplicated transcript segments for a playlist/version. * * Single dedup authority: `@vexaai/transcript-rendering`'s `TranscriptManager`. - * - REST bootstrap populates the manager's `confirmed` map. - * - WS `transcript` events (raw Vexa shape forwarded by DNA backend) feed - * `manager.handleMessage()` directly — draft/confirmed distinction preserved. + * + * Load order is designed so WS ticks never get wiped by a late REST response + * and a cached REST response is always merged into the manager before WS + * ticks start stacking on top: + * + * 1. On (playlist, version) change: `manager.clear()`, then seed the manager + * from React Query's existing cache for this queryKey (if any) via the + * additive tick path. + * 2. `useQuery` fetches fresh REST; the response is also merged additively + * (NOT via `bootstrap()`, which clears state). + * 3. WS `transcript` events call `manager.handleMessage()` directly. + * + * All three paths converge on the same confirmed/pending maps, keyed by + * Vexa's stable `segment_id`, so duplicates are impossible. */ export function useSegments({ playlistId, @@ -46,27 +57,60 @@ export function useSegments({ [playlistId, versionId] ); - // One manager per (playlist, version); reset on change. const managerRef = useRef | null>(null); if (managerRef.current === null) { managerRef.current = createTranscriptManager(); } - useEffect(() => { - managerRef.current?.clear(); - }, [playlistId, versionId]); + + const activeKeyRef = useRef(''); + const activeKey = `${playlistId ?? '-'}:${versionId ?? '-'}`; const [liveSegments, setLiveSegments] = useState(null); + // Additive merge: feed confirmed segments into the manager via the tick + // path (which does not clear state), then pull the reconciled array out. + const mergeConfirmed = useCallback((rest: StoredSegment[]): StoredSegment[] => { + const mgr = managerRef.current!; + if (rest && rest.length > 0) { + mgr.handleMessage({ type: 'transcript', confirmed: rest, pending: [] }); + } + return mgr.getSegments(); + }, []); + + // Version change — reset manager, then seed from any cached REST already + // in React Query so WS ticks append onto the historical transcript rather + // than replacing it. + useEffect(() => { + activeKeyRef.current = activeKey; + const mgr = managerRef.current!; + mgr.clear(); + const cached = queryClient.getQueryData(queryKey); + if (cached && cached.length > 0) { + const seeded = mergeConfirmed(cached); + setLiveSegments(seeded); + } else { + setLiveSegments(null); + } + }, [activeKey, queryClient, queryKey, mergeConfirmed]); + const { data, isLoading, isError, error } = useQuery({ queryKey, - queryFn: async () => { + queryFn: async ({ queryKey: qk }) => { + const [, qPlaylistId, qVersionId] = qk as [string, number, number]; + const capturedKey = `${qPlaylistId ?? '-'}:${qVersionId ?? '-'}`; const rest = await apiHandler.getSegmentsForVersion({ - playlistId: playlistId!, - versionId: versionId!, + playlistId: qPlaylistId, + versionId: qVersionId, }); - const bootstrapped = managerRef.current!.bootstrap(rest); - setLiveSegments(bootstrapped); - return bootstrapped; + // If the user switched versions while we were fetching, cache the + // raw REST under the old queryKey (still valid data for that version) + // but don't touch the current manager (it's for a different version). + if (activeKeyRef.current !== capturedKey) { + return rest; + } + const merged = mergeConfirmed(rest); + setLiveSegments(merged); + return merged; }, enabled: isEnabled, staleTime: 30000, diff --git a/frontend/packages/core/src/eventClient.ts b/frontend/packages/core/src/eventClient.ts index 2c8a8467..caabcc7f 100644 --- a/frontend/packages/core/src/eventClient.ts +++ b/frontend/packages/core/src/eventClient.ts @@ -1,7 +1,5 @@ export type EventType = | 'transcript' - | 'playlist.updated' - | 'version.updated' | 'bot.status_changed' | 'transcription.completed' | 'transcription.error'; From 09c157d1d8f341715424762b813b0ece25d1460c Mon Sep 17 00:00:00 2001 From: DmitryG228 <2280905@gmail.com> Date: Mon, 4 May 2026 20:01:38 +0300 Subject: [PATCH 6/6] Fix CI: update WS tests to new transcript contract + cover on_transcription_updated - test_vexa_provider: replace stale `transcript.mutable` payloads with the new flat `{type:"transcript", confirmed, pending, speaker, ts}` shape; add coverage for the empty-defaults path. - test_transcription_service: add TestOnTranscriptionUpdated covering the upsert + flat broadcast happy path, every early-return branch (missing providers / no playlist mapping / no metadata / paused / in_review None), the resumed_at filter (aware + naive) including the ValueError fall-through on bad timestamps, missing-required-field skips, top-level-speaker fallback, and upsert exception swallowing. - Bumps coverage on transcription_service.py from 67% to 92%, restoring the suite above the 90% gate. Black-formatted. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: DmitryG228 <2280905@gmail.com> --- backend/tests/providers/test_vexa_provider.py | 57 +++- backend/tests/test_transcription_service.py | 272 +++++++++++++++++- 2 files changed, 320 insertions(+), 9 deletions(-) diff --git a/backend/tests/providers/test_vexa_provider.py b/backend/tests/providers/test_vexa_provider.py index 83e4854f..b61fbca5 100644 --- a/backend/tests/providers/test_vexa_provider.py +++ b/backend/tests/providers/test_vexa_provider.py @@ -500,37 +500,73 @@ async def test_handle_ws_message_pong(self, vexa_provider): await vexa_provider._handle_ws_message({"type": "pong"}) @pytest.mark.asyncio - async def test_handle_ws_message_transcript_mutable(self, vexa_provider): - """Test handling transcript.mutable message.""" + async def test_handle_ws_message_transcript(self, vexa_provider): + """Test handling the new flat `transcript` frame from Vexa WS.""" callback_called = False + callback_event = None callback_data = {} async def callback(event_type, data): - nonlocal callback_called, callback_data + nonlocal callback_called, callback_event, callback_data callback_called = True + callback_event = event_type callback_data = data vexa_provider._meeting_id_to_key[100] = "google_meet:abc-123" vexa_provider._subscribed_meetings["google_meet:abc-123"] = callback + confirmed = [{"segment_id": "s1", "text": "Hello"}] + pending = [{"segment_id": "s2", "text": "world"}] await vexa_provider._handle_ws_message( { - "type": "transcript.mutable", + "type": "transcript", "meeting": {"id": 100}, - "payload": {"segments": [{"text": "Hello"}]}, + "speaker": "Alice", + "confirmed": confirmed, + "pending": pending, + "ts": "2026-04-20T19:00:00.000Z", } ) assert callback_called + assert callback_event == "transcript.updated" assert callback_data["platform"] == "google_meet" assert callback_data["meeting_id"] == "abc-123" - assert callback_data["segments"] == [{"text": "Hello"}] + assert callback_data["speaker"] == "Alice" + assert callback_data["confirmed"] == confirmed + assert callback_data["pending"] == pending + assert callback_data["ts"] == "2026-04-20T19:00:00.000Z" + + @pytest.mark.asyncio + async def test_handle_ws_message_transcript_defaults_empty_lists( + self, vexa_provider + ): + """Missing `confirmed`/`pending` keys default to empty lists.""" + callback_data = {} + + async def callback(event_type, data): + callback_data.update(data) + + vexa_provider._meeting_id_to_key[100] = "google_meet:abc-123" + vexa_provider._subscribed_meetings["google_meet:abc-123"] = callback + + await vexa_provider._handle_ws_message( + {"type": "transcript", "meeting": {"id": 100}} + ) + + assert callback_data["confirmed"] == [] + assert callback_data["pending"] == [] @pytest.mark.asyncio async def test_handle_ws_message_transcript_unknown_meeting(self, vexa_provider): """Test handling transcript for unknown meeting.""" await vexa_provider._handle_ws_message( - {"type": "transcript.mutable", "meeting": {"id": 999}, "payload": {}} + { + "type": "transcript", + "meeting": {"id": 999}, + "confirmed": [], + "pending": [], + } ) @pytest.mark.asyncio @@ -539,7 +575,12 @@ async def test_handle_ws_message_transcript_no_callback(self, vexa_provider): vexa_provider._meeting_id_to_key[100] = "google_meet:abc-123" await vexa_provider._handle_ws_message( - {"type": "transcript.mutable", "meeting": {"id": 100}, "payload": {}} + { + "type": "transcript", + "meeting": {"id": 100}, + "confirmed": [], + "pending": [], + } ) @pytest.mark.asyncio diff --git a/backend/tests/test_transcription_service.py b/backend/tests/test_transcription_service.py index d1b17c12..0bd4667b 100644 --- a/backend/tests/test_transcription_service.py +++ b/backend/tests/test_transcription_service.py @@ -125,7 +125,6 @@ async def test_handles_provider_not_initialized(self, service, caplog): assert "Transcription provider not initialized" in caplog.text - class TestOnVexaEvent: """Tests for Vexa event forwarding.""" @@ -557,6 +556,277 @@ async def test_publishes_recovery_status_for_each_active_bot( assert mock_event_publisher.publish.call_count == 2 +class TestOnTranscriptionUpdated: + """Tests for `on_transcription_updated` — the new flat-passthrough flow.""" + + @pytest.fixture + def service_ready( + self, + mock_transcription_provider, + mock_storage_provider, + mock_event_publisher, + ): + svc = TranscriptionService( + transcription_provider=mock_transcription_provider, + storage_provider=mock_storage_provider, + event_publisher=mock_event_publisher, + ) + svc._meeting_to_playlist["google_meet:abc-def"] = 42 + return svc + + @pytest.fixture + def metadata(self): + return PlaylistMetadata( + _id="meta1", + playlist_id=42, + in_review=7, + transcription_paused=False, + ) + + def _payload(self, **overrides): + base = { + "platform": "google_meet", + "meeting_id": "abc-def", + "speaker": "Alice", + "confirmed": [], + "pending": [], + "ts": "2026-04-20T19:00:00.000Z", + } + base.update(overrides) + return base + + def _seg(self, **overrides): + seg = { + "segment_id": "abc:speaker-0:1", + "text": "hello world", + "speaker": "Alice", + "language": "en", + "start_time": 0.0, + "end_time": 1.0, + "absolute_start_time": "2026-04-20T19:00:00.000Z", + "absolute_end_time": "2026-04-20T19:00:01.000Z", + "updated_at": "2026-04-20T19:00:01.500Z", + } + seg.update(overrides) + return seg + + @pytest.mark.asyncio + async def test_upserts_confirmed_and_broadcasts_flat_shape( + self, service_ready, mock_storage_provider, mock_event_publisher, metadata + ): + mock_storage_provider.get_playlist_metadata.return_value = metadata + seg = self._seg() + + await service_ready.on_transcription_updated( + self._payload(confirmed=[seg], pending=[{"segment_id": "p1"}]) + ) + + mock_storage_provider.upsert_segment.assert_called_once() + kwargs = mock_storage_provider.upsert_segment.call_args.kwargs + assert kwargs["playlist_id"] == 42 + assert kwargs["version_id"] == 7 + assert kwargs["segment_id"] == "abc:speaker-0:1" + assert kwargs["data"].segment_id == "abc:speaker-0:1" + assert kwargs["data"].completed is True + assert kwargs["data"].speaker == "Alice" + + mock_event_publisher.ws_manager.broadcast.assert_called_once() + msg = mock_event_publisher.ws_manager.broadcast.call_args.args[0] + assert msg["type"] == "transcript" + assert msg["speaker"] == "Alice" + assert msg["confirmed"] == [seg] + assert msg["pending"] == [{"segment_id": "p1"}] + assert msg["playlist_id"] == 42 + assert msg["version_id"] == 7 + assert msg["ts"] == "2026-04-20T19:00:00.000Z" + + @pytest.mark.asyncio + async def test_returns_when_storage_provider_missing(self, service_ready, caplog): + service_ready.storage_provider = None + await service_ready.on_transcription_updated(self._payload()) + assert "Providers not initialized" in caplog.text + + @pytest.mark.asyncio + async def test_returns_when_event_publisher_missing(self, service_ready, caplog): + service_ready.event_publisher = None + await service_ready.on_transcription_updated(self._payload()) + assert "Providers not initialized" in caplog.text + + @pytest.mark.asyncio + async def test_returns_when_no_playlist_mapping( + self, service_ready, mock_storage_provider, caplog + ): + await service_ready.on_transcription_updated( + self._payload(meeting_id="not-mapped") + ) + mock_storage_provider.upsert_segment.assert_not_called() + assert "No playlist_id" in caplog.text + + @pytest.mark.asyncio + async def test_returns_when_metadata_missing( + self, service_ready, mock_storage_provider, mock_event_publisher + ): + mock_storage_provider.get_playlist_metadata.return_value = None + await service_ready.on_transcription_updated( + self._payload(confirmed=[self._seg()]) + ) + mock_storage_provider.upsert_segment.assert_not_called() + mock_event_publisher.ws_manager.broadcast.assert_not_called() + + @pytest.mark.asyncio + async def test_returns_when_in_review_none( + self, service_ready, mock_storage_provider, mock_event_publisher + ): + mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( + _id="m", playlist_id=42, in_review=None + ) + await service_ready.on_transcription_updated( + self._payload(confirmed=[self._seg()]) + ) + mock_storage_provider.upsert_segment.assert_not_called() + mock_event_publisher.ws_manager.broadcast.assert_not_called() + + @pytest.mark.asyncio + async def test_returns_when_paused( + self, service_ready, mock_storage_provider, mock_event_publisher + ): + mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( + _id="m", playlist_id=42, in_review=7, transcription_paused=True + ) + await service_ready.on_transcription_updated( + self._payload(confirmed=[self._seg()]) + ) + mock_storage_provider.upsert_segment.assert_not_called() + mock_event_publisher.ws_manager.broadcast.assert_not_called() + + @pytest.mark.asyncio + async def test_skips_segment_before_resumed_at( + self, service_ready, mock_storage_provider + ): + from datetime import datetime, timezone + + mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( + _id="m", + playlist_id=42, + in_review=7, + transcription_resumed_at=datetime( + 2026, 4, 20, 19, 0, 30, tzinfo=timezone.utc + ), + ) + + old = self._seg( + segment_id="old", absolute_start_time="2026-04-20T19:00:00.000Z" + ) + new = self._seg( + segment_id="new", absolute_start_time="2026-04-20T19:01:00.000Z" + ) + + await service_ready.on_transcription_updated( + self._payload(confirmed=[old, new]) + ) + + ids = [ + c.kwargs["segment_id"] + for c in mock_storage_provider.upsert_segment.call_args_list + ] + assert ids == ["new"] + + @pytest.mark.asyncio + async def test_handles_naive_resumed_at(self, service_ready, mock_storage_provider): + """Naive `transcription_resumed_at` is treated as UTC.""" + from datetime import datetime + + mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( + _id="m", + playlist_id=42, + in_review=7, + transcription_resumed_at=datetime(2026, 4, 20, 19, 0, 30), + ) + + await service_ready.on_transcription_updated( + self._payload( + confirmed=[ + self._seg(absolute_start_time="2026-04-20T19:01:00.000Z"), + ] + ) + ) + + mock_storage_provider.upsert_segment.assert_called_once() + + @pytest.mark.asyncio + async def test_swallows_invalid_absolute_start_time( + self, service_ready, mock_storage_provider + ): + """A malformed `absolute_start_time` falls through `ValueError` and the segment is still upserted.""" + from datetime import datetime, timezone + + mock_storage_provider.get_playlist_metadata.return_value = PlaylistMetadata( + _id="m", + playlist_id=42, + in_review=7, + transcription_resumed_at=datetime( + 2026, 4, 20, 19, 0, 30, tzinfo=timezone.utc + ), + ) + + await service_ready.on_transcription_updated( + self._payload(confirmed=[self._seg(absolute_start_time="not-a-date")]) + ) + mock_storage_provider.upsert_segment.assert_called_once() + + @pytest.mark.asyncio + async def test_skips_segments_missing_required_fields( + self, service_ready, mock_storage_provider, metadata + ): + mock_storage_provider.get_playlist_metadata.return_value = metadata + + await service_ready.on_transcription_updated( + self._payload( + confirmed=[ + self._seg(segment_id=""), + self._seg(absolute_start_time=""), + self._seg(text=""), + self._seg(text=" "), + ] + ) + ) + mock_storage_provider.upsert_segment.assert_not_called() + + @pytest.mark.asyncio + async def test_falls_back_to_top_level_speaker( + self, service_ready, mock_storage_provider, metadata + ): + """Per-segment `speaker` overrides; otherwise the message-level `speaker` is used.""" + mock_storage_provider.get_playlist_metadata.return_value = metadata + + await service_ready.on_transcription_updated( + self._payload( + speaker="Bob", + confirmed=[self._seg(speaker=None, segment_id="x")], + ) + ) + kwargs = mock_storage_provider.upsert_segment.call_args.kwargs + assert kwargs["data"].speaker == "Bob" + + @pytest.mark.asyncio + async def test_logs_and_continues_on_upsert_failure( + self, + service_ready, + mock_storage_provider, + mock_event_publisher, + metadata, + caplog, + ): + mock_storage_provider.get_playlist_metadata.return_value = metadata + mock_storage_provider.upsert_segment.side_effect = RuntimeError("boom") + + await service_ready.on_transcription_updated( + self._payload(confirmed=[self._seg()]) + ) + + assert "Failed to upsert segment" in caplog.text + mock_event_publisher.ws_manager.broadcast.assert_called_once() + class TestTranscriptionServiceLifecycle: """Tests for TranscriptionService initialization and cleanup."""