Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 328 additions & 0 deletions dimo/api/conversations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
from dimo.errors import check_type, check_optional_type, HTTPError
from typing import Dict, List, Optional, Any, Generator
from requests import Session, RequestException
import json


class Conversations:
"""
Client for the DIMO Conversations API.

This API enables developers to create conversational AI agents that can query
vehicle data, telemetry data, and perform web searches on behalf of users.

Key Features:
- Create AI agents with access to specific vehicles
- Query vehicle identity (make, model, owner) via GraphQL
- Query real-time telemetry (speed, fuel, location) via GraphQL
- Perform location-based web searches
- Stream responses in real-time using Server-Sent Events (SSE)
- Multi-agent delegation architecture with specialized subagents
"""

def __init__(self, request_method, get_auth_headers, get_full_path, session: Session):
self._request = request_method
self._get_auth_headers = get_auth_headers
self._get_full_path = get_full_path
self._session = session

def health_check(self) -> Dict:
"""
Check the service status and configuration.

Returns:
dict: Service health information including status, version, proxy, and default_model

Example:
>>> dimo = DIMO("Production")
>>> health = dimo.conversations.health_check()
>>> print(health['status'])
"""
response = self._request("GET", "Conversations", "/")
return response

def create_agent(
self,
developer_jwt: str,
user: str,
vehicle_ids: Optional[List[int]] = None,
) -> Dict:
"""
Create a new conversational agent for a user with optional vehicle access.

Args:
developer_jwt (str): Developer JWT token for authentication
user (str): Wallet address (0x...) or email identifying the user
vehicle_ids (list[int], optional): List of vehicle token IDs this agent can access.
- None (default): Unrestricted access, ownership validated at runtime
- []: Empty list means no vehicle access (identity queries only)
- [872, 1234]: Explicit list of allowed vehicles

Returns:
dict: Agent information including agentId, mode, user, vehicleIds, and createdAt

Behavior:
- One agent per user (idempotent creation)
- Validates configuration and mode detection
- Creates/reuses shared identity subagent
- Creates per-vehicle telemetry subagents with token exchange
- Creates shared websearch subagent if enabled

Example:
>>> dimo = DIMO("Production")
>>> dev_jwt = "your_developer_jwt"
>>> agent = dimo.conversations.create_agent(
... developer_jwt=dev_jwt,
... user="0x1234567890abcdef1234567890abcdef12345678",
... vehicle_ids=[872, 1234],
... )
>>> print(agent['agentId'])
"""
check_type("developer_jwt", developer_jwt, str)
check_type("user", user, str)
check_optional_type("vehicle_ids", vehicle_ids, list)
# check_type("enable_websearch", enable_websearch, bool)

body = {
"user": user,
"vehicleIds": vehicle_ids,
# "enableWebsearch": enable_websearch,
}

response = self._request(
"POST",
"Conversations",
"/agents",
headers=self._get_auth_headers(developer_jwt),
data=body,
)
return response

def delete_agent(self, developer_jwt: str, agent_id: str) -> Dict:
"""
Delete an agent and all associated resources.

Args:
developer_jwt (str): Developer JWT token for authentication
agent_id (str): The agent ID to delete

Returns:
dict: Confirmation message

Behavior:
- Deletes Letta agent from server
- Removes metadata from AgentManager
- Cleanup errors are logged but don't fail the request

Example:
>>> dimo = DIMO("Production")
>>> dev_jwt = "your_developer_jwt"
>>> result = dimo.conversations.delete_agent(
... developer_jwt=dev_jwt,
... agent_id="agent-abc123"
... )
>>> print(result['message'])
"""
check_type("developer_jwt", developer_jwt, str)
check_type("agent_id", agent_id, str)

response = self._request(
"DELETE",
"Conversations",
f"/agents/{agent_id}",
headers=self._get_auth_headers(developer_jwt),
)
return response

def send_message(
self,
developer_jwt: str,
agent_id: str,
message: str,
vehicle_ids: Optional[List[int]] = None,
user: Optional[str] = None,
) -> Dict:
"""
Send a message to an agent and receive the complete response (synchronous).

Args:
developer_jwt (str): Developer JWT token for authentication
agent_id (str): The agent ID to send the message to
message (str): The message to send to the agent
vehicle_ids (list[int], optional): Optional vehicle IDs override
user (str, optional): Optional user override

Returns:
dict: Response including agentId, message, response, vehiclesQueried, and timestamp

Behavior:
- Synchronous request/response
- Agent delegates to subagents as needed
- Returns full response after agent completes reasoning
- Timeout: 120 seconds for complex queries

Example:
>>> dimo = DIMO("Production")
>>> dev_jwt = "your_developer_jwt"
>>> response = dimo.conversations.send_message(
... developer_jwt=dev_jwt,
... agent_id="agent-abc123",
... message="What's the make and model of my vehicle?"
... )
>>> print(response['response'])
"""
check_type("developer_jwt", developer_jwt, str)
check_type("agent_id", agent_id, str)
check_type("message", message, str)
check_optional_type("vehicle_ids", vehicle_ids, list)
check_optional_type("user", user, str)

body = {"message": message}
if vehicle_ids is not None:
body["vehicleIds"] = vehicle_ids
if user is not None:
body["user"] = user

response = self._request(
"POST",
"Conversations",
f"/agents/{agent_id}/message",
headers=self._get_auth_headers(developer_jwt),
data=body,
)
return response

