-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconversation_async.py
More file actions
130 lines (110 loc) · 4.9 KB
/
conversation_async.py
File metadata and controls
130 lines (110 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import asyncio
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from amigo_sdk import AsyncAmigoClient
from amigo_sdk.errors import AmigoError, ConflictError, NotFoundError
from amigo_sdk.models import (
ConversationCreateConversationRequest,
CreateConversationParametersQuery,
GetConversationMessagesParametersQuery,
GetConversationsParametersQuery,
InteractWithConversationParametersQuery,
)
async def run() -> None:
# Load env vars from examples/.env (shared by all examples)
examples_env = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(dotenv_path=examples_env)
service_id = os.getenv("AMIGO_SERVICE_ID")
if not service_id:
raise SystemExit(
"Missing AMIGO_SERVICE_ID. Set it in your .env file (see .env.example)."
)
# AmigoClient reads other config from env (AMIGO_API_KEY, AMIGO_API_KEY_ID, AMIGO_USER_ID,
# AMIGO_ORGANIZATION_ID, optional AMIGO_BASE_URL). You can also pass them explicitly.
async with AsyncAmigoClient() as client:
try:
# 1) Create a conversation and log streamed events
print("Creating conversation...")
create_events = await client.conversation.create_conversation(
ConversationCreateConversationRequest(service_id=service_id),
CreateConversationParametersQuery(response_format="text"),
)
conversation_id: str | None = None
log_create = make_event_logger("create")
async for evt in create_events:
event = evt.model_dump(mode="json")
log_create(event)
if event.get("type") == "conversation-created":
conversation_id = event.get("conversation_id")
if not conversation_id:
raise RuntimeError("Conversation was not created (no id received).")
# 2) Get the conversation
print("\nGetting conversation...")
conversation = await client.conversation.get_conversations(
GetConversationsParametersQuery(id=[conversation_id])
)
print(
json.dumps(
conversation.conversations[0].model_dump(mode="json"), indent=2
),
)
# 3) Interact with the conversation via text and log streamed events
print("\nSending a text message to the conversation...")
interaction_events = await client.conversation.interact_with_conversation(
conversation_id,
InteractWithConversationParametersQuery(
request_format="text", response_format="text"
),
text_message="Hello from the Amigo Python SDK example!",
)
log_interact = make_event_logger("interact")
async for evt in interaction_events:
event = evt.model_dump(mode="json")
log_interact(event)
if event.get("type") == "interaction-complete":
break
# 4) Get messages for the conversation and log them
print("\nFetching recent messages...")
messages_page = await client.conversation.get_conversation_messages(
conversation_id,
GetConversationMessagesParametersQuery(
limit=10, sort_by=["+created_at"]
),
)
for m in getattr(messages_page, "messages", []) or []:
print("[message]", json.dumps(m.model_dump(mode="json"), indent=2))
# 5) Finish the conversation
print("\nFinishing conversation...")
try:
await client.conversation.finish_conversation(conversation_id)
print("Conversation finished.")
except (ConflictError, NotFoundError) as e:
# Acceptable eventual-consistency outcomes
print(f"Finish conversation warning: {type(e).__name__} - {e}")
print("Done.")
except AmigoError as err:
print(err)
raise SystemExit(1) from err
except Exception as err:
print("Unexpected error:", err)
raise SystemExit(1) from err
def make_event_logger(label: str):
new_message_count = 0
printed_ellipsis = False
def _log(event: dict) -> None:
nonlocal new_message_count, printed_ellipsis
event_type = event.get("type")
if event_type == "new-message":
if new_message_count < 3:
new_message_count += 1
print(f"[{label} event] {json.dumps(event, indent=2)}")
elif not printed_ellipsis:
printed_ellipsis = True
print(f"[{label} event] ... (more new-message events)")
return
print(f"[{label} event] {json.dumps(event, indent=2)}")
return _log
if __name__ == "__main__":
asyncio.run(run())