Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[project]
name = "uipath"
version = "2.2.32"
version = "2.2.33"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
dependencies = [
"uipath-runtime>=0.2.5, <0.3.0",
"uipath-runtime==0.3.0.dev1000550171",
"uipath-core>=0.1.3, <0.2.0",
"click>=8.3.1",
"httpx>=0.28.1",
Expand Down Expand Up @@ -144,3 +144,6 @@ name = "testpypi"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true

[tool.uv.sources]
uipath-runtime = { index = "testpypi" }
141 changes: 123 additions & 18 deletions src/uipath/_cli/_chat/_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import logging
import os
import uuid
from typing import Any
from urllib.parse import urlparse

Expand All @@ -12,7 +13,11 @@
UiPathConversationEvent,
UiPathConversationExchangeEndEvent,
UiPathConversationExchangeEvent,
UiPathConversationInterruptEvent,
UiPathConversationInterruptStartEvent,
UiPathConversationMessageEvent,
)
from uipath.runtime import UiPathRuntimeResult
from uipath.runtime.chat import UiPathChatProtocol
from uipath.runtime.context import UiPathRuntimeContext

Expand Down Expand Up @@ -51,6 +56,7 @@ def __init__(
self.headers = headers
self._client: AsyncClient | None = None
self._connected_event = asyncio.Event()
self._waiting_for_resume = False

async def connect(self, timeout: float = 10.0) -> None:
"""Establish WebSocket connection to the server.
Expand Down Expand Up @@ -127,23 +133,7 @@ async def disconnect(self) -> None:
logger.warning("WebSocket client not connected")
return

# Send exchange end event using stored IDs
if self._client and self._connected_event.is_set():
try:
end_event = UiPathConversationEvent(
conversation_id=self.conversation_id,
exchange=UiPathConversationExchangeEvent(
exchange_id=self.exchange_id,
end=UiPathConversationExchangeEndEvent(),
),
)
event_data = end_event.model_dump(
mode="json", exclude_none=True, by_alias=True
)
await self._client.emit("ConversationEvent", event_data)
logger.info("Exchange end event sent")
except Exception as e:
logger.warning(f"Error sending exchange end event: {e}")
await self.emit_exchange_end_event()

try:
logger.info("Disconnecting from WebSocket server")
Expand All @@ -154,7 +144,9 @@ async def disconnect(self) -> None:
finally:
await self._cleanup_client()

async def emit_message_event(self, message_event: Any) -> None:
async def emit_message_event(
self, message_event: UiPathConversationMessageEvent
) -> None:
"""Wrap and send a message event to the WebSocket server.

Args:
Expand All @@ -169,6 +161,9 @@ async def emit_message_event(self, message_event: Any) -> None:
if not self._connected_event.is_set():
raise RuntimeError("WebSocket client not in connected state")

# Store the current message ID, used for emitting interrupt events.
self._current_message_id = message_event.message_id

try:
# Wrap message event with conversation/exchange IDs
wrapped_event = UiPathConversationEvent(
Expand All @@ -191,6 +186,84 @@ async def emit_message_event(self, message_event: Any) -> None:
logger.error(f"Error sending conversation event to WebSocket: {e}")
raise RuntimeError(f"Failed to send conversation event: {e}") from e

async def emit_exchange_end_event(self):
# Send exchange end event using stored IDs
if self._client and self._connected_event.is_set():
try:
end_event = UiPathConversationEvent(
conversation_id=self.conversation_id,
exchange=UiPathConversationExchangeEvent(
exchange_id=self.exchange_id,
end=UiPathConversationExchangeEndEvent(),
),
)
event_data = end_event.model_dump(
mode="json", exclude_none=True, by_alias=True
)
await self._client.emit("ConversationEvent", event_data)
logger.info("Exchange end event sent")
except Exception as e:
logger.warning(f"Error sending exchange end event: {e}")

async def emit_interrupt_event(self, runtime_result: UiPathRuntimeResult):
# Send startInterrupt event using stored ID's
if self._client and self._connected_event.is_set():
try:

self._interrupt_id = str(uuid.uuid4())

interrupt_event = UiPathConversationEvent(
conversation_id=self.conversation_id,
exchange=UiPathConversationExchangeEvent(
exchange_id=self.exchange_id,
message=UiPathConversationMessageEvent(
message_id=self._current_message_id,
interrupt=UiPathConversationInterruptEvent(
interrupt_id=self._interrupt_id,
start=UiPathConversationInterruptStartEvent(
type="coded-agent-test", value=runtime_result.output
),
),
),
),
)
event_data = interrupt_event.model_dump(
mode="json", exclude_none=True, by_alias=True
)
await self._client.emit("ConversationEvent", event_data)
logger.info("Interrupt event sent")
except Exception as e:
logger.warning(f"Error sending interrupt event: {e}")

async def wait_for_resume(self) -> dict[str, Any]:
"""Wait for the interrupt_end event to be received.

Returns:
Resume data from the interrupt end event
"""
if self._client is None:
raise RuntimeError("WebSocket client not connected")

# Initialize resume event and data
self._resume_event = asyncio.Event()
self._resume_data = None
self._waiting_for_resume = True

# Register handler for interrupt events
self._client.on("ConversationEvent", self._handle_conversation_event)

try:
# Wait for the resume event to be signaled
await self._resume_event.wait()

# Return the resume data
resume_data = self._resume_data or {}

return resume_data
finally:
# Clear the waiting flag
self._waiting_for_resume = False

@property
def is_connected(self) -> bool:
"""Check if the WebSocket is currently connected.
Expand All @@ -214,6 +287,38 @@ async def _handle_connect_error(self, data: Any) -> None:
"""Handle connection error event."""
logger.error(f"WebSocket connection error: {data}")

async def _handle_conversation_event(self, data: Any, *args: Any) -> None:
"""Handle incoming conversation event from the server.

Args:
data: The incoming conversation event data (JSON)
*args: Additional arguments from Socket.IO
"""
# Only process events when actively waiting for resume
if not self._waiting_for_resume:
return

try:
# Parse the incoming event as a UiPathConversationEvent
event = UiPathConversationEvent.model_validate(data)

if isinstance(event.exchange, UiPathConversationExchangeEvent):
message = event.exchange.message
if message and message.message_id == self._current_message_id:
if message.interrupt:
if (
message.interrupt.interrupt_id
== self._interrupt_id
):
if message.interrupt.end:
# Extract resume data from the end event
# end is already a dict (typed as Any), no need to call model_dump
self._resume_data = message.interrupt.end
self._resume_event.set()
logger.info("Resume event received")
except Exception as e:
logger.error(f"Error handling conversation event: {e}")

async def _cleanup_client(self) -> None:
"""Clean up client resources."""
self._connected_event.clear()
Expand Down
15 changes: 9 additions & 6 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading