diff --git a/pyproject.toml b/pyproject.toml index 5b99ec684..251f00126 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.2.36" +version = "2.2.37" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/samples/calculator/evaluations/eval-sets/legacy.json b/samples/calculator/evaluations/eval-sets/legacy.json index 1e3234fae..4740c7a3a 100644 --- a/samples/calculator/evaluations/eval-sets/legacy.json +++ b/samples/calculator/evaluations/eval-sets/legacy.json @@ -1,17 +1,17 @@ { - "fileName": "default.json", - "id": "default-eval-set-id", - "name": "Basic Calculator Evaluation Set", + "fileName": "legacy.json", + "id": "a1b2c3d4-e5f6-4a89-abcd-ef0123456789", + "name": "Basic Calculator Evaluation Set (Legacy)", "batchSize": 10, "evaluatorRefs": [ - "equality", - "llm-as-a-judge", - "json-similarity", - "trajectory" + "aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa", + "bbbbbbbb-bbbb-4bbb-bbbb-bbbbbbbbbbbb", + "cccccccc-cccc-4ccc-cccc-cccccccccccc", + "dddddddd-dddd-4ddd-dddd-dddddddddddd" ], "evaluations": [ { - "id": "test-addition", + "id": "11111111-1111-4111-8111-111111111111", "name": "Test Addition", "inputs": { "a": 1, @@ -22,12 +22,12 @@ "result": 2.0 }, "expectedAgentBehavior": "The operation should produce the right output.", - "evalSetId": "default-eval-set-id", + "evalSetId": "a1b2c3d4-e5f6-4a89-abcd-ef0123456789", "createdAt": "2025-09-04T18:54:58.378Z", "updatedAt": "2025-09-04T18:55:55.416Z" }, { - "id": "test-random-addition-using-llm", + "id": "22222222-2222-4222-8222-222222222222", "name": "Test Random Addition Using LLM", "inputs": { "a": 1, @@ -45,12 +45,12 @@ "name": "get_random_operator" } ], - "evalSetId": "default-eval-set-id", + "evalSetId": "a1b2c3d4-e5f6-4a89-abcd-ef0123456789", "createdAt": "2025-09-04T18:54:58.378Z", "updatedAt": "2025-09-04T18:55:55.416Z" }, { - "id": "test-with-llm-input-mocking", + "id": "33333333-3333-4333-8333-333333333333", "name": "Test with LLM input mocking", "inputs": {}, "expectedOutput": { @@ -59,7 +59,7 @@ "expectedAgentBehavior": "The operation should produce the right output.", "simulateInput": true, "inputGenerationInstructions": "Generate a multiplication calculation where the first number is 5 and the second number is 7", - "evalSetId": "default-eval-set-id", + "evalSetId": "a1b2c3d4-e5f6-4a89-abcd-ef0123456789", "createdAt": "2025-09-04T18:54:58.378Z", "updatedAt": "2025-09-04T18:55:55.416Z" } diff --git a/samples/calculator/evaluations/evaluators/legacy-equality.json b/samples/calculator/evaluations/evaluators/legacy-equality.json index 10e073c8e..73f0fbd9a 100644 --- a/samples/calculator/evaluations/evaluators/legacy-equality.json +++ b/samples/calculator/evaluations/evaluators/legacy-equality.json @@ -1,6 +1,6 @@ { - "fileName": "equality.json", - "id": "equality", + "fileName": "legacy-equality.json", + "id": "aaaaaaaa-aaaa-4aaa-aaaa-aaaaaaaaaaaa", "name": "Equality Evaluator", "description": "An evaluator that judges the agent based on expected output.", "category": 0, diff --git a/samples/calculator/evaluations/evaluators/legacy-json-similarity.json b/samples/calculator/evaluations/evaluators/legacy-json-similarity.json index dd1fca355..d1066b0ee 100644 --- a/samples/calculator/evaluations/evaluators/legacy-json-similarity.json +++ 
b/samples/calculator/evaluations/evaluators/legacy-json-similarity.json @@ -1,6 +1,6 @@ { - "fileName": "json-similarity.json", - "id": "json-similarity", + "fileName": "legacy-json-similarity.json", + "id": "cccccccc-cccc-4ccc-cccc-cccccccccccc", "name": "JSON Similarity Evaluator", "description": "An evaluator that compares JSON structures with tolerance for numeric and string differences.", "category": 0, diff --git a/samples/calculator/evaluations/evaluators/legacy-llm-as-a-judge.json b/samples/calculator/evaluations/evaluators/legacy-llm-as-a-judge.json index 1b90f193f..209d663f0 100644 --- a/samples/calculator/evaluations/evaluators/legacy-llm-as-a-judge.json +++ b/samples/calculator/evaluations/evaluators/legacy-llm-as-a-judge.json @@ -1,6 +1,6 @@ { - "fileName": "llm-as-a-judge.json", - "id": "llm-as-a-judge", + "fileName": "legacy-llm-as-a-judge.json", + "id": "bbbbbbbb-bbbb-4bbb-bbbb-bbbbbbbbbbbb", "name": "LLMAsAJudge Evaluator", "description": "An evaluator that judges the agent based on it's run history and expected behavior", "category": 3, diff --git a/samples/calculator/evaluations/evaluators/legacy-trajectory.json b/samples/calculator/evaluations/evaluators/legacy-trajectory.json index 8d6e600ea..894424fd6 100644 --- a/samples/calculator/evaluations/evaluators/legacy-trajectory.json +++ b/samples/calculator/evaluations/evaluators/legacy-trajectory.json @@ -1,6 +1,6 @@ { - "fileName": "trajectory.json", - "id": "trajectory", + "fileName": "legacy-trajectory.json", + "id": "dddddddd-dddd-4ddd-dddd-dddddddddddd", "name": "Trajectory Evaluator", "description": "An evaluator that analyzes the execution trajectory and decision sequence taken by the agent.", "category": 3, diff --git a/src/uipath/_cli/_evals/_progress_reporter.py b/src/uipath/_cli/_evals/_progress_reporter.py index 92e10fed0..72f000731 100644 --- a/src/uipath/_cli/_evals/_progress_reporter.py +++ b/src/uipath/_cli/_evals/_progress_reporter.py @@ -1,1257 +1,24 @@ -"""Progress reporter for sending evaluation updates to StudioWeb.""" +"""Backward compatibility - import from _reporting instead. -import functools -import json -import logging -import os -import uuid -from datetime import datetime, timezone -from typing import Any -from urllib.parse import urlparse +This module re-exports components from the _reporting package for +backward compatibility with existing code that imports from this location. 
-from opentelemetry import trace -from opentelemetry.trace import SpanContext, SpanKind, TraceFlags -from pydantic import BaseModel -from rich.console import Console +For new code, prefer importing directly from: + from uipath._cli._evals._reporting import StudioWebProgressReporter +""" -from uipath._cli._evals._models._evaluation_set import ( - EvaluationItem, - EvaluationStatus, +from uipath._cli._evals._reporting import ( + CodedEvalReportingStrategy, + EvalReportingStrategy, + LegacyEvalReportingStrategy, + StudioWebProgressReporter, + gracefully_handle_errors, ) -from uipath._cli._evals._models._evaluator import Evaluator -from uipath._cli._evals._models._sw_reporting import ( - StudioWebAgentSnapshot, - StudioWebProgressItem, -) -from uipath._cli._utils._console import ConsoleLogger -from uipath._events._event_bus import EventBus -from uipath._events._events import ( - EvalRunCreatedEvent, - EvalRunUpdatedEvent, - EvalSetRunCreatedEvent, - EvalSetRunUpdatedEvent, - EvaluationEvents, -) -from uipath._utils import Endpoint, RequestSpec -from uipath._utils.constants import ( - ENV_EVAL_BACKEND_URL, - ENV_TENANT_ID, - HEADER_INTERNAL_TENANT_ID, -) -from uipath.eval.evaluators import ( - BaseEvaluator, - LegacyBaseEvaluator, -) -from uipath.eval.models import EvalItemResult, ScoreType -from uipath.platform import UiPath -from uipath.platform.common import UiPathConfig -from uipath.tracing import LlmOpsHttpExporter - -logger = logging.getLogger(__name__) - - -def gracefully_handle_errors(func): - """Decorator to catch and log errors without stopping execution.""" - - @functools.wraps(func) - async def wrapper(self, *args, **kwargs): - try: - return await func(self, *args, **kwargs) - except Exception as e: - if hasattr(self, "_console"): - error_type = type(e).__name__ - # Log the full error message for debugging - logger.debug(f"Full error details: {e}") - logger.warning( - f"Cannot report progress to SW. " - f"Function: {func.__name__}, " - f"Error type: {error_type}, " - f"Details: {e}" - ) - return None - - return wrapper - - -class StudioWebProgressReporter: - """Handles reporting evaluation progress to StudioWeb.""" - - def __init__(self, spans_exporter: LlmOpsHttpExporter): - self.spans_exporter = spans_exporter - - logging.getLogger("uipath._cli.middlewares").setLevel(logging.CRITICAL) - console_logger = ConsoleLogger.get_instance() - - # Use UIPATH_EVAL_BACKEND_URL for eval-specific routing if set - eval_backend_url = os.getenv(ENV_EVAL_BACKEND_URL) - uipath = UiPath(base_url=eval_backend_url) if eval_backend_url else UiPath() - - self._client = uipath.api_client - self._console = console_logger - self._rich_console = Console() - self._project_id = os.getenv("UIPATH_PROJECT_ID", None) - if not self._project_id: - logger.warning( - "Cannot report data to StudioWeb. Please set UIPATH_PROJECT_ID." 
- ) - - self.eval_set_run_ids: dict[str, str] = {} - self.evaluators: dict[str, Any] = {} - self.evaluator_scores: dict[str, list[float]] = {} - self.eval_run_ids: dict[str, str] = {} - self.is_coded_eval: dict[str, bool] = {} # Track coded vs legacy per execution - self.eval_spans: dict[ - str, list[Any] - ] = {} # Store spans per execution for usage metrics - self.eval_set_execution_id: str | None = ( - None # Track current eval set execution ID - ) - - def _format_error_message(self, error: Exception, context: str) -> None: - """Helper method to format and display error messages consistently.""" - self._rich_console.print(f" • \u26a0 [dim]{context}: {error}[/dim]") - - def _is_localhost(self) -> bool: - """Check if the eval backend URL is localhost. - - Returns: - True if using localhost, False otherwise. - """ - eval_backend_url = os.getenv(ENV_EVAL_BACKEND_URL, "") - if eval_backend_url: - try: - parsed = urlparse(eval_backend_url) - hostname = parsed.hostname or parsed.netloc.split(":")[0] - return hostname.lower() in ("localhost", "127.0.0.1") - except Exception: - pass - return False - - def _get_endpoint_prefix(self) -> str: - """Determine the endpoint prefix based on environment. - - Checks UIPATH_EVAL_BACKEND_URL environment variable: - - If set to localhost/127.0.0.1: returns "api/" (direct API access) - - Otherwise: returns "agentsruntime_/api/" (service routing for alpha/prod) - - Returns: - "api/" for localhost environments, "agentsruntime_/api/" for alpha/production. - """ - if self._is_localhost(): - return "api/" - return "agentsruntime_/api/" - - def _is_coded_evaluator( - self, evaluators: list[BaseEvaluator[Any, Any, Any]] - ) -> bool: - """Check if evaluators are coded (BaseEvaluator) vs legacy (LegacyBaseEvaluator). - - Args: - evaluators: List of evaluators to check - - Returns: - True if using coded evaluators, False for legacy evaluators - """ - if not evaluators: - return False - # Check the first evaluator type - return not isinstance(evaluators[0], LegacyBaseEvaluator) - - def _extract_usage_from_spans( - self, spans: list[Any] - ) -> dict[str, int | float | None]: - """Extract token usage and cost from OpenTelemetry spans. 
- - Args: - spans: List of ReadableSpan objects from agent execution - - Returns: - Dictionary with tokens, completionTokens, promptTokens, and cost - """ - total_tokens = 0 - completion_tokens = 0 - prompt_tokens = 0 - total_cost = 0.0 - - for span in spans: - try: - # Handle both dictionary attributes and string Attributes field - attrs = None - if hasattr(span, "attributes") and span.attributes: - if isinstance(span.attributes, dict): - attrs = span.attributes - elif isinstance(span.attributes, str): - # Parse JSON string attributes - attrs = json.loads(span.attributes) - - # Also check for Attributes field (capitalized) from backend spans - if not attrs and hasattr(span, "Attributes") and span.Attributes: - if isinstance(span.Attributes, str): - attrs = json.loads(span.Attributes) - elif isinstance(span.Attributes, dict): - attrs = span.Attributes - - if attrs: - # Try to get usage from nested usage object (backend format) - if "usage" in attrs and isinstance(attrs["usage"], dict): - usage = attrs["usage"] - prompt_tokens += usage.get("promptTokens", 0) - completion_tokens += usage.get("completionTokens", 0) - total_tokens += usage.get("totalTokens", 0) - # Cost might be in usage or at root level - total_cost += usage.get("cost", 0.0) - - # Also try OpenTelemetry semantic conventions (SDK format) - prompt_tokens += attrs.get("gen_ai.usage.prompt_tokens", 0) - completion_tokens += attrs.get("gen_ai.usage.completion_tokens", 0) - total_tokens += attrs.get("gen_ai.usage.total_tokens", 0) - total_cost += attrs.get("gen_ai.usage.cost", 0.0) - total_cost += attrs.get("llm.usage.cost", 0.0) - - except (json.JSONDecodeError, AttributeError, TypeError) as e: - logger.debug(f"Failed to parse span attributes: {e}") - continue - - return { - "tokens": total_tokens if total_tokens > 0 else None, - "completionTokens": completion_tokens if completion_tokens > 0 else None, - "promptTokens": prompt_tokens if prompt_tokens > 0 else None, - "cost": total_cost if total_cost > 0 else None, - } - - @gracefully_handle_errors - async def create_eval_set_run_sw( - self, - eval_set_id: str, - agent_snapshot: StudioWebAgentSnapshot, - no_of_evals: int, - evaluators: list[LegacyBaseEvaluator[Any]], - is_coded: bool = False, - ) -> str: - """Create a new evaluation set run in StudioWeb.""" - spec = self._create_eval_set_run_spec( - eval_set_id, agent_snapshot, no_of_evals, is_coded - ) - response = await self._client.request_async( - method=spec.method, - url=spec.endpoint, - params=spec.params, - json=spec.json, - headers=spec.headers, - scoped="org" if self._is_localhost() else "tenant", - ) - eval_set_run_id = json.loads(response.content)["id"] - return eval_set_run_id - - @gracefully_handle_errors - async def create_eval_run( - self, eval_item: EvaluationItem, eval_set_run_id: str, is_coded: bool = False - ) -> str: - """Create a new evaluation run in StudioWeb. 
- - Args: - eval_item: Dictionary containing evaluation data - eval_set_run_id: The ID of the evaluation set run - is_coded: Whether this is a coded evaluation (vs legacy) - - Returns: - The ID of the created evaluation run - """ - spec = self._create_eval_run_spec(eval_item, eval_set_run_id, is_coded) - response = await self._client.request_async( - method=spec.method, - url=spec.endpoint, - params=spec.params, - json=spec.json, - headers=spec.headers, - scoped="org" if self._is_localhost() else "tenant", - ) - return json.loads(response.content)["id"] - - @gracefully_handle_errors - async def update_eval_run( - self, - sw_progress_item: StudioWebProgressItem, - evaluators: dict[str, Evaluator], - is_coded: bool = False, - spans: list[Any] | None = None, - ): - """Update an evaluation run with results.""" - coded_evaluators: dict[str, BaseEvaluator[Any, Any, Any]] = {} - legacy_evaluators: dict[str, LegacyBaseEvaluator[Any]] = {} - evaluator_runs: list[dict[str, Any]] = [] - evaluator_scores: list[dict[str, Any]] = [] - - for k, v in evaluators.items(): - if isinstance(v, LegacyBaseEvaluator): - legacy_evaluators[k] = v - elif isinstance(v, BaseEvaluator): - coded_evaluators[k] = v - - # Use coded evaluator format - runs, scores = self._collect_coded_results( - sw_progress_item.eval_results, coded_evaluators, spans or [] - ) - evaluator_runs.extend(runs) - evaluator_scores.extend(scores) - - # Use legacy evaluator format - runs, scores = self._collect_results( - sw_progress_item.eval_results, - legacy_evaluators, - spans or [], - ) - evaluator_runs.extend(runs) - evaluator_scores.extend(scores) - - # Use the appropriate spec method based on evaluation type - if is_coded: - spec = self._update_coded_eval_run_spec( - evaluator_runs=evaluator_runs, - evaluator_scores=evaluator_scores, - eval_run_id=sw_progress_item.eval_run_id, - execution_time=sw_progress_item.agent_execution_time, - actual_output=sw_progress_item.agent_output, - success=sw_progress_item.success, - is_coded=is_coded, - ) - else: - spec = self._update_eval_run_spec( - assertion_runs=evaluator_runs, - evaluator_scores=evaluator_scores, - eval_run_id=sw_progress_item.eval_run_id, - execution_time=sw_progress_item.agent_execution_time, - actual_output=sw_progress_item.agent_output, - success=sw_progress_item.success, - is_coded=is_coded, - ) - - await self._client.request_async( - method=spec.method, - url=spec.endpoint, - params=spec.params, - json=spec.json, - headers=spec.headers, - scoped="org" if self._is_localhost() else "tenant", - ) - - @gracefully_handle_errors - async def update_eval_set_run( - self, - eval_set_run_id: str, - evaluator_scores: dict[str, float], - is_coded: bool = False, - success: bool = True, - ): - """Update the evaluation set run status to complete.""" - spec = self._update_eval_set_run_spec( - eval_set_run_id, evaluator_scores, is_coded, success - ) - await self._client.request_async( - method=spec.method, - url=spec.endpoint, - params=spec.params, - json=spec.json, - headers=spec.headers, - scoped="org" if self._is_localhost() else "tenant", - ) - - async def handle_create_eval_set_run(self, payload: EvalSetRunCreatedEvent) -> None: - try: - self.evaluators = {eval.id: eval for eval in payload.evaluators} - self.evaluator_scores = {eval.id: [] for eval in payload.evaluators} - - # Store the eval set execution ID for mapping eval runs to eval set - self.eval_set_execution_id = payload.execution_id - - # Detect if using coded evaluators and store for this execution - is_coded = 
self._is_coded_evaluator(payload.evaluators) - self.is_coded_eval[payload.execution_id] = is_coded - - eval_set_run_id = payload.eval_set_run_id - if not eval_set_run_id: - eval_set_run_id = await self.create_eval_set_run_sw( - eval_set_id=payload.eval_set_id, - agent_snapshot=self._extract_agent_snapshot(payload.entrypoint), - no_of_evals=payload.no_of_evals, - evaluators=payload.evaluators, - is_coded=is_coded, - ) - self.eval_set_run_ids[payload.execution_id] = eval_set_run_id - current_span = trace.get_current_span() - if current_span.is_recording(): - current_span.set_attribute("eval_set_run_id", eval_set_run_id) - - # Create and send parent trace for the evaluation set run - if eval_set_run_id: - await self._send_parent_trace(eval_set_run_id, payload.eval_set_id) - - logger.debug( - f"Created eval set run with ID: {eval_set_run_id} (coded={is_coded})" - ) - - except Exception as e: - self._format_error_message(e, "StudioWeb create eval set run error") - - async def handle_create_eval_run(self, payload: EvalRunCreatedEvent) -> None: - try: - # Use the stored eval set execution ID to find the eval_set_run_id - if self.eval_set_execution_id and ( - eval_set_run_id := self.eval_set_run_ids.get(self.eval_set_execution_id) - ): - # Get the is_coded flag for this execution - is_coded = self.is_coded_eval.get(self.eval_set_execution_id, False) - eval_run_id = await self.create_eval_run( - payload.eval_item, eval_set_run_id, is_coded - ) - if eval_run_id: - # Store eval_run_id with the individual eval run's execution_id - self.eval_run_ids[payload.execution_id] = eval_run_id - - logger.debug( - f"Created eval run with ID: {eval_run_id} (coded={is_coded})" - ) - else: - logger.warning("Cannot create eval run: eval_set_run_id not available") - - except Exception as e: - self._format_error_message(e, "StudioWeb create eval run error") - - async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None: - try: - eval_run_id = self.eval_run_ids.get(payload.execution_id) - - # Use evalRunId as the trace_id for agent execution spans - # This makes all agent spans children of the eval run trace - if eval_run_id: - self.spans_exporter.trace_id = eval_run_id - else: - # Fallback to evalSetRunId if eval_run_id not available yet - if self.eval_set_execution_id: - self.spans_exporter.trace_id = self.eval_set_run_ids.get( - self.eval_set_execution_id - ) - - self.spans_exporter.export(payload.spans) - - for eval_result in payload.eval_results: - evaluator_id = eval_result.evaluator_id - if evaluator_id in self.evaluator_scores: - match eval_result.result.score_type: - case ScoreType.NUMERICAL: - self.evaluator_scores[evaluator_id].append( - eval_result.result.score - ) - case ScoreType.BOOLEAN: - self.evaluator_scores[evaluator_id].append( - 100 if eval_result.result.score else 0 - ) - case ScoreType.ERROR: - self.evaluator_scores[evaluator_id].append(0) - - if eval_run_id and self.eval_set_execution_id: - # Get the is_coded flag for this execution - is_coded = self.is_coded_eval.get(self.eval_set_execution_id, False) - - # Extract usage metrics from spans - self._extract_usage_from_spans(payload.spans) - - # Send evaluator traces - await self._send_evaluator_traces( - eval_run_id, payload.eval_results, payload.spans - ) - - await self.update_eval_run( - StudioWebProgressItem( - eval_run_id=eval_run_id, - eval_results=payload.eval_results, - success=payload.success, - agent_output=payload.agent_output, - agent_execution_time=payload.agent_execution_time, - ), - self.evaluators, - 
is_coded=is_coded, - spans=payload.spans, - ) - - logger.debug( - f"Updated eval run with ID: {eval_run_id} (coded={is_coded})" - ) - - except Exception as e: - self._format_error_message(e, "StudioWeb reporting error") - - async def handle_update_eval_set_run(self, payload: EvalSetRunUpdatedEvent) -> None: - try: - if eval_set_run_id := self.eval_set_run_ids.get(payload.execution_id): - # Get the is_coded flag for this execution - is_coded = self.is_coded_eval.get(payload.execution_id, False) - await self.update_eval_set_run( - eval_set_run_id, - payload.evaluator_scores, - is_coded=is_coded, - success=payload.success, - ) - status_str = "completed" if payload.success else "failed" - logger.debug( - f"Updated eval set run with ID: {eval_set_run_id} (coded={is_coded}, status={status_str})" - ) - else: - logger.warning( - "Cannot update eval set run: eval_set_run_id not available" - ) - - except Exception as e: - self._format_error_message(e, "StudioWeb update eval set run error") - - async def subscribe_to_eval_runtime_events(self, event_bus: EventBus) -> None: - event_bus.subscribe( - EvaluationEvents.CREATE_EVAL_SET_RUN, self.handle_create_eval_set_run - ) - event_bus.subscribe( - EvaluationEvents.CREATE_EVAL_RUN, self.handle_create_eval_run - ) - event_bus.subscribe( - EvaluationEvents.UPDATE_EVAL_RUN, self.handle_update_eval_run - ) - event_bus.subscribe( - EvaluationEvents.UPDATE_EVAL_SET_RUN, self.handle_update_eval_set_run - ) - - logger.debug("StudioWeb progress reporter subscribed to evaluation events") - - def _serialize_justification( - self, justification: BaseModel | str | None - ) -> str | None: - """Serialize justification to JSON string for API compatibility. - - Args: - justification: The justification object which could be None, a BaseModel, - a string, or any other JSON-serializable object - - Returns: - JSON string representation or None if justification is None - """ - if isinstance(justification, BaseModel): - justification = json.dumps(justification.model_dump()) - - return justification - - def _extract_agent_snapshot(self, entrypoint: str) -> StudioWebAgentSnapshot: - try: - entry_points_file_path = os.path.join( - os.getcwd(), str(UiPathConfig.entry_points_file_path) - ) - if not os.path.exists(entry_points_file_path): - return StudioWebAgentSnapshot(input_schema={}, output_schema={}) - - with open(entry_points_file_path, "r") as f: - entry_points = json.load(f).get("entryPoints", []) - - ep = None - for entry_point in entry_points: - if entry_point.get("filePath") == entrypoint: - ep = entry_point - break - - if not ep: - logger.warning( - f"Entrypoint {entrypoint} not found in configuration file" - ) - return StudioWebAgentSnapshot(input_schema={}, output_schema={}) - - input_schema = ep.get("input", {}) - output_schema = ep.get("output", {}) - - return StudioWebAgentSnapshot( - input_schema=input_schema, output_schema=output_schema - ) - except Exception as e: - logger.warning(f"Failed to extract agent snapshot: {e}") - return StudioWebAgentSnapshot(input_schema={}, output_schema={}) - - def _collect_results( - self, - eval_results: list[EvalItemResult], - evaluators: dict[str, LegacyBaseEvaluator[Any]], - spans: list[Any], - ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - assertion_runs: list[dict[str, Any]] = [] - evaluator_scores_list: list[dict[str, Any]] = [] - - # Extract usage metrics from spans - usage_metrics = self._extract_usage_from_spans(spans) - - for eval_result in eval_results: - # Skip results for evaluators not in the provided dict - 
# (happens when processing mixed coded/legacy eval sets) - if eval_result.evaluator_id not in evaluators: - continue - - # Legacy API expects evaluatorId as GUID, convert string to GUID - try: - uuid.UUID(eval_result.evaluator_id) - evaluator_id_value = eval_result.evaluator_id - except ValueError: - # Generate deterministic UUID5 from string - evaluator_id_value = str( - uuid.uuid5(uuid.NAMESPACE_DNS, eval_result.evaluator_id) - ) - - # Convert BaseModel justification to JSON string for API compatibility - justification = self._serialize_justification(eval_result.result.details) - - evaluator_scores_list.append( - { - "type": eval_result.result.score_type.value, - "value": eval_result.result.score, - "justification": justification, - "evaluatorId": evaluator_id_value, - } - ) - assertion_runs.append( - { - "status": EvaluationStatus.COMPLETED.value, - "evaluatorId": evaluator_id_value, - "completionMetrics": { - "duration": int(eval_result.result.evaluation_time) - if eval_result.result.evaluation_time - else 0, - "cost": usage_metrics["cost"], - "tokens": usage_metrics["tokens"] or 0, - "completionTokens": usage_metrics["completionTokens"] or 0, - "promptTokens": usage_metrics["promptTokens"] or 0, - }, - "assertionSnapshot": { - "assertionType": evaluators[ - eval_result.evaluator_id - ].evaluator_type.name, - "outputKey": evaluators[ - eval_result.evaluator_id - ].target_output_key, - }, - } - ) - return assertion_runs, evaluator_scores_list - - def _collect_coded_results( - self, - eval_results: list[EvalItemResult], - evaluators: dict[str, BaseEvaluator[Any, Any, Any]], - spans: list[Any], - ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - """Collect results for coded evaluators. - - Returns evaluatorRuns and scores in the format expected by coded eval endpoints. 
- """ - evaluator_runs: list[dict[str, Any]] = [] - evaluator_scores_list: list[dict[str, Any]] = [] - - # Extract usage metrics from spans - usage_metrics = self._extract_usage_from_spans(spans) - - for eval_result in eval_results: - # Skip results for evaluators not in the provided dict - # (happens when processing mixed coded/legacy eval sets) - if eval_result.evaluator_id not in evaluators: - continue - - # Convert BaseModel justification to JSON string for API compatibility - justification = self._serialize_justification(eval_result.result.details) - - evaluator_scores_list.append( - { - "type": eval_result.result.score_type.value, - "value": eval_result.result.score, - "justification": justification, - "evaluatorId": eval_result.evaluator_id, - } - ) - evaluator_runs.append( - { - "status": EvaluationStatus.COMPLETED.value, - "evaluatorId": eval_result.evaluator_id, - "result": { - "score": { - "type": eval_result.result.score_type.value, - "value": eval_result.result.score, - }, - "justification": justification, - }, - "completionMetrics": { - "duration": int(eval_result.result.evaluation_time) - if eval_result.result.evaluation_time - else 0, - "cost": usage_metrics["cost"], - "tokens": usage_metrics["tokens"] or 0, - "completionTokens": usage_metrics["completionTokens"] or 0, - "promptTokens": usage_metrics["promptTokens"] or 0, - }, - } - ) - return evaluator_runs, evaluator_scores_list - - def _update_eval_run_spec( - self, - assertion_runs: list[dict[str, Any]], - evaluator_scores: list[dict[str, Any]], - eval_run_id: str, - actual_output: dict[str, Any], - execution_time: float, - success: bool, - is_coded: bool = False, - ) -> RequestSpec: - # For legacy evaluations, endpoint is without /coded - endpoint_suffix = "coded/" if is_coded else "" - - # Determine status based on success - status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED - - inner_payload: dict[str, Any] = { - "evalRunId": eval_run_id, - # Backend expects integer status - "status": status.value, - "result": { - "output": dict(actual_output), - "evaluatorScores": evaluator_scores, - }, - "completionMetrics": {"duration": int(execution_time)}, - "assertionRuns": assertion_runs, - } - - # Legacy backend expects payload wrapped in "request" field - # Coded backend accepts payload directly - # Both coded and legacy send payload directly at root level - payload = inner_payload - - return RequestSpec( - method="PUT", - endpoint=Endpoint( - f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/{endpoint_suffix}evalRun" - ), - json=payload, - headers=self._tenant_header(), - ) - - def _update_coded_eval_run_spec( - self, - evaluator_runs: list[dict[str, Any]], - evaluator_scores: list[dict[str, Any]], - eval_run_id: str, - actual_output: dict[str, Any], - execution_time: float, - success: bool, - is_coded: bool = False, - ) -> RequestSpec: - """Create update spec for coded evaluators.""" - # For coded evaluations, endpoint has /coded - endpoint_suffix = "coded/" if is_coded else "" - - # Determine status based on success - status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED - - payload: dict[str, Any] = { - "evalRunId": eval_run_id, - # For coded evaluations, use integer status; for legacy, use string - "status": status.value, - "result": { - "output": dict(actual_output), - "scores": evaluator_scores, - }, - "completionMetrics": {"duration": int(execution_time)}, - "evaluatorRuns": evaluator_runs, - } - - return RequestSpec( - method="PUT", - endpoint=Endpoint( 
- f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/{endpoint_suffix}evalRun" - ), - json=payload, - headers=self._tenant_header(), - ) - - def _create_eval_run_spec( - self, eval_item: EvaluationItem, eval_set_run_id: str, is_coded: bool = False - ) -> RequestSpec: - # Legacy API expects eval IDs as GUIDs, coded accepts strings - # Convert string IDs to deterministic GUIDs for legacy - if is_coded: - eval_item_id = eval_item.id - else: - # Try to parse as GUID, if it fails, generate deterministic GUID from string - try: - uuid.UUID(eval_item.id) - eval_item_id = eval_item.id - except ValueError: - # Generate deterministic UUID5 from string - eval_item_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, eval_item.id)) - - # Build eval snapshot based on evaluation item type - eval_snapshot = { - "id": eval_item_id, - "name": eval_item.name, - "inputs": eval_item.inputs, - } - - # For coded evaluators, use evaluationCriterias directly - # For legacy evaluators, extract expectedOutput from the migrated evaluationCriterias - # (Legacy evals are migrated to EvaluationItem format with expectedOutput inside evaluationCriterias) - if is_coded: - eval_snapshot["evaluationCriterias"] = eval_item.evaluation_criterias - else: - # Legacy backend endpoint expects expectedOutput directly in evalSnapshot - # Extract it from the first evaluator criteria (all criteria have the same expectedOutput) - expected_output = {} - if eval_item.evaluation_criterias: - first_criteria = next( - iter(eval_item.evaluation_criterias.values()), None - ) - if first_criteria and isinstance(first_criteria, dict): - expected_output = first_criteria.get("expectedOutput", {}) - eval_snapshot["expectedOutput"] = expected_output - - # For legacy evaluations, endpoint is without /coded - endpoint_suffix = "coded/" if is_coded else "" - - inner_payload: dict[str, Any] = { - "evalSetRunId": eval_set_run_id, - "evalSnapshot": eval_snapshot, - # Backend expects integer status - "status": EvaluationStatus.IN_PROGRESS.value, - } - - # Legacy backend expects payload wrapped in "request" field - # Coded backend accepts payload directly - # Both coded and legacy send payload directly at root level - payload = inner_payload - - return RequestSpec( - method="POST", - endpoint=Endpoint( - f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/{endpoint_suffix}evalRun" - ), - json=payload, - headers=self._tenant_header(), - ) - - def _create_eval_set_run_spec( - self, - eval_set_id: str, - agent_snapshot: StudioWebAgentSnapshot, - no_of_evals: int, - is_coded: bool = False, - ) -> RequestSpec: - # For legacy evaluations, endpoint is without /coded - endpoint_suffix = "coded/" if is_coded else "" - - # Legacy API expects evalSetId as GUID, coded accepts string - # Convert string IDs to deterministic GUIDs for legacy - if is_coded: - eval_set_id_value = eval_set_id - else: - # Try to parse as GUID, if it fails, generate deterministic GUID from string - try: - uuid.UUID(eval_set_id) - eval_set_id_value = eval_set_id - except ValueError: - # Generate deterministic UUID5 from string - eval_set_id_value = str(uuid.uuid5(uuid.NAMESPACE_DNS, eval_set_id)) - - inner_payload: dict[str, Any] = { - "agentId": self._project_id, - "evalSetId": eval_set_id_value, - "agentSnapshot": agent_snapshot.model_dump(by_alias=True), - # Backend expects integer status - "status": EvaluationStatus.IN_PROGRESS.value, - "numberOfEvalsExecuted": no_of_evals, - # Source is required by the backend (0 = coded SDK) - "source": 0, - } - - # Both coded and legacy 
send payload directly at root level - payload = inner_payload - - return RequestSpec( - method="POST", - endpoint=Endpoint( - f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/{endpoint_suffix}evalSetRun" - ), - json=payload, - headers=self._tenant_header(), - ) - - def _update_eval_set_run_spec( - self, - eval_set_run_id: str, - evaluator_scores: dict[str, float], - is_coded: bool = False, - success: bool = True, - ) -> RequestSpec: - # Legacy API expects evaluatorId as GUID, coded accepts string - evaluator_scores_list = [] - for evaluator_id, avg_score in evaluator_scores.items(): - if is_coded: - evaluator_id_value = evaluator_id - else: - # Convert string to GUID for legacy - try: - uuid.UUID(evaluator_id) - evaluator_id_value = evaluator_id - except ValueError: - # Generate deterministic UUID5 from string - evaluator_id_value = str( - uuid.uuid5(uuid.NAMESPACE_DNS, evaluator_id) - ) - - evaluator_scores_list.append( - {"value": avg_score, "evaluatorId": evaluator_id_value} - ) - - # For legacy evaluations, endpoint is without /coded - endpoint_suffix = "coded/" if is_coded else "" - - # Determine status based on success - status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED - - inner_payload: dict[str, Any] = { - "evalSetRunId": eval_set_run_id, - # Backend expects integer status - "status": status.value, - "evaluatorScores": evaluator_scores_list, - } - - # Legacy backend expects payload wrapped in "request" field - # Coded backend accepts payload directly - # Both coded and legacy send payload directly at root level - payload = inner_payload - - return RequestSpec( - method="PUT", - endpoint=Endpoint( - f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/{endpoint_suffix}evalSetRun" - ), - json=payload, - headers=self._tenant_header(), - ) - - def _tenant_header(self) -> dict[str, str | None]: - tenant_id = os.getenv(ENV_TENANT_ID, None) - if not tenant_id: - self._console.error( - f"{ENV_TENANT_ID} env var is not set. Please run 'uipath auth'." - ) - return {HEADER_INTERNAL_TENANT_ID: tenant_id} - - async def _send_parent_trace( - self, eval_set_run_id: str, eval_set_name: str - ) -> None: - """Send the parent trace span for the evaluation set run. 
- - Args: - eval_set_run_id: The ID of the evaluation set run - eval_set_name: The name of the evaluation set - """ - try: - # Get the tracer - tracer = trace.get_tracer(__name__) - - # Convert eval_set_run_id to trace ID format (128-bit integer) - trace_id_int = int(uuid.UUID(eval_set_run_id)) - - # Create a span context with the eval_set_run_id as the trace ID - span_context = SpanContext( - trace_id=trace_id_int, - span_id=trace_id_int, # Use same ID for root span - is_remote=False, - trace_flags=TraceFlags(0x01), # Sampled - ) - - # Create a non-recording span with our custom context - ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_context)) - - # Start a new span with the custom trace ID - with tracer.start_as_current_span( - eval_set_name, - context=ctx, - kind=SpanKind.INTERNAL, - start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), - ) as span: - # Set attributes for the evaluation set span - span.set_attribute("openinference.span.kind", "CHAIN") - span.set_attribute("span.type", "evaluationSet") - span.set_attribute("eval_set_run_id", eval_set_run_id) - - logger.debug(f"Created parent trace for eval set run: {eval_set_run_id}") - - except Exception as e: - logger.warning(f"Failed to create parent trace: {e}") - - async def _send_eval_run_trace( - self, eval_run_id: str, eval_set_run_id: str, eval_name: str - ) -> None: - """Send the child trace span for an evaluation run. - - Args: - eval_run_id: The ID of the evaluation run - eval_set_run_id: The ID of the parent evaluation set run - eval_name: The name of the evaluation - """ - try: - # Get the tracer - tracer = trace.get_tracer(__name__) - - # Convert IDs to trace format - trace_id_int = int(uuid.UUID(eval_run_id)) - parent_span_id_int = int(uuid.UUID(eval_set_run_id)) - - # Create a parent span context - parent_context = SpanContext( - trace_id=trace_id_int, - span_id=parent_span_id_int, - is_remote=False, - trace_flags=TraceFlags(0x01), - ) - - # Create context with parent span - ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) - - # Start a new span with the eval_run_id as trace ID - with tracer.start_as_current_span( - eval_name, - context=ctx, - kind=SpanKind.INTERNAL, - start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), - ) as span: - # Set attributes for the evaluation run span - span.set_attribute("openinference.span.kind", "CHAIN") - span.set_attribute("span.type", "evaluation") - span.set_attribute("eval_run_id", eval_run_id) - span.set_attribute("eval_set_run_id", eval_set_run_id) - - logger.debug( - f"Created trace for eval run: {eval_run_id} (parent: {eval_set_run_id})" - ) - - except Exception as e: - logger.warning(f"Failed to create eval run trace: {e}") - - async def _send_evaluator_traces( - self, eval_run_id: str, eval_results: list[EvalItemResult], spans: list[Any] - ) -> None: - """Send trace spans for all evaluators. 
- - Args: - eval_run_id: The ID of the evaluation run - eval_results: List of evaluator results - spans: List of spans that may contain evaluator LLM calls - """ - try: - if not eval_results: - logger.debug( - f"No evaluator results to trace for eval run: {eval_run_id}" - ) - return - - # First, export the agent execution spans so they appear in the trace - agent_readable_spans = [] - if spans: - for span in spans: - if hasattr(span, "_readable_span"): - agent_readable_spans.append(span._readable_span()) - - if agent_readable_spans: - self.spans_exporter.export(agent_readable_spans) - logger.debug( - f"Exported {len(agent_readable_spans)} agent execution spans for eval run: {eval_run_id}" - ) - - # Get the tracer - tracer = trace.get_tracer(__name__) - - # Calculate overall start and end times for the evaluators parent span - # Since evaluators run sequentially, the parent span duration should be - # the sum of all individual evaluator times - now = datetime.now(timezone.utc) - - # Sum all evaluator execution times for sequential execution - total_eval_time = ( - sum( - ( - r.result.evaluation_time - for r in eval_results - if r.result.evaluation_time - ) - ) - or 0.0 - ) - - # Parent span covers the sequential evaluation period - parent_end_time = now - parent_start_time = ( - datetime.fromtimestamp( - now.timestamp() - total_eval_time, tz=timezone.utc - ) - if total_eval_time > 0 - else now - ) - - # Find the root execution span from the agent spans - # The root span typically has no parent - root_span_uuid = None - if spans: - from uipath.tracing._utils import _SpanUtils - - for span in spans: - # Check if this span has no parent (indicating it's the root) - if span.parent is None: - # Get the span context and convert to UUID - span_context = span.get_span_context() - root_span_uuid = _SpanUtils.span_id_to_uuid4( - span_context.span_id - ) - break - - # Convert eval_run_id to trace ID format - trace_id_int = int(uuid.UUID(eval_run_id)) - - # Create parent span context - child of root span if available - # The root span should be the eval span (the agent execution root) - if root_span_uuid: - # Convert root span UUID to integer for SpanContext - root_span_id_int = int(root_span_uuid) - parent_context = SpanContext( - trace_id=trace_id_int, - span_id=root_span_id_int, - is_remote=False, - trace_flags=TraceFlags(0x01), - ) - ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) - else: - # No root span found, create as root span with eval_run_id as both trace and span - parent_context = SpanContext( - trace_id=trace_id_int, - span_id=trace_id_int, - is_remote=False, - trace_flags=TraceFlags(0x01), - ) - ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) - - # Create the evaluators parent span - parent_start_ns = int(parent_start_time.timestamp() * 1_000_000_000) - parent_end_ns = int(parent_end_time.timestamp() * 1_000_000_000) - - # Start parent span manually (not using with statement) to control end time - parent_span = tracer.start_span( - "Evaluators", - context=ctx, - kind=SpanKind.INTERNAL, - start_time=parent_start_ns, - ) - - # Set attributes for the evaluators parent span - parent_span.set_attribute("openinference.span.kind", "CHAIN") - parent_span.set_attribute("span.type", "evaluators") - parent_span.set_attribute("eval_run_id", eval_run_id) - - # Make this span the active span for child spans - parent_ctx = trace.set_span_in_context(parent_span, ctx) - - # Track the current time for sequential execution - current_time = parent_start_time - 
- # Collect all readable spans for export - readable_spans = [] - - # Create individual evaluator spans - running sequentially - for eval_result in eval_results: - # Get evaluator name from stored evaluators - evaluator = self.evaluators.get(eval_result.evaluator_id) - evaluator_name = evaluator.id if evaluator else eval_result.evaluator_id - - # Each evaluator starts where the previous one ended (sequential execution) - eval_time = eval_result.result.evaluation_time or 0 - eval_start = current_time - eval_end = datetime.fromtimestamp( - current_time.timestamp() + eval_time, tz=timezone.utc - ) - - # Move current time forward for the next evaluator - current_time = eval_end - - # Create timestamps - eval_start_ns = int(eval_start.timestamp() * 1_000_000_000) - eval_end_ns = int(eval_end.timestamp() * 1_000_000_000) - - # Start evaluator span manually (not using with statement) to control end time - evaluator_span = tracer.start_span( - evaluator_name, - context=parent_ctx, - kind=SpanKind.INTERNAL, - start_time=eval_start_ns, - ) - - # Set attributes for the evaluator span - evaluator_span.set_attribute("openinference.span.kind", "EVALUATOR") - evaluator_span.set_attribute("span.type", "evaluator") - evaluator_span.set_attribute("evaluator_id", eval_result.evaluator_id) - evaluator_span.set_attribute("evaluator_name", evaluator_name) - evaluator_span.set_attribute("eval_run_id", eval_run_id) - evaluator_span.set_attribute("score", eval_result.result.score) - evaluator_span.set_attribute( - "score_type", eval_result.result.score_type.name - ) - - # Add details/justification if available - if eval_result.result.details: - if isinstance(eval_result.result.details, BaseModel): - evaluator_span.set_attribute( - "details", - json.dumps(eval_result.result.details.model_dump()), - ) - else: - evaluator_span.set_attribute( - "details", str(eval_result.result.details) - ) - - # Add evaluation time if available - if eval_result.result.evaluation_time: - evaluator_span.set_attribute( - "evaluation_time", eval_result.result.evaluation_time - ) - - # Set status based on score type - from opentelemetry.trace import Status, StatusCode - - if eval_result.result.score_type == ScoreType.ERROR: - evaluator_span.set_status( - Status(StatusCode.ERROR, "Evaluation failed") - ) - else: - evaluator_span.set_status(Status(StatusCode.OK)) - - # End the evaluator span at the correct time - evaluator_span.end(end_time=eval_end_ns) - - # Convert to ReadableSpan for export - # The span object has a method to get the readable version - if hasattr(evaluator_span, "_readable_span"): - readable_spans.append(evaluator_span._readable_span()) - - # End the parent span at the correct time after all children are created - parent_span.end(end_time=parent_end_ns) - - # Convert parent span to ReadableSpan - if hasattr(parent_span, "_readable_span"): - # Add parent span at the beginning for proper ordering - readable_spans.insert(0, parent_span._readable_span()) - - # Export all evaluator spans together - if readable_spans: - self.spans_exporter.export(readable_spans) - logger.debug( - f"Created evaluator traces for eval run: {eval_run_id} ({len(eval_results)} evaluators)" - ) - except Exception as e: - logger.warning(f"Failed to create evaluator traces: {e}") +__all__ = [ + "StudioWebProgressReporter", + "EvalReportingStrategy", + "LegacyEvalReportingStrategy", + "CodedEvalReportingStrategy", + "gracefully_handle_errors", +] diff --git a/src/uipath/_cli/_evals/_reporting/__init__.py b/src/uipath/_cli/_evals/_reporting/__init__.py 
new file mode 100644 index 000000000..30b5d48d1 --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/__init__.py @@ -0,0 +1,21 @@ +"""Evaluation progress reporting module. + +This module provides components for reporting evaluation progress to StudioWeb, +supporting both legacy and coded evaluation formats through the Strategy Pattern. +""" + +from uipath._cli._evals._reporting._reporter import StudioWebProgressReporter +from uipath._cli._evals._reporting._strategies import ( + CodedEvalReportingStrategy, + EvalReportingStrategy, + LegacyEvalReportingStrategy, +) +from uipath._cli._evals._reporting._utils import gracefully_handle_errors + +__all__ = [ + "StudioWebProgressReporter", + "EvalReportingStrategy", + "LegacyEvalReportingStrategy", + "CodedEvalReportingStrategy", + "gracefully_handle_errors", +] diff --git a/src/uipath/_cli/_evals/_reporting/_coded_strategy.py b/src/uipath/_cli/_evals/_reporting/_coded_strategy.py new file mode 100644 index 000000000..d8613fe39 --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_coded_strategy.py @@ -0,0 +1,157 @@ +"""Coded evaluation reporting strategy. + +This module implements the strategy for coded evaluation reporting, +which uses evaluatorRuns format and keeps string IDs unchanged. +""" + +from typing import Any, Callable + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, + EvaluationStatus, +) +from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot +from uipath.eval.evaluators import BaseEvaluator + + +class CodedEvalReportingStrategy: + """Strategy for coded evaluation reporting. + + Coded evaluations: + - Keep string IDs unchanged + - Use endpoints with /coded/ prefix + - Use evaluatorRuns format with nested result + - Put evaluationCriterias in evalSnapshot + """ + + @property + def endpoint_suffix(self) -> str: + """Return 'coded/' for coded endpoints.""" + return "coded/" + + def convert_id(self, id_value: str) -> str: + """Keep string ID unchanged for coded API.""" + return id_value + + def create_eval_set_run_payload( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + project_id: str, + ) -> dict[str, Any]: + """Create payload for creating a coded eval set run.""" + return { + "agentId": project_id, + "evalSetId": eval_set_id, + "agentSnapshot": agent_snapshot.model_dump(by_alias=True), + "status": EvaluationStatus.IN_PROGRESS.value, + "numberOfEvalsExecuted": no_of_evals, + "source": 0, # EvalRunSource.Manual + } + + def create_eval_run_payload( + self, + eval_item: EvaluationItem, + eval_set_run_id: str, + ) -> dict[str, Any]: + """Create payload for creating a coded eval run.""" + return { + "evalSetRunId": eval_set_run_id, + "evalSnapshot": { + "id": eval_item.id, + "name": eval_item.name, + "inputs": eval_item.inputs, + "evaluationCriterias": eval_item.evaluation_criterias, + }, + "status": EvaluationStatus.IN_PROGRESS.value, + } + + def create_update_eval_run_payload( + self, + eval_run_id: str, + evaluator_runs: list[dict[str, Any]], + evaluator_scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Create payload for updating a coded eval run.""" + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + return { + "evalRunId": eval_run_id, + "status": status.value, + "result": { + "output": dict(actual_output), + "scores": evaluator_scores, # Note: "scores" not "evaluatorScores" + }, + "completionMetrics": {"duration": 
int(execution_time)}, + "evaluatorRuns": evaluator_runs, # Note: "evaluatorRuns" not "assertionRuns" + } + + def create_update_eval_set_run_payload( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + success: bool, + ) -> dict[str, Any]: + """Create payload for updating a coded eval set run.""" + scores_list = [ + {"value": avg_score, "evaluatorId": eval_id} + for eval_id, avg_score in evaluator_scores.items() + ] + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + return { + "evalSetRunId": eval_set_run_id, + "status": status.value, + "evaluatorScores": scores_list, + } + + def collect_results( + self, + eval_results: list[Any], + evaluators: dict[str, BaseEvaluator[Any, Any, Any]], + usage_metrics: dict[str, int | float | None], + serialize_justification_fn: Callable[[Any], str | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect results in coded evaluatorRuns format.""" + evaluator_runs: list[dict[str, Any]] = [] + evaluator_scores_list: list[dict[str, Any]] = [] + + for eval_result in eval_results: + if eval_result.evaluator_id not in evaluators: + continue + + justification = serialize_justification_fn(eval_result.result.details) + + evaluator_scores_list.append( + { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + "justification": justification, + "evaluatorId": eval_result.evaluator_id, + } + ) + + evaluator_runs.append( + { + "status": EvaluationStatus.COMPLETED.value, + "evaluatorId": eval_result.evaluator_id, + "result": { + "score": { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + }, + "justification": justification, + }, + "completionMetrics": { + "duration": int(eval_result.result.evaluation_time or 0), + "cost": usage_metrics["cost"], + "tokens": usage_metrics["tokens"] or 0, + "completionTokens": usage_metrics["completionTokens"] or 0, + "promptTokens": usage_metrics["promptTokens"] or 0, + }, + } + ) + + return evaluator_runs, evaluator_scores_list diff --git a/src/uipath/_cli/_evals/_reporting/_legacy_strategy.py b/src/uipath/_cli/_evals/_reporting/_legacy_strategy.py new file mode 100644 index 000000000..c427f897a --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_legacy_strategy.py @@ -0,0 +1,177 @@ +"""Legacy evaluation reporting strategy. + +This module implements the strategy for legacy evaluation reporting, +which uses assertionRuns format and converts string IDs to GUIDs. +""" + +import uuid +from typing import Any, Callable + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, + EvaluationStatus, +) +from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot +from uipath.eval.evaluators import LegacyBaseEvaluator + + +class LegacyEvalReportingStrategy: + """Strategy for legacy evaluation reporting. + + Legacy evaluations: + - Convert string IDs to deterministic GUIDs using uuid5 + - Use endpoints without /coded/ prefix + - Use assertionRuns format with assertionSnapshot + - Put expectedOutput directly in evalSnapshot + """ + + @property + def endpoint_suffix(self) -> str: + """Return empty string for legacy endpoints (no /coded/ prefix).""" + return "" + + def convert_id(self, id_value: str) -> str: + """Convert string ID to deterministic GUID for legacy API. 
+ + Args: + id_value: The original string ID + + Returns: + The ID as a GUID (either original if valid, or deterministic uuid5) + """ + try: + uuid.UUID(id_value) + return id_value + except ValueError: + return str(uuid.uuid5(uuid.NAMESPACE_DNS, id_value)) + + def create_eval_set_run_payload( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + project_id: str, + ) -> dict[str, Any]: + """Create payload for creating a legacy eval set run.""" + return { + "agentId": project_id, + "evalSetId": self.convert_id(eval_set_id), + "agentSnapshot": agent_snapshot.model_dump(by_alias=True), + "status": EvaluationStatus.IN_PROGRESS.value, + "numberOfEvalsExecuted": no_of_evals, + "source": 0, # EvalRunSource.Manual + } + + def create_eval_run_payload( + self, + eval_item: EvaluationItem, + eval_set_run_id: str, + ) -> dict[str, Any]: + """Create payload for creating a legacy eval run.""" + eval_item_id = self.convert_id(eval_item.id) + + # Extract expectedOutput from evaluation_criterias + expected_output = {} + if eval_item.evaluation_criterias: + first_criteria = next(iter(eval_item.evaluation_criterias.values()), None) + if first_criteria and isinstance(first_criteria, dict): + expected_output = first_criteria.get("expectedOutput", {}) + + return { + "evalSetRunId": eval_set_run_id, + "evalSnapshot": { + "id": eval_item_id, + "name": eval_item.name, + "inputs": eval_item.inputs, + "expectedOutput": expected_output, + }, + "status": EvaluationStatus.IN_PROGRESS.value, + } + + def create_update_eval_run_payload( + self, + eval_run_id: str, + evaluator_runs: list[dict[str, Any]], + evaluator_scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Create payload for updating a legacy eval run.""" + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + return { + "evalRunId": eval_run_id, + "status": status.value, + "result": { + "output": dict(actual_output), + "evaluatorScores": evaluator_scores, + }, + "completionMetrics": {"duration": int(execution_time)}, + "assertionRuns": evaluator_runs, + } + + def create_update_eval_set_run_payload( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + success: bool, + ) -> dict[str, Any]: + """Create payload for updating a legacy eval set run.""" + scores_list = [ + {"value": avg_score, "evaluatorId": self.convert_id(eval_id)} + for eval_id, avg_score in evaluator_scores.items() + ] + status = EvaluationStatus.COMPLETED if success else EvaluationStatus.FAILED + return { + "evalSetRunId": eval_set_run_id, + "status": status.value, + "evaluatorScores": scores_list, + } + + def collect_results( + self, + eval_results: list[Any], + evaluators: dict[str, LegacyBaseEvaluator[Any]], + usage_metrics: dict[str, int | float | None], + serialize_justification_fn: Callable[[Any], str | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect results in legacy assertionRuns format.""" + assertion_runs: list[dict[str, Any]] = [] + evaluator_scores_list: list[dict[str, Any]] = [] + + for eval_result in eval_results: + if eval_result.evaluator_id not in evaluators: + continue + + evaluator_id_value = self.convert_id(eval_result.evaluator_id) + evaluator = evaluators[eval_result.evaluator_id] + justification = serialize_justification_fn(eval_result.result.details) + + evaluator_scores_list.append( + { + "type": eval_result.result.score_type.value, + "value": eval_result.result.score, + 
"justification": justification, + "evaluatorId": evaluator_id_value, + } + ) + + assertion_runs.append( + { + "status": EvaluationStatus.COMPLETED.value, + "evaluatorId": evaluator_id_value, + "completionMetrics": { + "duration": int(eval_result.result.evaluation_time or 0), + "cost": usage_metrics["cost"], + "tokens": usage_metrics["tokens"] or 0, + "completionTokens": usage_metrics["completionTokens"] or 0, + "promptTokens": usage_metrics["promptTokens"] or 0, + }, + "assertionSnapshot": { + "assertionType": evaluator.evaluator_type.name, + "outputKey": evaluator.target_output_key, + }, + } + ) + + return assertion_runs, evaluator_scores_list diff --git a/src/uipath/_cli/_evals/_reporting/_reporter.py b/src/uipath/_cli/_evals/_reporting/_reporter.py new file mode 100644 index 000000000..6ebfe38a0 --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_reporter.py @@ -0,0 +1,952 @@ +"""StudioWeb Progress Reporter for evaluation runs. + +This module provides the main reporter class for sending evaluation +progress updates to StudioWeb, including creating and updating +eval set runs and individual eval runs. +""" + +import json +import logging +import os +import uuid +from datetime import datetime, timezone +from typing import Any +from urllib.parse import urlparse + +from opentelemetry import trace +from opentelemetry.trace import SpanContext, SpanKind, TraceFlags +from pydantic import BaseModel +from rich.console import Console + +from uipath._cli._evals._models._evaluation_set import ( + EvaluationItem, +) +from uipath._cli._evals._models._evaluator import Evaluator +from uipath._cli._evals._models._sw_reporting import ( + StudioWebAgentSnapshot, + StudioWebProgressItem, +) +from uipath._cli._evals._reporting._strategies import ( + CodedEvalReportingStrategy, + EvalReportingStrategy, + LegacyEvalReportingStrategy, +) +from uipath._cli._evals._reporting._utils import gracefully_handle_errors +from uipath._cli._utils._console import ConsoleLogger +from uipath._events._event_bus import EventBus +from uipath._events._events import ( + EvalRunCreatedEvent, + EvalRunUpdatedEvent, + EvalSetRunCreatedEvent, + EvalSetRunUpdatedEvent, + EvaluationEvents, +) +from uipath._utils import Endpoint, RequestSpec +from uipath._utils.constants import ( + ENV_EVAL_BACKEND_URL, + ENV_TENANT_ID, + HEADER_INTERNAL_TENANT_ID, +) +from uipath.eval.evaluators import ( + BaseEvaluator, + LegacyBaseEvaluator, +) +from uipath.eval.models import EvalItemResult, ScoreType +from uipath.platform import UiPath +from uipath.platform.common import UiPathConfig +from uipath.tracing import LlmOpsHttpExporter + +logger = logging.getLogger(__name__) + + +class StudioWebProgressReporter: + """Handles reporting evaluation progress to StudioWeb. + + Uses the Strategy Pattern to delegate legacy vs coded evaluation + formatting to appropriate strategy classes. + """ + + def __init__(self, spans_exporter: LlmOpsHttpExporter): + self.spans_exporter = spans_exporter + + logging.getLogger("uipath._cli.middlewares").setLevel(logging.CRITICAL) + console_logger = ConsoleLogger.get_instance() + + # Use UIPATH_EVAL_BACKEND_URL for eval-specific routing if set + eval_backend_url = os.getenv(ENV_EVAL_BACKEND_URL) + uipath = UiPath(base_url=eval_backend_url) if eval_backend_url else UiPath() + + self._client = uipath.api_client + self._console = console_logger + self._rich_console = Console() + self._project_id = os.getenv("UIPATH_PROJECT_ID", None) + if not self._project_id: + logger.warning( + "Cannot report data to StudioWeb. 
Please set UIPATH_PROJECT_ID." + ) + + # Strategy instances + self._legacy_strategy = LegacyEvalReportingStrategy() + self._coded_strategy = CodedEvalReportingStrategy() + + # State tracking + self.eval_set_run_ids: dict[str, str] = {} + self.evaluators: dict[str, Any] = {} + self.evaluator_scores: dict[str, list[float]] = {} + self.eval_run_ids: dict[str, str] = {} + self.is_coded_eval: dict[str, bool] = {} + self.eval_spans: dict[str, list[Any]] = {} + self.eval_set_execution_id: str | None = None + + # ------------------------------------------------------------------------- + # Strategy Selection + # ------------------------------------------------------------------------- + + def _get_strategy(self, is_coded: bool) -> EvalReportingStrategy: + """Get the appropriate strategy for the evaluation type.""" + return self._coded_strategy if is_coded else self._legacy_strategy + + # ------------------------------------------------------------------------- + # Utility Methods + # ------------------------------------------------------------------------- + + def _format_error_message(self, error: Exception, context: str) -> None: + """Helper method to format and display error messages consistently.""" + self._rich_console.print(f" • \u26a0 [dim]{context}: {error}[/dim]") + + def _is_localhost(self) -> bool: + """Check if the eval backend URL is localhost.""" + eval_backend_url = os.getenv(ENV_EVAL_BACKEND_URL, "") + if eval_backend_url: + try: + parsed = urlparse(eval_backend_url) + hostname = parsed.hostname or parsed.netloc.split(":")[0] + return hostname.lower() in ("localhost", "127.0.0.1") + except Exception: + pass + return False + + def _get_endpoint_prefix(self) -> str: + """Determine the endpoint prefix based on environment.""" + if self._is_localhost(): + return "api/" + return "agentsruntime_/api/" + + def _is_coded_evaluator( + self, evaluators: list[BaseEvaluator[Any, Any, Any]] + ) -> bool: + """Check if evaluators are coded (BaseEvaluator) vs legacy (LegacyBaseEvaluator).""" + if not evaluators: + return False + return not isinstance(evaluators[0], LegacyBaseEvaluator) + + def _serialize_justification( + self, justification: BaseModel | str | None + ) -> str | None: + """Serialize justification to JSON string for API compatibility.""" + if isinstance(justification, BaseModel): + justification = json.dumps(justification.model_dump()) + return justification + + def _tenant_header(self) -> dict[str, str | None]: + """Build tenant header for API requests.""" + tenant_id = os.getenv(ENV_TENANT_ID, None) + if not tenant_id: + self._console.error( + f"{ENV_TENANT_ID} env var is not set. Please run 'uipath auth'." 
+ ) + return {HEADER_INTERNAL_TENANT_ID: tenant_id} + + def _extract_usage_from_spans( + self, spans: list[Any] + ) -> dict[str, int | float | None]: + """Extract token usage and cost from OpenTelemetry spans.""" + total_tokens = 0 + completion_tokens = 0 + prompt_tokens = 0 + total_cost = 0.0 + + for span in spans: + try: + attrs = None + if hasattr(span, "attributes") and span.attributes: + if isinstance(span.attributes, dict): + attrs = span.attributes + elif isinstance(span.attributes, str): + attrs = json.loads(span.attributes) + + if not attrs and hasattr(span, "Attributes") and span.Attributes: + if isinstance(span.Attributes, str): + attrs = json.loads(span.Attributes) + elif isinstance(span.Attributes, dict): + attrs = span.Attributes + + if attrs: + if "usage" in attrs and isinstance(attrs["usage"], dict): + usage = attrs["usage"] + prompt_tokens += usage.get("promptTokens", 0) + completion_tokens += usage.get("completionTokens", 0) + total_tokens += usage.get("totalTokens", 0) + total_cost += usage.get("cost", 0.0) + + prompt_tokens += attrs.get("gen_ai.usage.prompt_tokens", 0) + completion_tokens += attrs.get("gen_ai.usage.completion_tokens", 0) + total_tokens += attrs.get("gen_ai.usage.total_tokens", 0) + total_cost += attrs.get("gen_ai.usage.cost", 0.0) + total_cost += attrs.get("llm.usage.cost", 0.0) + + except (json.JSONDecodeError, AttributeError, TypeError) as e: + logger.debug(f"Failed to parse span attributes: {e}") + continue + + return { + "tokens": total_tokens if total_tokens > 0 else None, + "completionTokens": completion_tokens if completion_tokens > 0 else None, + "promptTokens": prompt_tokens if prompt_tokens > 0 else None, + "cost": total_cost if total_cost > 0 else None, + } + + def _extract_agent_snapshot(self, entrypoint: str | None) -> StudioWebAgentSnapshot: + """Extract agent snapshot from entry points configuration or low-code agent file. + + For coded agents, reads from entry-points.json configuration file. + For low-code agents (*.json files like agent.json), reads inputSchema + and outputSchema directly from the agent file. 
+ + Args: + entrypoint: The entrypoint file path to look up + + Returns: + StudioWebAgentSnapshot with input and output schemas + """ + if not entrypoint: + logger.warning( + "Entrypoint not provided - falling back to empty inputSchema " + "and outputSchema" + ) + return StudioWebAgentSnapshot(input_schema={}, output_schema={}) + + try: + # Check if entrypoint is a low-code agent JSON file (e.g., agent.json) + if entrypoint.endswith(".json"): + agent_file_path = os.path.join(os.getcwd(), entrypoint) + if os.path.exists(agent_file_path): + with open(agent_file_path, "r") as f: + agent_data = json.load(f) + + # Low-code agent files have inputSchema and outputSchema at root + input_schema = agent_data.get("inputSchema", {}) + output_schema = agent_data.get("outputSchema", {}) + + logger.debug( + f"Extracted agent snapshot from low-code agent '{entrypoint}': " + f"inputSchema={json.dumps(input_schema)}, " + f"outputSchema={json.dumps(output_schema)}" + ) + + return StudioWebAgentSnapshot( + input_schema=input_schema, output_schema=output_schema + ) + + # Fall back to entry-points.json for coded agents + entry_points_file_path = os.path.join( + os.getcwd(), str(UiPathConfig.entry_points_file_path) + ) + if not os.path.exists(entry_points_file_path): + logger.debug( + f"Entry points file not found at {entry_points_file_path}, " + "using empty schemas" + ) + return StudioWebAgentSnapshot(input_schema={}, output_schema={}) + + with open(entry_points_file_path, "r") as f: + entry_points = json.load(f).get("entryPoints", []) + + ep = None + for entry_point in entry_points: + if entry_point.get("filePath") == entrypoint: + ep = entry_point + break + + if not ep: + logger.warning( + f"Entrypoint {entrypoint} not found in configuration file" + ) + return StudioWebAgentSnapshot(input_schema={}, output_schema={}) + + input_schema = ep.get("input", {}) + output_schema = ep.get("output", {}) + + logger.debug( + f"Extracted agent snapshot for entrypoint '{entrypoint}': " + f"inputSchema={json.dumps(input_schema)}, " + f"outputSchema={json.dumps(output_schema)}" + ) + + return StudioWebAgentSnapshot( + input_schema=input_schema, output_schema=output_schema + ) + except Exception as e: + logger.warning(f"Failed to extract agent snapshot: {e}") + return StudioWebAgentSnapshot(input_schema={}, output_schema={}) + + # ------------------------------------------------------------------------- + # Request Spec Generation (delegating to strategies) + # ------------------------------------------------------------------------- + + def _create_eval_set_run_spec( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + is_coded: bool = False, + ) -> RequestSpec: + """Create request spec for creating an eval set run.""" + assert self._project_id is not None, "project_id is required for SW reporting" + strategy = self._get_strategy(is_coded) + payload = strategy.create_eval_set_run_payload( + eval_set_id, agent_snapshot, no_of_evals, self._project_id + ) + + # Log the payload for debugging eval set run reporting + agent_type = "coded" if is_coded else "low-code" + logger.info( + f"Creating eval set run (type={agent_type}): " + f"evalSetId={eval_set_id}, " + f"inputSchema={json.dumps(payload.get('agentSnapshot', {}).get('inputSchema', {}))}, " + f"outputSchema={json.dumps(payload.get('agentSnapshot', {}).get('outputSchema', {}))}" + ) + logger.debug(f"Full eval set run payload: {json.dumps(payload, indent=2)}") + + return RequestSpec( + method="POST", + endpoint=Endpoint( + 
f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/" + f"{strategy.endpoint_suffix}evalSetRun" + ), + json=payload, + headers=self._tenant_header(), + ) + + def _create_eval_run_spec( + self, eval_item: EvaluationItem, eval_set_run_id: str, is_coded: bool = False + ) -> RequestSpec: + """Create request spec for creating an eval run.""" + strategy = self._get_strategy(is_coded) + payload = strategy.create_eval_run_payload(eval_item, eval_set_run_id) + + # Log the payload for debugging eval run reporting + agent_type = "coded" if is_coded else "low-code" + logger.debug( + f"Creating eval run (type={agent_type}): " + f"evalSetRunId={eval_set_run_id}, evalItemId={eval_item.id}" + ) + logger.debug(f"Full eval run payload: {json.dumps(payload, indent=2)}") + + return RequestSpec( + method="POST", + endpoint=Endpoint( + f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/" + f"{strategy.endpoint_suffix}evalRun" + ), + json=payload, + headers=self._tenant_header(), + ) + + def _update_eval_run_spec( + self, + evaluator_runs: list[dict[str, Any]], + evaluator_scores: list[dict[str, Any]], + eval_run_id: str, + actual_output: dict[str, Any], + execution_time: float, + success: bool, + is_coded: bool = False, + ) -> RequestSpec: + """Create request spec for updating an eval run.""" + strategy = self._get_strategy(is_coded) + payload = strategy.create_update_eval_run_payload( + eval_run_id, + evaluator_runs, + evaluator_scores, + actual_output, + execution_time, + success, + ) + + # Log the payload for debugging eval run updates + agent_type = "coded" if is_coded else "low-code" + logger.debug( + f"Updating eval run (type={agent_type}): " + f"evalRunId={eval_run_id}, success={success}" + ) + logger.debug(f"Full eval run update payload: {json.dumps(payload, indent=2)}") + + return RequestSpec( + method="PUT", + endpoint=Endpoint( + f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/" + f"{strategy.endpoint_suffix}evalRun" + ), + json=payload, + headers=self._tenant_header(), + ) + + def _update_eval_set_run_spec( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + is_coded: bool = False, + success: bool = True, + ) -> RequestSpec: + """Create request spec for updating an eval set run.""" + strategy = self._get_strategy(is_coded) + payload = strategy.create_update_eval_set_run_payload( + eval_set_run_id, evaluator_scores, success + ) + + # Log the payload for debugging eval set run updates + agent_type = "coded" if is_coded else "low-code" + logger.info( + f"Updating eval set run (type={agent_type}): " + f"evalSetRunId={eval_set_run_id}, success={success}, " + f"evaluatorScores={json.dumps(payload.get('evaluatorScores', []))}" + ) + logger.debug( + f"Full eval set run update payload: {json.dumps(payload, indent=2)}" + ) + + return RequestSpec( + method="PUT", + endpoint=Endpoint( + f"{self._get_endpoint_prefix()}execution/agents/{self._project_id}/" + f"{strategy.endpoint_suffix}evalSetRun" + ), + json=payload, + headers=self._tenant_header(), + ) + + # ------------------------------------------------------------------------- + # API Methods + # ------------------------------------------------------------------------- + + @gracefully_handle_errors + async def create_eval_set_run_sw( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + evaluators: list[LegacyBaseEvaluator[Any]], + is_coded: bool = False, + ) -> str: + """Create a new evaluation set run in StudioWeb.""" + spec = 
self._create_eval_set_run_spec( + eval_set_id, agent_snapshot, no_of_evals, is_coded + ) + response = await self._client.request_async( + method=spec.method, + url=spec.endpoint, + params=spec.params, + json=spec.json, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + eval_set_run_id = json.loads(response.content)["id"] + return eval_set_run_id + + @gracefully_handle_errors + async def create_eval_run( + self, eval_item: EvaluationItem, eval_set_run_id: str, is_coded: bool = False + ) -> str: + """Create a new evaluation run in StudioWeb.""" + spec = self._create_eval_run_spec(eval_item, eval_set_run_id, is_coded) + response = await self._client.request_async( + method=spec.method, + url=spec.endpoint, + params=spec.params, + json=spec.json, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + return json.loads(response.content)["id"] + + @gracefully_handle_errors + async def update_eval_run( + self, + sw_progress_item: StudioWebProgressItem, + evaluators: dict[str, Evaluator], + is_coded: bool = False, + spans: list[Any] | None = None, + ): + """Update an evaluation run with results.""" + # Separate evaluators by type + coded_evaluators: dict[str, BaseEvaluator[Any, Any, Any]] = {} + legacy_evaluators: dict[str, LegacyBaseEvaluator[Any]] = {} + + for k, v in evaluators.items(): + if isinstance(v, LegacyBaseEvaluator): + legacy_evaluators[k] = v + elif isinstance(v, BaseEvaluator): + coded_evaluators[k] = v + + usage_metrics = self._extract_usage_from_spans(spans or []) + + evaluator_runs: list[dict[str, Any]] = [] + evaluator_scores: list[dict[str, Any]] = [] + + # Use strategies for result collection + if coded_evaluators: + runs, scores = self._coded_strategy.collect_results( + sw_progress_item.eval_results, + coded_evaluators, + usage_metrics, + self._serialize_justification, + ) + evaluator_runs.extend(runs) + evaluator_scores.extend(scores) + + if legacy_evaluators: + runs, scores = self._legacy_strategy.collect_results( + sw_progress_item.eval_results, + legacy_evaluators, + usage_metrics, + self._serialize_justification, + ) + evaluator_runs.extend(runs) + evaluator_scores.extend(scores) + + # Use strategy for spec generation + spec = self._update_eval_run_spec( + evaluator_runs=evaluator_runs, + evaluator_scores=evaluator_scores, + eval_run_id=sw_progress_item.eval_run_id, + actual_output=sw_progress_item.agent_output, + execution_time=sw_progress_item.agent_execution_time, + success=sw_progress_item.success, + is_coded=is_coded, + ) + + await self._client.request_async( + method=spec.method, + url=spec.endpoint, + params=spec.params, + json=spec.json, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + + @gracefully_handle_errors + async def update_eval_set_run( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + is_coded: bool = False, + success: bool = True, + ): + """Update the evaluation set run status to complete.""" + spec = self._update_eval_set_run_spec( + eval_set_run_id, evaluator_scores, is_coded, success + ) + await self._client.request_async( + method=spec.method, + url=spec.endpoint, + params=spec.params, + json=spec.json, + headers=spec.headers, + scoped="org" if self._is_localhost() else "tenant", + ) + + # ------------------------------------------------------------------------- + # Event Handlers + # ------------------------------------------------------------------------- + + async def handle_create_eval_set_run(self, payload: 
EvalSetRunCreatedEvent) -> None: + try: + self.evaluators = {eval.id: eval for eval in payload.evaluators} + self.evaluator_scores = {eval.id: [] for eval in payload.evaluators} + self.eval_set_execution_id = payload.execution_id + + is_coded = self._is_coded_evaluator(payload.evaluators) + self.is_coded_eval[payload.execution_id] = is_coded + + eval_set_run_id = payload.eval_set_run_id + if not eval_set_run_id: + eval_set_run_id = await self.create_eval_set_run_sw( + eval_set_id=payload.eval_set_id, + agent_snapshot=self._extract_agent_snapshot(payload.entrypoint), + no_of_evals=payload.no_of_evals, + evaluators=payload.evaluators, + is_coded=is_coded, + ) + self.eval_set_run_ids[payload.execution_id] = eval_set_run_id + current_span = trace.get_current_span() + if current_span.is_recording(): + current_span.set_attribute("eval_set_run_id", eval_set_run_id) + + if eval_set_run_id: + await self._send_parent_trace(eval_set_run_id, payload.eval_set_id) + + logger.debug( + f"Created eval set run with ID: {eval_set_run_id} (coded={is_coded})" + ) + + except Exception as e: + self._format_error_message(e, "StudioWeb create eval set run error") + + async def handle_create_eval_run(self, payload: EvalRunCreatedEvent) -> None: + try: + if self.eval_set_execution_id and ( + eval_set_run_id := self.eval_set_run_ids.get(self.eval_set_execution_id) + ): + is_coded = self.is_coded_eval.get(self.eval_set_execution_id, False) + eval_run_id = await self.create_eval_run( + payload.eval_item, eval_set_run_id, is_coded + ) + if eval_run_id: + self.eval_run_ids[payload.execution_id] = eval_run_id + logger.debug( + f"Created eval run with ID: {eval_run_id} (coded={is_coded})" + ) + else: + logger.warning("Cannot create eval run: eval_set_run_id not available") + + except Exception as e: + self._format_error_message(e, "StudioWeb create eval run error") + + async def handle_update_eval_run(self, payload: EvalRunUpdatedEvent) -> None: + try: + eval_run_id = self.eval_run_ids.get(payload.execution_id) + + if eval_run_id: + self.spans_exporter.trace_id = eval_run_id + else: + if self.eval_set_execution_id: + self.spans_exporter.trace_id = self.eval_set_run_ids.get( + self.eval_set_execution_id + ) + + self.spans_exporter.export(payload.spans) + + for eval_result in payload.eval_results: + evaluator_id = eval_result.evaluator_id + if evaluator_id in self.evaluator_scores: + match eval_result.result.score_type: + case ScoreType.NUMERICAL: + self.evaluator_scores[evaluator_id].append( + eval_result.result.score + ) + case ScoreType.BOOLEAN: + self.evaluator_scores[evaluator_id].append( + 100 if eval_result.result.score else 0 + ) + case ScoreType.ERROR: + self.evaluator_scores[evaluator_id].append(0) + + if eval_run_id and self.eval_set_execution_id: + is_coded = self.is_coded_eval.get(self.eval_set_execution_id, False) + self._extract_usage_from_spans(payload.spans) + + await self._send_evaluator_traces( + eval_run_id, payload.eval_results, payload.spans + ) + + await self.update_eval_run( + StudioWebProgressItem( + eval_run_id=eval_run_id, + eval_results=payload.eval_results, + success=payload.success, + agent_output=payload.agent_output, + agent_execution_time=payload.agent_execution_time, + ), + self.evaluators, + is_coded=is_coded, + spans=payload.spans, + ) + + logger.debug( + f"Updated eval run with ID: {eval_run_id} (coded={is_coded})" + ) + + except Exception as e: + self._format_error_message(e, "StudioWeb reporting error") + + async def handle_update_eval_set_run(self, payload: EvalSetRunUpdatedEvent) -> 
None: + try: + if eval_set_run_id := self.eval_set_run_ids.get(payload.execution_id): + is_coded = self.is_coded_eval.get(payload.execution_id, False) + await self.update_eval_set_run( + eval_set_run_id, + payload.evaluator_scores, + is_coded=is_coded, + success=payload.success, + ) + status_str = "completed" if payload.success else "failed" + logger.debug( + f"Updated eval set run with ID: {eval_set_run_id} " + f"(coded={is_coded}, status={status_str})" + ) + else: + logger.warning( + "Cannot update eval set run: eval_set_run_id not available" + ) + + except Exception as e: + self._format_error_message(e, "StudioWeb update eval set run error") + + async def subscribe_to_eval_runtime_events(self, event_bus: EventBus) -> None: + event_bus.subscribe( + EvaluationEvents.CREATE_EVAL_SET_RUN, self.handle_create_eval_set_run + ) + event_bus.subscribe( + EvaluationEvents.CREATE_EVAL_RUN, self.handle_create_eval_run + ) + event_bus.subscribe( + EvaluationEvents.UPDATE_EVAL_RUN, self.handle_update_eval_run + ) + event_bus.subscribe( + EvaluationEvents.UPDATE_EVAL_SET_RUN, self.handle_update_eval_set_run + ) + logger.debug("StudioWeb progress reporter subscribed to evaluation events") + + # ------------------------------------------------------------------------- + # Tracing Methods + # ------------------------------------------------------------------------- + + async def _send_parent_trace( + self, eval_set_run_id: str, eval_set_name: str + ) -> None: + """Send the parent trace span for the evaluation set run.""" + try: + tracer = trace.get_tracer(__name__) + trace_id_int = int(uuid.UUID(eval_set_run_id)) + + span_context = SpanContext( + trace_id=trace_id_int, + span_id=trace_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + + ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_context)) + + with tracer.start_as_current_span( + eval_set_name, + context=ctx, + kind=SpanKind.INTERNAL, + start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), + ) as span: + span.set_attribute("openinference.span.kind", "CHAIN") + span.set_attribute("span.type", "evaluationSet") + span.set_attribute("eval_set_run_id", eval_set_run_id) + + logger.debug(f"Created parent trace for eval set run: {eval_set_run_id}") + + except Exception as e: + logger.warning(f"Failed to create parent trace: {e}") + + async def _send_eval_run_trace( + self, eval_run_id: str, eval_set_run_id: str, eval_name: str + ) -> None: + """Send the child trace span for an evaluation run.""" + try: + tracer = trace.get_tracer(__name__) + trace_id_int = int(uuid.UUID(eval_run_id)) + parent_span_id_int = int(uuid.UUID(eval_set_run_id)) + + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=parent_span_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + + ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) + + with tracer.start_as_current_span( + eval_name, + context=ctx, + kind=SpanKind.INTERNAL, + start_time=int(datetime.now(timezone.utc).timestamp() * 1_000_000_000), + ) as span: + span.set_attribute("openinference.span.kind", "CHAIN") + span.set_attribute("span.type", "evaluation") + span.set_attribute("eval_run_id", eval_run_id) + span.set_attribute("eval_set_run_id", eval_set_run_id) + + logger.debug( + f"Created trace for eval run: {eval_run_id} (parent: {eval_set_run_id})" + ) + + except Exception as e: + logger.warning(f"Failed to create eval run trace: {e}") + + async def _send_evaluator_traces( + self, eval_run_id: str, eval_results: list[EvalItemResult], 
spans: list[Any] + ) -> None: + """Send trace spans for all evaluators.""" + try: + if not eval_results: + logger.debug( + f"No evaluator results to trace for eval run: {eval_run_id}" + ) + return + + agent_readable_spans = [] + if spans: + for span in spans: + if hasattr(span, "_readable_span"): + agent_readable_spans.append(span._readable_span()) + + if agent_readable_spans: + self.spans_exporter.export(agent_readable_spans) + logger.debug( + f"Exported {len(agent_readable_spans)} agent execution spans " + f"for eval run: {eval_run_id}" + ) + + tracer = trace.get_tracer(__name__) + now = datetime.now(timezone.utc) + + total_eval_time = ( + sum( + r.result.evaluation_time + for r in eval_results + if r.result.evaluation_time + ) + or 0.0 + ) + + parent_end_time = now + parent_start_time = ( + datetime.fromtimestamp( + now.timestamp() - total_eval_time, tz=timezone.utc + ) + if total_eval_time > 0 + else now + ) + + root_span_uuid = None + if spans: + from uipath.tracing._utils import _SpanUtils + + for span in spans: + if span.parent is None: + span_context = span.get_span_context() + root_span_uuid = _SpanUtils.span_id_to_uuid4( + span_context.span_id + ) + break + + trace_id_int = int(uuid.UUID(eval_run_id)) + + if root_span_uuid: + root_span_id_int = int(root_span_uuid) + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=root_span_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) + else: + parent_context = SpanContext( + trace_id=trace_id_int, + span_id=trace_id_int, + is_remote=False, + trace_flags=TraceFlags(0x01), + ) + ctx = trace.set_span_in_context(trace.NonRecordingSpan(parent_context)) + + parent_start_ns = int(parent_start_time.timestamp() * 1_000_000_000) + parent_end_ns = int(parent_end_time.timestamp() * 1_000_000_000) + + parent_span = tracer.start_span( + "Evaluators", + context=ctx, + kind=SpanKind.INTERNAL, + start_time=parent_start_ns, + ) + + parent_span.set_attribute("openinference.span.kind", "CHAIN") + parent_span.set_attribute("span.type", "evaluators") + parent_span.set_attribute("eval_run_id", eval_run_id) + + parent_ctx = trace.set_span_in_context(parent_span, ctx) + current_time = parent_start_time + readable_spans = [] + + for eval_result in eval_results: + evaluator = self.evaluators.get(eval_result.evaluator_id) + evaluator_name = evaluator.id if evaluator else eval_result.evaluator_id + + eval_time = eval_result.result.evaluation_time or 0 + eval_start = current_time + eval_end = datetime.fromtimestamp( + current_time.timestamp() + eval_time, tz=timezone.utc + ) + current_time = eval_end + + eval_start_ns = int(eval_start.timestamp() * 1_000_000_000) + eval_end_ns = int(eval_end.timestamp() * 1_000_000_000) + + evaluator_span = tracer.start_span( + evaluator_name, + context=parent_ctx, + kind=SpanKind.INTERNAL, + start_time=eval_start_ns, + ) + + evaluator_span.set_attribute("openinference.span.kind", "EVALUATOR") + evaluator_span.set_attribute("span.type", "evaluator") + evaluator_span.set_attribute("evaluator_id", eval_result.evaluator_id) + evaluator_span.set_attribute("evaluator_name", evaluator_name) + evaluator_span.set_attribute("eval_run_id", eval_run_id) + evaluator_span.set_attribute("score", eval_result.result.score) + evaluator_span.set_attribute( + "score_type", eval_result.result.score_type.name + ) + + if eval_result.result.details: + if isinstance(eval_result.result.details, BaseModel): + evaluator_span.set_attribute( + "details", + 
json.dumps(eval_result.result.details.model_dump()), + ) + else: + evaluator_span.set_attribute( + "details", str(eval_result.result.details) + ) + + if eval_result.result.evaluation_time: + evaluator_span.set_attribute( + "evaluation_time", eval_result.result.evaluation_time + ) + + from opentelemetry.trace import Status, StatusCode + + if eval_result.result.score_type == ScoreType.ERROR: + evaluator_span.set_status( + Status(StatusCode.ERROR, "Evaluation failed") + ) + else: + evaluator_span.set_status(Status(StatusCode.OK)) + + evaluator_span.end(end_time=eval_end_ns) + + if hasattr(evaluator_span, "_readable_span"): + readable_spans.append(evaluator_span._readable_span()) + + parent_span.end(end_time=parent_end_ns) + + if hasattr(parent_span, "_readable_span"): + readable_spans.insert(0, parent_span._readable_span()) + + if readable_spans: + self.spans_exporter.export(readable_spans) + + logger.debug( + f"Created evaluator traces for eval run: {eval_run_id} " + f"({len(eval_results)} evaluators)" + ) + except Exception as e: + logger.warning(f"Failed to create evaluator traces: {e}") diff --git a/src/uipath/_cli/_evals/_reporting/_strategies.py b/src/uipath/_cli/_evals/_reporting/_strategies.py new file mode 100644 index 000000000..7100eb698 --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_strategies.py @@ -0,0 +1,15 @@ +"""Evaluation reporting strategies for legacy and coded evaluations. + +This module re-exports strategy classes from their individual modules +for backward compatibility. +""" + +from uipath._cli._evals._reporting._coded_strategy import CodedEvalReportingStrategy +from uipath._cli._evals._reporting._legacy_strategy import LegacyEvalReportingStrategy +from uipath._cli._evals._reporting._strategy_protocol import EvalReportingStrategy + +__all__ = [ + "EvalReportingStrategy", + "LegacyEvalReportingStrategy", + "CodedEvalReportingStrategy", +] diff --git a/src/uipath/_cli/_evals/_reporting/_strategy_protocol.py b/src/uipath/_cli/_evals/_reporting/_strategy_protocol.py new file mode 100644 index 000000000..e817dcea6 --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_strategy_protocol.py @@ -0,0 +1,93 @@ +"""Protocol definition for evaluation reporting strategies. + +This module defines the Strategy Protocol for handling the differences between +legacy and coded evaluation API formats. +""" + +from typing import Any, Callable, Protocol, runtime_checkable + +from uipath._cli._evals._models._evaluation_set import EvaluationItem +from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot + + +@runtime_checkable +class EvalReportingStrategy(Protocol): + """Protocol for evaluation reporting strategies. + + Strategies handle the differences between legacy and coded evaluation + API formats, including ID conversion, endpoint routing, and payload structure. + """ + + @property + def endpoint_suffix(self) -> str: + """Return the endpoint suffix for this strategy. + + Returns: + "" for legacy, "coded/" for coded evaluations + """ + ... + + def convert_id(self, id_value: str) -> str: + """Convert an ID to the format expected by the backend. + + Args: + id_value: The original string ID + + Returns: + For legacy: deterministic GUID from uuid5 + For coded: original string ID unchanged + """ + ... + + def create_eval_set_run_payload( + self, + eval_set_id: str, + agent_snapshot: StudioWebAgentSnapshot, + no_of_evals: int, + project_id: str, + ) -> dict[str, Any]: + """Create the payload for creating an eval set run.""" + ... 
+ + def create_eval_run_payload( + self, + eval_item: EvaluationItem, + eval_set_run_id: str, + ) -> dict[str, Any]: + """Create the payload for creating an eval run.""" + ... + + def create_update_eval_run_payload( + self, + eval_run_id: str, + evaluator_runs: list[dict[str, Any]], + evaluator_scores: list[dict[str, Any]], + actual_output: dict[str, Any], + execution_time: float, + success: bool, + ) -> dict[str, Any]: + """Create the payload for updating an eval run.""" + ... + + def create_update_eval_set_run_payload( + self, + eval_set_run_id: str, + evaluator_scores: dict[str, float], + success: bool, + ) -> dict[str, Any]: + """Create the payload for updating an eval set run.""" + ... + + def collect_results( + self, + eval_results: list[Any], + evaluators: dict[str, Any], + usage_metrics: dict[str, int | float | None], + serialize_justification_fn: Callable[[Any], str | None], + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Collect results from evaluations in strategy-specific format. + + Returns: + Tuple of (evaluator_runs, evaluator_scores) + """ + ... diff --git a/src/uipath/_cli/_evals/_reporting/_utils.py b/src/uipath/_cli/_evals/_reporting/_utils.py new file mode 100644 index 000000000..eb2d39a3b --- /dev/null +++ b/src/uipath/_cli/_evals/_reporting/_utils.py @@ -0,0 +1,44 @@ +"""Utility functions for evaluation progress reporting. + +This module contains decorators and helper functions used by the +progress reporter and related components. +""" + +import functools +import logging + +logger = logging.getLogger(__name__) + + +def gracefully_handle_errors(func): + """Decorator to catch and log errors without stopping execution. + + This decorator wraps async functions and catches any exceptions, + logging them as warnings instead of allowing them to propagate. + This ensures that progress reporting failures don't break the + main evaluation flow. + + Args: + func: The async function to wrap + + Returns: + The wrapped function that catches and logs errors + """ + + @functools.wraps(func) + async def wrapper(self, *args, **kwargs): + try: + return await func(self, *args, **kwargs) + except Exception as e: + if hasattr(self, "_console"): + error_type = type(e).__name__ + logger.debug(f"Full error details: {e}") + logger.warning( + f"Cannot report progress to SW. 
" + f"Function: {func.__name__}, " + f"Error type: {error_type}, " + f"Details: {e}" + ) + return None + + return wrapper diff --git a/tests/cli/eval/reporting/__init__.py b/tests/cli/eval/reporting/__init__.py new file mode 100644 index 000000000..8f5346601 --- /dev/null +++ b/tests/cli/eval/reporting/__init__.py @@ -0,0 +1 @@ +"""Tests for the evaluation progress reporting module.""" diff --git a/tests/cli/eval/test_progress_reporter.py b/tests/cli/eval/reporting/test_reporter.py similarity index 73% rename from tests/cli/eval/test_progress_reporter.py rename to tests/cli/eval/reporting/test_reporter.py index 17cccf712..5b3a3d691 100644 --- a/tests/cli/eval/test_progress_reporter.py +++ b/tests/cli/eval/reporting/test_reporter.py @@ -15,11 +15,11 @@ import pytest from opentelemetry.sdk.trace import ReadableSpan -from uipath._cli._evals._progress_reporter import StudioWebProgressReporter +from uipath._cli._evals._reporting import StudioWebProgressReporter from uipath._events._events import EvalSetRunCreatedEvent from uipath.tracing import LlmOpsHttpExporter -# Test fixtures - simple mocks without full evaluator instantiation +# Test fixtures @pytest.fixture @@ -215,10 +215,6 @@ def test_extract_usage_from_spans_without_usage(self, progress_reporter): assert usage["cost"] is None -# Result collection tests removed - complex to test without real evaluator instances -# The core functionality is tested indirectly through the request spec generation tests - - # Tests for request spec generation class TestRequestSpecGeneration: """Tests for generating request specs for different evaluator types.""" @@ -261,10 +257,11 @@ def test_create_eval_set_run_spec_for_legacy(self, progress_reporter): assert spec.method == "POST" assert "coded/" not in spec.endpoint - # Both coded and legacy now send payload directly at root level + # Both legacy and coded APIs accept payload directly at root level (no wrapper) + assert "request" not in spec.json # Legacy should not have version field assert "version" not in spec.json - # Source field is now required by backend for all evaluations + # Source field is required for both legacy and coded assert spec.json["source"] == 0 assert spec.json["numberOfEvalsExecuted"] == 5 # Backend expects integer status @@ -281,7 +278,8 @@ def test_update_coded_eval_run_spec(self, progress_reporter): ] evaluator_scores = [{"evaluatorId": "test-1", "value": 0.9}] - spec = progress_reporter._update_coded_eval_run_spec( + # Now uses unified _update_eval_run_spec with is_coded=True + spec = progress_reporter._update_eval_run_spec( evaluator_runs=evaluator_runs, evaluator_scores=evaluator_scores, eval_run_id="test-run-id", @@ -301,13 +299,14 @@ def test_update_coded_eval_run_spec(self, progress_reporter): def test_update_legacy_eval_run_spec(self, progress_reporter): """Test updating eval run spec for legacy evaluators.""" - assertion_runs = [ + # Note: unified method uses evaluator_runs param, strategy outputs assertionRuns + evaluator_runs = [ {"evaluatorId": "test-1", "status": "completed", "assertionSnapshot": {}} ] evaluator_scores = [{"evaluatorId": "test-1", "value": 0.9}] spec = progress_reporter._update_eval_run_spec( - assertion_runs=assertion_runs, + evaluator_runs=evaluator_runs, evaluator_scores=evaluator_scores, eval_run_id="test-run-id", actual_output={"result": "success"}, @@ -318,10 +317,11 @@ def test_update_legacy_eval_run_spec(self, progress_reporter): assert spec.method == "PUT" assert "coded/" not in spec.endpoint - # Both coded and legacy now send payload directly 
at root level + # Both legacy and coded APIs accept payload directly at root level (no wrapper) assert "request" not in spec.json assert spec.json["evalRunId"] == "test-run-id" - assert spec.json["assertionRuns"] == assertion_runs + # Legacy strategy outputs assertionRuns in payload + assert spec.json["assertionRuns"] == evaluator_runs assert spec.json["result"]["evaluatorScores"] == evaluator_scores assert spec.json["completionMetrics"]["duration"] == 5 # Backend expects integer status @@ -332,7 +332,8 @@ def test_update_coded_eval_run_spec_with_failure(self, progress_reporter): evaluator_runs: list[dict[str, Any]] = [] evaluator_scores: list[dict[str, Any]] = [] - spec = progress_reporter._update_coded_eval_run_spec( + # Now uses unified _update_eval_run_spec with is_coded=True + spec = progress_reporter._update_eval_run_spec( evaluator_runs=evaluator_runs, evaluator_scores=evaluator_scores, eval_run_id="test-run-id", @@ -349,11 +350,11 @@ def test_update_coded_eval_run_spec_with_failure(self, progress_reporter): def test_update_legacy_eval_run_spec_with_failure(self, progress_reporter): """Test updating eval run spec for legacy evaluators with failure.""" - assertion_runs: list[dict[str, Any]] = [] + evaluator_runs: list[dict[str, Any]] = [] evaluator_scores: list[dict[str, Any]] = [] spec = progress_reporter._update_eval_run_spec( - assertion_runs=assertion_runs, + evaluator_runs=evaluator_runs, evaluator_scores=evaluator_scores, eval_run_id="test-run-id", actual_output={}, @@ -364,7 +365,7 @@ def test_update_legacy_eval_run_spec_with_failure(self, progress_reporter): assert spec.method == "PUT" assert "coded/" not in spec.endpoint - # Both coded and legacy now send payload directly at root level + # Both legacy and coded APIs accept payload directly at root level (no wrapper) assert "request" not in spec.json assert spec.json["evalRunId"] == "test-run-id" # Backend expects integer status @@ -527,7 +528,7 @@ def test_update_eval_set_run_spec_with_success_legacy(self, progress_reporter): assert spec.method == "PUT" assert "coded/" not in spec.endpoint - # Both coded and legacy now send payload directly at root level + # Both legacy and coded APIs accept payload directly at root level (no wrapper) assert "request" not in spec.json assert spec.json["evalSetRunId"] == "test-run-id" # Backend expects integer status @@ -546,8 +547,176 @@ def test_update_eval_set_run_spec_with_failure_legacy(self, progress_reporter): assert spec.method == "PUT" assert "coded/" not in spec.endpoint - # Both coded and legacy now send payload directly at root level + # Both legacy and coded APIs accept payload directly at root level (no wrapper) assert "request" not in spec.json assert spec.json["evalSetRunId"] == "test-run-id" # Backend expects integer status assert spec.json["status"] == 3 # FAILED + + +# Tests for agent snapshot extraction +class TestAgentSnapshotExtraction: + """Tests for extracting agent snapshot with proper schema handling.""" + + def test_extract_agent_snapshot_reads_from_entry_points( + self, progress_reporter, tmp_path, monkeypatch + ): + """Test that agent snapshot reads schemas from entry points file.""" + import os + + # Create a temporary entry points file with full schemas + entry_points_data = { + "entryPoints": [ + { + "filePath": "test_agent", + "uniqueId": "test-uuid", + "type": "agent", + "input": { + "type": "object", + "properties": {"query": {"type": "string"}}, + }, + "output": { + "type": "object", + "properties": {"response": {"type": "string"}}, + }, + } + ] + } + + 
entry_points_file = tmp_path / "entry-points.json" + with open(entry_points_file, "w") as f: + json.dump(entry_points_data, f) + + # Change to the temp directory so the reporter finds the file + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + snapshot = progress_reporter._extract_agent_snapshot( + entrypoint="test_agent" + ) + + # Should read full schemas from entry points + assert snapshot.input_schema == { + "type": "object", + "properties": {"query": {"type": "string"}}, + } + assert snapshot.output_schema == { + "type": "object", + "properties": {"response": {"type": "string"}}, + } + finally: + os.chdir(original_cwd) + + def test_extract_agent_snapshot_returns_empty_when_no_file(self, progress_reporter): + """Test that empty schemas are returned when entry points file doesn't exist.""" + snapshot = progress_reporter._extract_agent_snapshot( + entrypoint="nonexistent_agent" + ) + + assert snapshot.input_schema == {} + assert snapshot.output_schema == {} + + def test_extract_agent_snapshot_warns_when_entrypoint_is_none( + self, progress_reporter, caplog + ): + """Test that a warning is logged when entrypoint is None.""" + import logging + + with caplog.at_level(logging.WARNING): + snapshot = progress_reporter._extract_agent_snapshot(entrypoint=None) + + assert snapshot.input_schema == {} + assert snapshot.output_schema == {} + assert "Entrypoint not provided" in caplog.text + assert "falling back to empty inputSchema" in caplog.text + + def test_extract_agent_snapshot_warns_when_entrypoint_is_empty( + self, progress_reporter, caplog + ): + """Test that a warning is logged when entrypoint is empty string.""" + import logging + + with caplog.at_level(logging.WARNING): + snapshot = progress_reporter._extract_agent_snapshot(entrypoint="") + + assert snapshot.input_schema == {} + assert snapshot.output_schema == {} + assert "Entrypoint not provided" in caplog.text + + def test_extract_agent_snapshot_returns_empty_when_entrypoint_not_found( + self, progress_reporter, tmp_path + ): + """Test that empty schemas are returned when entrypoint is not in file.""" + import os + + # Create entry points file without the requested entrypoint + entry_points_data = { + "entryPoints": [ + { + "filePath": "other_agent", + "uniqueId": "test-uuid", + "type": "agent", + "input": {"type": "object"}, + "output": {"type": "object"}, + } + ] + } + + entry_points_file = tmp_path / "entry-points.json" + with open(entry_points_file, "w") as f: + json.dump(entry_points_data, f) + + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + snapshot = progress_reporter._extract_agent_snapshot( + entrypoint="nonexistent_agent" + ) + + assert snapshot.input_schema == {} + assert snapshot.output_schema == {} + finally: + os.chdir(original_cwd) + + def test_agent_snapshot_serializes_with_camel_case( + self, progress_reporter, tmp_path + ): + """Test that agent snapshot serializes to correct JSON format with camelCase.""" + import os + + entry_points_data = { + "entryPoints": [ + { + "filePath": "test_agent", + "uniqueId": "test-uuid", + "type": "agent", + "input": {"type": "object", "properties": {}}, + "output": {"type": "object", "properties": {}}, + } + ] + } + + entry_points_file = tmp_path / "entry-points.json" + with open(entry_points_file, "w") as f: + json.dump(entry_points_data, f) + + original_cwd = os.getcwd() + os.chdir(tmp_path) + + try: + snapshot = progress_reporter._extract_agent_snapshot( + entrypoint="test_agent" + ) + + # Serialize using pydantic + serialized = 
snapshot.model_dump(by_alias=True) + + # Should have camelCase keys + assert "inputSchema" in serialized + assert "outputSchema" in serialized + assert serialized["inputSchema"] == {"type": "object", "properties": {}} + assert serialized["outputSchema"] == {"type": "object", "properties": {}} + finally: + os.chdir(original_cwd) diff --git a/tests/cli/eval/reporting/test_strategies.py b/tests/cli/eval/reporting/test_strategies.py new file mode 100644 index 000000000..400424f52 --- /dev/null +++ b/tests/cli/eval/reporting/test_strategies.py @@ -0,0 +1,244 @@ +"""Tests for evaluation reporting strategies. + +This module tests the strategy classes including: +- LegacyEvalReportingStrategy +- CodedEvalReportingStrategy +- ID conversion behavior +- Payload structure generation +""" + +import uuid + +import pytest + +from uipath._cli._evals._reporting._strategies import ( + CodedEvalReportingStrategy, + LegacyEvalReportingStrategy, +) + + +class TestLegacyEvalReportingStrategy: + """Tests for LegacyEvalReportingStrategy.""" + + @pytest.fixture + def strategy(self): + """Create a LegacyEvalReportingStrategy instance.""" + return LegacyEvalReportingStrategy() + + def test_endpoint_suffix_is_empty(self, strategy): + """Test that legacy strategy has empty endpoint suffix.""" + assert strategy.endpoint_suffix == "" + + def test_convert_id_with_valid_uuid(self, strategy): + """Test that valid UUIDs are returned unchanged.""" + valid_uuid = "550e8400-e29b-41d4-a716-446655440000" + assert strategy.convert_id(valid_uuid) == valid_uuid + + def test_convert_id_with_string_id(self, strategy): + """Test that string IDs are converted to deterministic UUIDs.""" + string_id = "my-custom-id" + result = strategy.convert_id(string_id) + + # Result should be a valid UUID + uuid.UUID(result) + + # Same input should produce same output (deterministic) + assert strategy.convert_id(string_id) == result + + def test_convert_id_with_different_strings_produces_different_uuids(self, strategy): + """Test that different string IDs produce different UUIDs.""" + id1 = strategy.convert_id("id-one") + id2 = strategy.convert_id("id-two") + + assert id1 != id2 + + def test_create_eval_set_run_payload_structure(self, strategy): + """Test the structure of legacy eval set run payload.""" + from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot + + agent_snapshot = StudioWebAgentSnapshot( + input_schema={"type": "object"}, output_schema={"type": "object"} + ) + + payload = strategy.create_eval_set_run_payload( + eval_set_id="test-eval-set", + agent_snapshot=agent_snapshot, + no_of_evals=5, + project_id="test-project", + ) + + assert payload["agentId"] == "test-project" + assert payload["status"] == 1 # IN_PROGRESS + assert payload["numberOfEvalsExecuted"] == 5 + assert payload["source"] == 0 + assert "agentSnapshot" in payload + + def test_create_update_eval_run_payload_uses_assertion_runs(self, strategy): + """Test that legacy update payload uses assertionRuns field.""" + evaluator_runs = [{"evaluatorId": "test-1", "status": 2}] + evaluator_scores = [{"evaluatorId": "test-1", "value": 0.9}] + + payload = strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=evaluator_runs, + evaluator_scores=evaluator_scores, + actual_output={"result": "success"}, + execution_time=5.0, + success=True, + ) + + assert "assertionRuns" in payload + assert payload["assertionRuns"] == evaluator_runs + assert "evaluatorRuns" not in payload + assert payload["result"]["evaluatorScores"] == evaluator_scores + + 
def test_create_update_eval_set_run_payload_converts_ids(self, strategy): + """Test that eval set run update converts evaluator IDs.""" + evaluator_scores = {"my-evaluator": 0.85} + + payload = strategy.create_update_eval_set_run_payload( + eval_set_run_id="run-id", + evaluator_scores=evaluator_scores, + success=True, + ) + + # Check that the evaluator ID was converted + assert len(payload["evaluatorScores"]) == 1 + score_entry = payload["evaluatorScores"][0] + assert score_entry["evaluatorId"] != "my-evaluator" # Should be converted + # Verify it's a valid UUID + uuid.UUID(score_entry["evaluatorId"]) + + +class TestCodedEvalReportingStrategy: + """Tests for CodedEvalReportingStrategy.""" + + @pytest.fixture + def strategy(self): + """Create a CodedEvalReportingStrategy instance.""" + return CodedEvalReportingStrategy() + + def test_endpoint_suffix_is_coded(self, strategy): + """Test that coded strategy has 'coded/' endpoint suffix.""" + assert strategy.endpoint_suffix == "coded/" + + def test_convert_id_returns_unchanged(self, strategy): + """Test that IDs are returned unchanged.""" + string_id = "my-custom-id" + assert strategy.convert_id(string_id) == string_id + + uuid_id = "550e8400-e29b-41d4-a716-446655440000" + assert strategy.convert_id(uuid_id) == uuid_id + + def test_create_eval_set_run_payload_keeps_original_id(self, strategy): + """Test that eval set ID is kept unchanged.""" + from uipath._cli._evals._models._sw_reporting import StudioWebAgentSnapshot + + agent_snapshot = StudioWebAgentSnapshot( + input_schema={"type": "object"}, output_schema={"type": "object"} + ) + + payload = strategy.create_eval_set_run_payload( + eval_set_id="my-eval-set-id", + agent_snapshot=agent_snapshot, + no_of_evals=3, + project_id="test-project", + ) + + assert payload["evalSetId"] == "my-eval-set-id" # Unchanged + + def test_create_update_eval_run_payload_uses_evaluator_runs(self, strategy): + """Test that coded update payload uses evaluatorRuns field.""" + evaluator_runs = [{"evaluatorId": "test-1", "status": 2}] + evaluator_scores = [{"evaluatorId": "test-1", "value": 0.9}] + + payload = strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=evaluator_runs, + evaluator_scores=evaluator_scores, + actual_output={"result": "success"}, + execution_time=5.0, + success=True, + ) + + assert "evaluatorRuns" in payload + assert payload["evaluatorRuns"] == evaluator_runs + assert "assertionRuns" not in payload + assert ( + payload["result"]["scores"] == evaluator_scores + ) # "scores" not "evaluatorScores" + + def test_create_update_eval_set_run_payload_keeps_ids(self, strategy): + """Test that eval set run update keeps evaluator IDs unchanged.""" + evaluator_scores = {"my-evaluator": 0.85} + + payload = strategy.create_update_eval_set_run_payload( + eval_set_run_id="run-id", + evaluator_scores=evaluator_scores, + success=True, + ) + + # Check that the evaluator ID was NOT converted + assert len(payload["evaluatorScores"]) == 1 + score_entry = payload["evaluatorScores"][0] + assert score_entry["evaluatorId"] == "my-evaluator" # Should be unchanged + + +class TestStrategyStatusHandling: + """Tests for status handling in both strategies.""" + + @pytest.fixture + def legacy_strategy(self): + return LegacyEvalReportingStrategy() + + @pytest.fixture + def coded_strategy(self): + return CodedEvalReportingStrategy() + + def test_legacy_success_status(self, legacy_strategy): + """Test legacy strategy sets COMPLETED status on success.""" + payload = 
legacy_strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=[], + evaluator_scores=[], + actual_output={}, + execution_time=0.0, + success=True, + ) + assert payload["status"] == 2 # COMPLETED + + def test_legacy_failure_status(self, legacy_strategy): + """Test legacy strategy sets FAILED status on failure.""" + payload = legacy_strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=[], + evaluator_scores=[], + actual_output={}, + execution_time=0.0, + success=False, + ) + assert payload["status"] == 3 # FAILED + + def test_coded_success_status(self, coded_strategy): + """Test coded strategy sets COMPLETED status on success.""" + payload = coded_strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=[], + evaluator_scores=[], + actual_output={}, + execution_time=0.0, + success=True, + ) + assert payload["status"] == 2 # COMPLETED + + def test_coded_failure_status(self, coded_strategy): + """Test coded strategy sets FAILED status on failure.""" + payload = coded_strategy.create_update_eval_run_payload( + eval_run_id="run-id", + evaluator_runs=[], + evaluator_scores=[], + actual_output={}, + execution_time=0.0, + success=False, + ) + assert payload["status"] == 3 # FAILED diff --git a/tests/cli/eval/reporting/test_utils.py b/tests/cli/eval/reporting/test_utils.py new file mode 100644 index 000000000..15afbfeab --- /dev/null +++ b/tests/cli/eval/reporting/test_utils.py @@ -0,0 +1,89 @@ +"""Tests for evaluation reporting utilities. + +This module tests utility functions and decorators including: +- gracefully_handle_errors decorator +""" + +from unittest.mock import Mock + +import pytest + +from uipath._cli._evals._reporting._utils import gracefully_handle_errors + + +class TestGracefullyHandleErrors: + """Tests for the gracefully_handle_errors decorator.""" + + @pytest.mark.asyncio + async def test_successful_execution(self): + """Test that successful functions return normally.""" + + class TestClass: + _console = Mock() + + @gracefully_handle_errors + async def test_method(self, value): + return value * 2 + + obj = TestClass() + result = await obj.test_method(5) + assert result == 10 + + @pytest.mark.asyncio + async def test_exception_returns_none(self): + """Test that exceptions are caught and None is returned.""" + + class TestClass: + _console = Mock() + + @gracefully_handle_errors + async def test_method(self): + raise ValueError("Test error") + + obj = TestClass() + result = await obj.test_method() + assert result is None + + @pytest.mark.asyncio + async def test_exception_without_console(self): + """Test that exceptions are handled even without _console attribute.""" + + class TestClass: + @gracefully_handle_errors + async def test_method(self): + raise RuntimeError("Test error") + + obj = TestClass() + result = await obj.test_method() + assert result is None + + @pytest.mark.asyncio + async def test_preserves_function_metadata(self): + """Test that the decorator preserves function metadata.""" + + class TestClass: + _console = Mock() + + @gracefully_handle_errors + async def documented_method(self): + """This is a documented method.""" + return "success" + + obj = TestClass() + assert obj.documented_method.__name__ == "documented_method" + assert "documented" in obj.documented_method.__doc__ + + @pytest.mark.asyncio + async def test_handles_multiple_args_and_kwargs(self): + """Test that the decorator handles multiple arguments correctly.""" + + class TestClass: + _console = Mock() + + 
@gracefully_handle_errors + async def test_method(self, a, b, c=None, d=None): + return a + b + (c or 0) + (d or 0) + + obj = TestClass() + result = await obj.test_method(1, 2, c=3, d=4) + assert result == 10
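
As a usage-level illustration of the refactored _reporting package (a sketch, not part of the change set above), the snippet below shows how the two strategies differ in ID handling and endpoint routing, and how the reporter is attached to the evaluation event bus. StudioWebProgressReporter, the strategy classes, convert_id, endpoint_suffix, and subscribe_to_eval_runtime_events all come from this diff; the no-argument EventBus() and LlmOpsHttpExporter() constructors, the asyncio entry point, and the environment requirements are assumptions made to keep the example self-contained.

import asyncio

from uipath._cli._evals._reporting import StudioWebProgressReporter
from uipath._cli._evals._reporting._strategies import (
    CodedEvalReportingStrategy,
    LegacyEvalReportingStrategy,
)
from uipath._events._event_bus import EventBus
from uipath.tracing import LlmOpsHttpExporter

# Legacy StudioWeb endpoints expect GUIDs, so non-GUID string IDs are mapped to
# a deterministic uuid5 GUID (same input always yields the same GUID); IDs that
# are already valid GUIDs pass through unchanged.
legacy = LegacyEvalReportingStrategy()
assert legacy.convert_id("equality") == legacy.convert_id("equality")
assert legacy.endpoint_suffix == ""

# Coded endpoints keep the original string IDs and are routed under "coded/".
coded = CodedEvalReportingStrategy()
assert coded.convert_id("equality") == "equality"
assert coded.endpoint_suffix == "coded/"


async def main() -> None:
    # Assumed no-argument constructors; in the CLI these objects are created by
    # the evaluation runtime, and UIPATH_PROJECT_ID plus tenant/auth env vars
    # must be set for reports to actually reach StudioWeb.
    event_bus = EventBus()
    reporter = StudioWebProgressReporter(spans_exporter=LlmOpsHttpExporter())

    # The reporter subscribes to the CREATE/UPDATE eval-run and eval-set-run
    # events and forwards them to StudioWeb, picking the legacy or coded
    # strategy based on the evaluator type it observes.
    await reporter.subscribe_to_eval_runtime_events(event_bus)


asyncio.run(main())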