Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions backend/src/dna/events/event_types.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
"""Event type definitions."""
"""Event type definitions.

Only events that are actually emitted AND consumed live here. The flat
`transcript` frame doesn't go through this enum — it's broadcast directly
via `EventPublisher.ws_manager.broadcast(...)` because its envelope is
shaped by the Vexa contract, not by the `{type, payload}` wrapper this
enum drives.
"""

from enum import Enum


class EventType(str, Enum):
    """Names of events published through the `{type, payload}` WS envelope.

    Mixes in ``str`` so members compare equal to their dotted string
    values (e.g. ``EventType.SEGMENT_CREATED == "segment.created"``) and
    serialize as plain strings.
    """

    # Transcription lifecycle events.
    TRANSCRIPTION_SUBSCRIBE = "transcription.subscribe"
    TRANSCRIPTION_STARTED = "transcription.started"
    TRANSCRIPTION_UPDATED = "transcription.updated"
    TRANSCRIPTION_COMPLETED = "transcription.completed"
    TRANSCRIPTION_ERROR = "transcription.error"
    # Per-segment persistence events.
    SEGMENT_CREATED = "segment.created"
    SEGMENT_UPDATED = "segment.updated"
    # Other resource-update notifications.
    BOT_STATUS_CHANGED = "bot.status_changed"
    PLAYLIST_UPDATED = "playlist.updated"
    VERSION_UPDATED = "version.updated"
    DRAFT_NOTE_UPDATED = "draft_note.updated"
