Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 61 additions & 36 deletions src/amigo_sdk/resources/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ class GetMessageSourceResponse(BaseModel):
content_type: Literal["audio/mpeg", "audio/wav"]


def _build_interact_form_data(
*,
initial_message_type: Literal["user-message", "external-event"],
external_event_message_content: list[str] | None,
external_event_message_timestamp: list[datetime] | None,
) -> list[tuple[str, tuple[None, str]]]:
"""Build multipart form-data fields for the interact endpoint."""
data: list[tuple[str, tuple[None, str]]] = [
("initial_message_type", (None, initial_message_type))
]
for content in external_event_message_content or []:
data.append(("external_event_message_content", (None, content)))
for timestamp in external_event_message_timestamp or []:
data.append(("external_event_message_timestamp", (None, timestamp.isoformat())))
return data


class AsyncConversationResource:
"""Conversation resource for Amigo API operations."""

Expand Down Expand Up @@ -81,6 +98,8 @@ async def interact_with_conversation(
text_message: str | None = None,
audio_bytes: bytes | None = None,
audio_content_type: Literal["audio/mpeg", "audio/wav"] | None = None,
external_event_message_content: list[str] | None = None,
external_event_message_timestamp: list[datetime] | None = None,
) -> "AsyncGenerator[ConversationInteractWithConversationResponse]":
"""Interact with a conversation and stream NDJSON events.

Expand Down Expand Up @@ -110,32 +129,34 @@ async def _generator():
"text_message is required when request_format is 'text'"
)
text_bytes = text_message.encode("utf-8")
request_kwargs["data"] = {
"initial_message_type": initial_message_type,
}
request_kwargs["files"] = {
"recorded_message": (
"message.txt",
text_bytes,
"text/plain; charset=utf-8",
form_fields = _build_interact_form_data(
initial_message_type=initial_message_type,
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)
request_kwargs["files"] = form_fields + [
(
"recorded_message",
("message.txt", text_bytes, "text/plain; charset=utf-8"),
)
}
]
elif params.request_format == Format.voice:
if audio_bytes is None or audio_content_type is None:
raise ValueError(
"audio_bytes and audio_content_type are required when request_format is 'voice'"
)
ext = "mp3" if audio_content_type == "audio/mpeg" else "wav"
request_kwargs["data"] = {
"initial_message_type": initial_message_type,
}
request_kwargs["files"] = {
"recorded_message": (
f"audio.{ext}",
audio_bytes,
audio_content_type,
form_fields = _build_interact_form_data(
initial_message_type=initial_message_type,
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)
request_kwargs["files"] = form_fields + [
(
"recorded_message",
(f"audio.{ext}", audio_bytes, audio_content_type),
)
}
]
else:
raise ValueError("Unsupported or missing request_format in params")

Expand Down Expand Up @@ -270,6 +291,8 @@ def interact_with_conversation(
text_message: str | None = None,
audio_bytes: bytes | None = None,
audio_content_type: Literal["audio/mpeg", "audio/wav"] | None = None,
external_event_message_content: list[str] | None = None,
external_event_message_timestamp: list[datetime] | None = None,
) -> Iterator[ConversationInteractWithConversationResponse]:
def _iter():
params_data = params.model_dump(mode="json", exclude_none=True)
Expand All @@ -295,32 +318,34 @@ def _iter():
"text_message is required when request_format is 'text'"
)
text_bytes = text_message.encode("utf-8")
request_kwargs["data"] = {
"initial_message_type": initial_message_type,
}
request_kwargs["files"] = {
"recorded_message": (
"message.txt",
text_bytes,
"text/plain; charset=utf-8",
form_fields = _build_interact_form_data(
initial_message_type=initial_message_type,
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)
request_kwargs["files"] = form_fields + [
(
"recorded_message",
("message.txt", text_bytes, "text/plain; charset=utf-8"),
)
}
]
elif req_format == Format.voice:
if audio_bytes is None or audio_content_type is None:
raise ValueError(
"audio_bytes and audio_content_type are required when request_format is 'voice'"
)
ext = "mp3" if audio_content_type == "audio/mpeg" else "wav"
request_kwargs["data"] = {
"initial_message_type": initial_message_type,
}
request_kwargs["files"] = {
"recorded_message": (
f"audio.{ext}",
audio_bytes,
audio_content_type,
form_fields = _build_interact_form_data(
initial_message_type=initial_message_type,
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)
request_kwargs["files"] = form_fields + [
(
"recorded_message",
(f"audio.{ext}", audio_bytes, audio_content_type),
)
}
]
else:
raise ValueError("Unsupported or missing request_format in params")

Expand Down
63 changes: 63 additions & 0 deletions tests/integration/test_conversation_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import os
from collections.abc import AsyncGenerator
from datetime import UTC, datetime, timedelta
from pathlib import Path

import pytest
Expand Down Expand Up @@ -31,6 +32,36 @@ def _build_test_wav_bytes() -> bytes:
return fixture_path.read_bytes()


async def _latest_conversation_message_time_async(
client: AsyncAmigoClient, conversation_id: str
) -> datetime:
page = await client.conversation.get_conversation_messages(
conversation_id,
GetConversationMessagesParametersQuery(limit=1, sort_by=["-created_at"]),
)
if not page.messages:
return datetime.now(UTC)
latest = page.messages[0]
return getattr(latest, "timestamp", None) or getattr(
latest, "created_at", datetime.now(UTC)
)


def _latest_conversation_message_time_sync(
client: AmigoClient, conversation_id: str
) -> datetime:
page = client.conversation.get_conversation_messages(
conversation_id,
GetConversationMessagesParametersQuery(limit=1, sort_by=["-created_at"]),
)
if not page.messages:
return datetime.now(UTC)
latest = page.messages[0]
return getattr(latest, "timestamp", None) or getattr(
latest, "created_at", datetime.now(UTC)
)


@pytest.fixture(scope="module", autouse=True)
async def pre_suite_cleanup() -> AsyncGenerator[None]:
# Ensure env loaded and client can connect; verify service exists
Expand Down Expand Up @@ -168,13 +199,29 @@ async def test_interact_with_conversation_external_event_streams(self):
assert type(self).conversation_id is not None

async with AsyncAmigoClient() as client:
latest_message_time = await _latest_conversation_message_time_async(
client, type(self).conversation_id
)
external_event_message_content = [
"External event integration prelude #1.",
"External event integration prelude #2.",
]
external_event_message_timestamp = [
latest_message_time + timedelta(seconds=1),
latest_message_time + timedelta(seconds=2),
]
assert len(external_event_message_timestamp) == len(
external_event_message_content
)
events = await client.conversation.interact_with_conversation(
type(self).conversation_id,
params=InteractWithConversationParametersQuery(
request_format="text", response_format="text"
),
initial_message_type="external-event",
text_message="External event integration test message.",
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)

saw_interaction_complete = False
Expand Down Expand Up @@ -378,13 +425,29 @@ def test_interact_with_conversation_external_event_streams(self):
assert type(self).conversation_id is not None

with AmigoClient() as client:
latest_message_time = _latest_conversation_message_time_sync(
client, type(self).conversation_id
)
external_event_message_content = [
"External event integration prelude #1.",
"External event integration prelude #2.",
]
external_event_message_timestamp = [
latest_message_time + timedelta(seconds=1),
latest_message_time + timedelta(seconds=2),
]
assert len(external_event_message_timestamp) == len(
external_event_message_content
)
events = client.conversation.interact_with_conversation(
type(self).conversation_id,
params=InteractWithConversationParametersQuery(
request_format="text", response_format="text"
),
initial_message_type="external-event",
text_message="External event integration test message.",
external_event_message_content=external_event_message_content,
external_event_message_timestamp=external_event_message_timestamp,
)

saw_interaction_complete = False
Expand Down
Loading