def stream_message(
self,
developer_jwt: str,
agent_id: str,
message: str,
vehicle_ids: Optional[List[int]] = None,
user: Optional[str] = None,
) -> Generator[Dict[str, Any], None, None]:
"""
Send a message and receive real-time token-by-token streaming response via SSE.

Args:
developer_jwt (str): Developer JWT token for authentication
agent_id (str): The agent ID to send the message to
message (str): The message to send to the agent
vehicle_ids (list[int], optional): Optional vehicle IDs override
user (str, optional): Optional user override

Yields:
dict: SSE events with either {"content": "token"} or {"done": true, ...metadata}

Behavior:
- Real-time streaming for better UX
- Token-by-token generation from LLM
- Final message includes metadata (agentId, vehiclesQueried)

Example:
>>> dimo = DIMO("Production")
>>> dev_jwt = "your_developer_jwt"
>>> for chunk in dimo.conversations.stream_message(
... developer_jwt=dev_jwt,
... agent_id="agent-abc123",
... message="What's my current speed?"
... ):
... if "content" in chunk:
... print(chunk["content"], end="", flush=True)
... elif "done" in chunk:
... print(f"\\nVehicles queried: {chunk['vehiclesQueried']}")
"""
check_type("developer_jwt", developer_jwt, str)
check_type("agent_id", agent_id, str)
check_type("message", message, str)
check_optional_type("vehicle_ids", vehicle_ids, list)
check_optional_type("user", user, str)

body = {"message": message}
if vehicle_ids is not None:
body["vehicleIds"] = vehicle_ids
if user is not None:
body["user"] = user

headers = self._get_auth_headers(developer_jwt)
headers["Accept"] = "text/event-stream"
headers["Content-Type"] = "application/json"

# Build full URL
url = self._get_full_path("Conversations", f"/agents/{agent_id}/stream")

# Make streaming request directly with session
try:
response = self._session.request(
method="POST",
url=url,
headers=headers,
data=json.dumps(body),
stream=True,
)
response.raise_for_status()
except RequestException as exc:
status = getattr(exc.response, "status_code", None)
body_error = None
try:
body_error = exc.response.json()
except Exception:
body_error = exc.response.text if exc.response else None
raise HTTPError(status=status or -1, message=str(exc), body=body_error)

# Parse SSE stream
for line in response.iter_lines():
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
data = line[6:] # Remove "data: " prefix
try:
yield json.loads(data)
except json.JSONDecodeError:
# Skip malformed JSON
continue

def get_history(
self,
developer_jwt: str,
agent_id: str,
limit: int = 100,
) -> Dict:
"""
Retrieve all messages in a conversation.

Args:
developer_jwt (str): Developer JWT token for authentication
agent_id (str): The agent ID to get history for
limit (int): Maximum number of messages to return (default: 100)

Returns:
dict: Conversation history including agentId, messages array, and total count

Behavior:
- Retrieves from Letta server
- Includes all message roles (user, agent, system)
- Reverse chronological order (newest first)

Example:
>>> dimo = DIMO("Production")
>>> dev_jwt = "your_developer_jwt"
>>> history = dimo.conversations.get_history(
... developer_jwt=dev_jwt,
... agent_id="agent-abc123",
... limit=50
... )
>>> for msg in history['messages']:
... print(f"{msg['role']}: {msg['content']}")
"""
check_type("developer_jwt", developer_jwt, str)
check_type("agent_id", agent_id, str)
check_type("limit", limit, int)

response = self._request(
"GET",
"Conversations",
f"/agents/{agent_id}/history",
headers=self._get_auth_headers(developer_jwt),
params={"limit": limit},
)
return response
2 changes: 2 additions & 0 deletions dimo/dimo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .api.attestation import Attestation
from .api.auth import Auth
from .api.conversations import Conversations
from .api.device_definitions import DeviceDefinitions
from .api.token_exchange import TokenExchange
from .api.trips import Trips
Expand Down Expand Up @@ -75,6 +76,7 @@ def __getattr__(self, name: str) -> Any:
mapping = {
"attestation": (Attestation, ("request", "_get_auth_headers")),
"auth": (Auth, ("request", "_get_auth_headers", "env", "self")),
"conversations": (Conversations, ("request", "_get_auth_headers", "_get_full_path", "session")),
"device_definitions": (DeviceDefinitions, ("request", "_get_auth_headers")),
"token_exchange": (
TokenExchange,
Expand Down
2 changes: 2 additions & 0 deletions dimo/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"Production": {
"Attestation": "https://attestation-api.dimo.zone",
"Auth": "https://auth.dimo.zone",
"Conversations": "https://conversations-api.dimo.zone",
"Identity": "https://identity-api.dimo.zone/query",
"DeviceDefinitions": "https://device-definitions-api.dimo.zone",
"Telemetry": "https://telemetry-api.dimo.zone/query",
Expand All @@ -14,6 +15,7 @@
"Dev": {
"Attestation": "https://attestation-api.dev.dimo.zone",
"Auth": "https://auth.dev.dimo.zone",
"Conversations": "https://conversations-api.dev.dimo.zone",
"Identity": "https://identity-api.dev.dimo.zone/query",
"DeviceDefinitions": "https://device-definitions-api.dev.dimo.zone",
"Telemetry": "https://telemetry-api.dev.dimo.zone/query",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dimo-python-sdk"
version = "1.6.0"
version = "1.7.0"
authors = [
{ name="Barrett Kowalsky", email="barrettkowalsky@gmail.com" },
]
Expand Down
Loading