2 changes: 0 additions & 2 deletions backend/src/dna/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from dna.models.stored_segment import (
StoredSegment,
StoredSegmentCreate,
generate_segment_id,
)
from dna.models.transcription import (
BotSession,
Expand Down Expand Up @@ -92,7 +91,6 @@
"PlaylistMetadataUpdate",
"StoredSegment",
"StoredSegmentCreate",
"generate_segment_id",
"BotSession",
"BotStatus",
"BotStatusEnum",
Expand Down
43 changes: 23 additions & 20 deletions backend/src/dna/models/stored_segment.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
"""Stored Segment Models.

Pydantic models for transcription segments stored in MongoDB.

Backend operates as a passthrough for Vexa's transcript stream:
- `segment_id` is Vexa's stable id (e.g. "9b914779:speaker-1:72"), not a hash.
- Upsert key in MongoDB is `{segment_id, playlist_id, version_id}`.
- All Vexa fields (start_time, end_time, completed, language, ...) are preserved.
"""

import hashlib
from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field


def generate_segment_id(
    playlist_id: int,
    version_id: int,
    absolute_start_time: str,
) -> str:
    """Derive a deterministic 16-hex-char segment ID for a version + moment.

    The speaker is deliberately left out of the hash key: the upstream
    mutable transcription may reassign speakers while it refines the
    transcript, and keying on the start time alone means a re-delivered
    segment for the same moment is treated as an update to the existing
    row rather than as a brand-new segment.
    """
    raw_key = ":".join((str(playlist_id), str(version_id), absolute_start_time))
    digest = hashlib.sha256(raw_key.encode())
    return digest.hexdigest()[:16]


class StoredSegmentCreate(BaseModel):
"""Model for creating a stored segment."""
"""Model for creating/upserting a stored segment (raw Vexa passthrough)."""
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although Vexa is a first-class dependency now, I would like to try to keep this part of the codebase as generalized as we can. Nitpicky, but change where we say "Vexa" to "transcriptProvider".


segment_id: str = Field(
..., description="Vexa's stable segment id (e.g. '9b914779:speaker-1:72')"
)
text: str = Field(..., description="Transcript text content")
speaker: Optional[str] = Field(default=None, description="Speaker identifier")
language: Optional[str] = Field(default=None, description="Language code")
start_time: Optional[float] = Field(
default=None, description="Relative start time in seconds"
)
end_time: Optional[float] = Field(
default=None, description="Relative end time in seconds"
)
completed: Optional[bool] = Field(
default=True, description="Whether the segment is confirmed (vs draft)"
)
absolute_start_time: str = Field(
..., description="UTC timestamp (ISO 8601) of segment start"
)
absolute_end_time: str = Field(
..., description="UTC timestamp (ISO 8601) of segment end"
)
vexa_updated_at: Optional[str] = Field(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

default=None, description="Vexa's updated_at timestamp for deduplication"
default=None, description="Vexa's updated_at timestamp"
)


Expand All @@ -49,12 +49,15 @@ class StoredSegment(BaseModel):
model_config = ConfigDict(populate_by_name=True)

id: str = Field(alias="_id")
segment_id: str = Field(..., description="Unique segment ID")
segment_id: str
playlist_id: int
version_id: int
text: str
speaker: Optional[str] = None
language: Optional[str] = None
start_time: Optional[float] = None
end_time: Optional[float] = None
completed: Optional[bool] = True
absolute_start_time: str
absolute_end_time: str
vexa_updated_at: Optional[str] = None
Expand Down
27 changes: 26 additions & 1 deletion backend/src/dna/storage_providers/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ class MongoDBStorageProvider(StorageProviderBase):

def __init__(self) -> None:
    # Client handle; starts unset. Presumably created lazily by the
    # `client` property (defined below, outside this view) — TODO confirm.
    self._client: Optional[AsyncMongoClient[Any]] = None
    # One-shot guard so ensure_indexes() is a no-op after its first
    # successful run in this process.
    self._indexes_ensured = False

async def ensure_indexes(self) -> None:
    """Create collection indexes. Idempotent; safe to call on every startup.

    The compound unique index on the `segments` upsert key makes
    `upsert_segment` O(log n) instead of a full-collection scan — at
    Vexa's refine-heavy write rate, scans become user-visible at ~100k
    segments and timeouts at ~1M.
    """
    # Cheap in-process guard; MongoDB's createIndexes is itself a no-op
    # for an identical existing index, so this only skips round-trips.
    if self._indexes_ensured:
        return
    # Unique compound key matching the `{segment_id, playlist_id,
    # version_id}` upsert filter used by upsert_segment.
    await self.segments_collection.create_index(
        [("segment_id", 1), ("playlist_id", 1), ("version_id", 1)],
        unique=True,
        name="segments_upsert_key",
    )
    # Secondary index for listing a version's segments; the trailing
    # absolute_start_time component suggests chronological reads —
    # NOTE(review): confirm against the query path, not visible here.
    await self.segments_collection.create_index(
        [("playlist_id", 1), ("version_id", 1), ("absolute_start_time", 1)],
        name="segments_list_by_version",
    )
    # Only mark done after both awaits succeed, so a failed attempt is
    # retried on the next call.
    self._indexes_ensured = True

@property
def client(self) -> AsyncMongoClient[Any]:
Expand Down Expand Up @@ -239,14 +261,17 @@ async def upsert_segment(
existing = await self.segments_collection.find_one(query)
is_new = existing is None

# `segment_id` is already in `data.model_dump()` — MongoDB rejects an
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment seems unneeded

# update that lists the same field in both `$set` and `$setOnInsert`.
# `playlist_id`/`version_id` stay in `$setOnInsert` because they aren't
# part of `StoredSegmentCreate` (they come from the enclosing context).
update: dict[str, Any] = {
"$set": {
**data.model_dump(),
"updated_at": now,
},
"$setOnInsert": {
"created_at": now,
"segment_id": segment_id,
"playlist_id": playlist_id,
"version_id": version_id,
},
Expand Down
14 changes: 11 additions & 3 deletions backend/src/dna/transcription_providers/vexa.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,13 @@ async def _handle_ws_message(self, data: dict[str, Any]) -> None:
if msg_type == "pong":
return

if msg_type == "transcript.mutable":
if msg_type == "transcript":
# New Vexa WS contract (>= 2026-04):
# {type: "transcript", speaker, confirmed: [...], pending: [...],
# meeting: {id}, ts}
# Forward the raw confirmed/pending arrays to the service. The
# service persists confirmed segments to MongoDB and broadcasts
# the whole shape (plus playlist_id/version_id) to DNA clients.
internal_id = meeting_info.get("id")
meeting_key = self._meeting_id_to_key.get(internal_id)
if not meeting_key:
Expand All @@ -319,8 +325,10 @@ async def _handle_ws_message(self, data: dict[str, Any]) -> None:
{
"platform": platform,
"meeting_id": native_id,
"segments": data.get("payload", {}).get("segments", []),
"payload": data.get("payload", {}),
"speaker": data.get("speaker"),
"confirmed": data.get("confirmed", []) or [],
"pending": data.get("pending", []) or [],
"ts": data.get("ts"),
},
)
return
Expand Down
123 changes: 52 additions & 71 deletions backend/src/dna/transcription_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any

from dna.events import EventPublisher, EventType, get_event_publisher
from dna.models.stored_segment import StoredSegmentCreate, generate_segment_id
from dna.models.stored_segment import StoredSegmentCreate
from dna.storage_providers.storage_provider_base import (
StorageProviderBase,
get_storage_provider,
Expand Down Expand Up @@ -155,10 +155,11 @@ async def _on_vexa_event(self, event_type: str, payload: dict[str, Any]) -> None
return

if event_type == "transcript.updated":
await self.event_publisher.publish(
EventType.TRANSCRIPTION_UPDATED,
payload,
)
# The service persists confirmed segments + broadcasts the flat
# `{type:"transcript", ...}` shape directly from
# `on_transcription_updated`. No need to also emit
# TRANSCRIPTION_UPDATED through the publisher — nothing
# subscribes to it and frontends only consume the flat envelope.
await self.on_transcription_updated(payload)
elif event_type == "bot.status_changed":
await self.event_publisher.publish(
Expand Down Expand Up @@ -212,25 +213,28 @@ async def subscribe_to_meeting(
logger.exception("Failed to subscribe to meeting %s: %s", meeting_key, e)

async def on_transcription_updated(self, payload: dict[str, Any]) -> None:
"""Process transcription segments and save to storage."""
"""Passthrough: upsert Vexa's confirmed segments by their stable
`segment_id`, then forward the raw `{type:"transcript", confirmed,
pending, speaker, playlist_id, version_id, ts}` message to DNA WS
clients — the frontend TranscriptManager consumes it directly.
"""
if self.storage_provider is None or self.event_publisher is None:
logger.error("Providers not initialized")
return

platform = payload.get("platform", "")
meeting_id = payload.get("meeting_id", "")
segments = payload.get("segments", [])

if not segments:
logger.debug("No segments in transcription update")
return
speaker = payload.get("speaker")
confirmed: list[dict[str, Any]] = payload.get("confirmed", []) or []
pending: list[dict[str, Any]] = payload.get("pending", []) or []
ts = payload.get("ts")

meeting_key = f"{platform}:{meeting_id}"
playlist_id = self._meeting_to_playlist.get(meeting_key)

if playlist_id is None:
logger.warning(
"No playlist_id found for meeting %s, cannot save segments", meeting_key
"No playlist_id found for meeting %s, cannot save segments",
meeting_key,
)
return

Expand All @@ -252,87 +256,64 @@ async def on_transcription_updated(self, payload: dict[str, Any]) -> None:
version_id = metadata.in_review
resumed_at = metadata.transcription_resumed_at

for segment_data in segments:
text = segment_data.get("text", "").strip()
if not text:
continue

absolute_start_time = segment_data.get("absolute_start_time")
if not absolute_start_time:
for seg in confirmed:
segment_id = seg.get("segment_id")
absolute_start_time = seg.get("absolute_start_time")
text = (seg.get("text") or "").strip()
if not segment_id or not absolute_start_time or not text:
continue

if resumed_at is not None:
try:
segment_time = datetime.fromisoformat(
absolute_start_time.replace("Z", "+00:00")
)
resumed_at_aware = resumed_at
if resumed_at.tzinfo is None:
resumed_at_aware = resumed_at.replace(tzinfo=timezone.utc)
resumed_at_aware = (
resumed_at
if resumed_at.tzinfo is not None
else resumed_at.replace(tzinfo=timezone.utc)
)
if segment_time < resumed_at_aware:
logger.debug(
"Skipping segment from before resume: %s < %s",
absolute_start_time,
resumed_at_aware.isoformat(),
)
continue
except ValueError:
pass

speaker = segment_data.get("speaker", "Unknown")
segment_id = generate_segment_id(
playlist_id, version_id, absolute_start_time
)

segment_create = StoredSegmentCreate(
segment_id=segment_id,
text=text,
speaker=speaker,
language=segment_data.get("language"),
speaker=seg.get("speaker") or speaker,
language=seg.get("language"),
start_time=seg.get("start_time"),
end_time=seg.get("end_time"),
completed=True,
absolute_start_time=absolute_start_time,
absolute_end_time=segment_data.get("absolute_end_time", ""),
vexa_updated_at=segment_data.get("updated_at"),
absolute_end_time=seg.get("absolute_end_time", ""),
vexa_updated_at=seg.get("updated_at"),
)

try:
stored_segment, is_new = await self.storage_provider.upsert_segment(
await self.storage_provider.upsert_segment(
playlist_id=playlist_id,
version_id=version_id,
segment_id=segment_id,
data=segment_create,
)

event_type = (
EventType.SEGMENT_CREATED if is_new else EventType.SEGMENT_UPDATED
)
await self.event_publisher.publish(
event_type,
{
"segment_id": segment_id,
"playlist_id": playlist_id,
"version_id": version_id,
"text": text,
"speaker": speaker,
"absolute_start_time": absolute_start_time,
"absolute_end_time": segment_data.get("absolute_end_time", ""),
},
)

logger.info(
"Saved segment %s (%s) for version %s - text: '%s...', end_time: %s",
segment_id,
"new" if is_new else "updated",
version_id,
text[:30] if len(text) > 30 else text,
segment_data.get("absolute_end_time", ""),
)
logger.debug(
"Full segment %s (%s) for version %s",
segment_id,
"new" if is_new else "updated",
version_id,
)
except Exception as e:
logger.exception("Failed to save segment: %s", e)
except Exception:
logger.exception("Failed to upsert segment %s", segment_id)

# Broadcast the raw Vexa shape with DNA envelope fields.
# Frontend TranscriptManager.handleMessage() consumes this directly.
await self.event_publisher.ws_manager.broadcast(
{
"type": "transcript",
"speaker": speaker,
"confirmed": confirmed,
"pending": pending,
"playlist_id": playlist_id,
"version_id": version_id,
"ts": ts,
}
)

async def on_transcription_completed(self, payload: dict[str, Any]) -> None:
"""Handle transcription completion."""
Expand Down
Loading
Loading