diff --git a/AGENTS.md b/AGENTS.md index ae89322..b407a71 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -163,6 +163,22 @@ Tests mirror source structure in `tests/`: Use `pytest.mark.performance` for performance tests (exclude with `-m "not performance"`). +### Test Fixture Patterns + +When writing integration tests that construct `WorkflowConfig` programmatically, follow these conventions (see `tests/test_engine/test_limits.py` for canonical examples): + +- `AgentDef` uses `prompt=` (not `instructions=`), `output={"key": OutputField(type="string")}` (dict, not list), and `routes=[RouteDef(...)]` (not raw dicts). +- `WorkflowDef` requires `entry_point=` and places `limits=` inside `workflow=`. `agents=` and `output=` are top-level on `WorkflowConfig`. +- The engine entry point is `await engine.run({})` (not `execute`). +- To test with controlled token/cost data, patch `provider.execute` to return a custom `AgentOutput` with explicit `input_tokens`, `output_tokens`, and `model` fields. + +### Resume / Checkpoint Parity + +When adding new fields to `LimitEnforcer`: + +- **Transient fields** (reset each run): add to `from_dict()` as parameters sourced from the current workflow config, like `timeout_seconds`, `budget_usd`, `budget_mode`. Update the call site in `cli/run.py` → `resume_workflow_async()`. +- **Persistent fields** (survive across resume): add to both `to_dict()` and `from_dict()` deserialization, like `max_iterations`, `current_iteration`, `execution_history`. 
+ ## Code Style - Python 3.12+ diff --git a/docs/configuration.md b/docs/configuration.md index 679469b..8d87039 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -266,8 +266,10 @@ Safety limits prevent runaway execution: ```yaml workflow: limits: - max_iterations: 10 # Default: 10, max: 100 - timeout_seconds: 600 # Default: 600, max: 3600 + max_iterations: 10 # Default: 10, max: 500 + timeout_seconds: 600 # Default: None (unlimited) + budget_usd: 5.00 # Default: None (no budget tracking) + budget_mode: audit # Default: audit. Options: audit, enforce ``` **max_iterations**: @@ -278,6 +280,21 @@ workflow: - Total workflow timeout - Includes all agent executions +**budget_usd** and **budget_mode**: +- Tracks cumulative cost and acts when the budget is exceeded +- `audit` mode (default): emits a `budget_exceeded` event and logs a warning, + but the workflow continues — use this to discover cost profiles +- `enforce` mode: emits a `budget_exceeded` event, saves a checkpoint, + and stops the workflow with a `BudgetExceededError` (resumable via + `conductor resume` after increasing the budget) +- When `budget_usd` is not set, no budget tracking occurs + +**Recommended graduation path**: + +1. Run workflows without a budget to see costs in the summary +2. Add `budget_usd` in `audit` mode to track overshoots without breaking workflows +3. Switch to `enforce` mode once you know your cost profile + ## Complete Examples ### Claude Configuration @@ -394,7 +411,8 @@ export CONDUCTOR_LOG_LEVEL=DEBUG # INFO, DEBUG, WARNING, ERROR 1. **Set conservative limits** initially (`max_iterations: 10`) 2. **Use timeout** to prevent long-running workflows -3. **Test with dry-run** before production +3. **Set a cost budget** — start with `budget_usd` in `audit` mode to learn your cost profile, then switch to `enforce` +4. 
**Test with dry-run** before production ## Troubleshooting diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 722aec1..94cc5ca 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -1704,6 +1704,8 @@ async def resume_workflow_async( restored_limits = LimitEnforcer.from_dict( cp.limits, timeout_seconds=config.workflow.limits.timeout_seconds, + budget_usd=config.workflow.limits.budget_usd, + budget_mode=config.workflow.limits.budget_mode, ) # Construct the web dashboard early (subscribes to the emitter on diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 43e9d65..f0dc8cf 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -309,6 +309,26 @@ class LimitsConfig(BaseModel): a hard time limit. """ + budget_usd: float | None = Field(default=None, ge=0.0) + """Maximum cost budget for the workflow in USD. + + When set, the engine tracks cumulative cost and acts according to + ``budget_mode`` when the budget is exceeded. Default is None + (no budget tracking). + """ + + budget_mode: Literal["audit", "enforce"] = "audit" + """How the engine responds when ``budget_usd`` is exceeded. + + - ``audit``: emit a ``budget_exceeded`` event and log a warning, + but allow the workflow to continue. Use this to discover cost + profiles before applying hard limits. + - ``enforce``: emit a ``budget_exceeded`` event, save a checkpoint, + and stop the workflow with a ``BudgetExceededError``. + + Only takes effect when ``budget_usd`` is set. Default is ``audit``. + """ + class PricingOverride(BaseModel): """Custom pricing for a specific model. 
diff --git a/src/conductor/engine/limits.py b/src/conductor/engine/limits.py index a0baec5..35ee554 100644 --- a/src/conductor/engine/limits.py +++ b/src/conductor/engine/limits.py @@ -14,6 +14,7 @@ from typing import Any from conductor.exceptions import ( + BudgetExceededError, MaxIterationsError, ) from conductor.exceptions import ( @@ -52,6 +53,12 @@ class LimitEnforcer: timeout_seconds: int | None = None """Maximum wall-clock time for entire workflow. None means unlimited.""" + budget_usd: float | None = None + """Maximum cost budget in USD. None means no budget tracking.""" + + budget_mode: str = "audit" + """Budget enforcement mode: 'audit' (warn only) or 'enforce' (stop).""" + current_iteration: int = 0 """Current iteration count.""" @@ -64,6 +71,9 @@ class LimitEnforcer: current_agent: str | None = None """Currently executing agent name.""" + _budget_exceeded_emitted: bool = field(default=False, init=False, repr=False) + """Internal flag to emit budget_exceeded event only once.""" + def to_dict(self) -> dict[str, Any]: """Serialize limit state to a JSON-compatible dict. @@ -85,17 +95,22 @@ def from_dict( cls, data: dict[str, Any], timeout_seconds: int | None = None, + budget_usd: float | None = None, + budget_mode: str = "audit", ) -> LimitEnforcer: """Reconstruct a LimitEnforcer from a serialized dict. Uses ``max_iterations`` from the checkpoint (it may have been increased by the user) and ``timeout_seconds`` from the current workflow config so that the resumed run gets a fresh timeout - window. + window. ``budget_usd`` and ``budget_mode`` come from the current + config so that the resumed run gets a fresh budget window. Args: data: Dict previously produced by ``to_dict()``. timeout_seconds: Timeout from the workflow config (fresh window). + budget_usd: Budget from the workflow config (fresh window). + budget_mode: Budget mode from the workflow config. 
Returns: A new LimitEnforcer with restored iteration state and a fresh @@ -104,6 +119,8 @@ def from_dict( enforcer = cls( max_iterations=data.get("max_iterations", 10), timeout_seconds=timeout_seconds, + budget_usd=budget_usd, + budget_mode=budget_mode, ) enforcer.current_iteration = data.get("current_iteration", 0) enforcer.execution_history = list(data.get("execution_history", [])) @@ -239,6 +256,35 @@ def check_timeout(self) -> None: current_agent=self.current_agent, ) + def check_budget(self, spent_usd: float) -> tuple[bool, bool]: + """Check if the workflow cost budget has been exceeded. + + Returns a tuple indicating whether the budget was exceeded and + whether this is the first time the overshoot was detected (so + the caller can emit a one-time event). + + In ``enforce`` mode the caller should raise ``BudgetExceededError`` + after emitting the event. In ``audit`` mode the caller should + log a warning and continue. + + Args: + spent_usd: Current cumulative cost from UsageTracker. + + Returns: + Tuple of (exceeded, first_time). ``exceeded`` is True when + ``spent_usd > budget_usd``. ``first_time`` is True only on + the first call that detects the overshoot. + """ + if self.budget_usd is None: + return (False, False) + + if spent_usd > self.budget_usd: + first_time = not self._budget_exceeded_emitted + self._budget_exceeded_emitted = True + return (True, first_time) + + return (False, False) + def get_elapsed_time(self) -> float: """Get the elapsed time since workflow start. 
diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index eccc740..f5b20b6 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -27,6 +27,7 @@ from conductor.events import WorkflowEvent, WorkflowEventEmitter from conductor.exceptions import ( AgentTimeoutError, + BudgetExceededError, ConductorError, ExecutionError, InterruptError, @@ -356,6 +357,8 @@ def __init__( self.limits = LimitEnforcer( max_iterations=config.workflow.limits.max_iterations, timeout_seconds=config.workflow.limits.timeout_seconds, + budget_usd=config.workflow.limits.budget_usd, + budget_mode=config.workflow.limits.budget_mode, ) self.gate_handler = HumanGateHandler(skip_gates=skip_gates) self.max_iterations_handler = MaxIterationsHandler(skip_gates=skip_gates) @@ -479,6 +482,58 @@ def _emit(self, event_type: str, data: dict[str, Any]) -> None: event = WorkflowEvent(type=event_type, timestamp=_time.time(), data=data) self._event_emitter.emit(event) + def _check_budget(self) -> None: + """Check whether the workflow cost budget has been exceeded. + + Reads current spend from ``usage_tracker``, delegates the + threshold check to ``LimitEnforcer.check_budget()``, and acts + according to ``budget_mode``: + + - On first overshoot: emit a ``budget_exceeded`` event. + - ``enforce`` mode: raise ``BudgetExceededError`` (triggers + checkpoint + workflow stop). + - ``audit`` mode: log a warning and continue. + + Subsequent overshoots in ``audit`` mode are silent (no repeated + events or warnings). 
+ """ + summary = self.usage_tracker.get_summary() + spent = summary.total_cost_usd or 0.0 + exceeded, first_time = self.limits.check_budget(spent) + + if not exceeded: + return + + budget = self.limits.budget_usd # guaranteed non-None when exceeded + assert budget is not None # for type narrowing + + if first_time: + self._emit( + "budget_exceeded", + { + "budget_usd": budget, + "spent_usd": spent, + "budget_mode": self.limits.budget_mode, + "current_agent": self.limits.current_agent, + }, + ) + + if self.limits.budget_mode == "enforce": + raise BudgetExceededError( + f"Workflow exceeded cost budget (${budget:.2f}): " + f"spent ${spent:.2f}", + budget_usd=budget, + spent_usd=spent, + current_agent=self.limits.current_agent, + ) + + if first_time: + logger.warning( + "Budget exceeded (audit mode): spent $%.4f of $%.2f budget", + spent, + budget, + ) + def _yaml_source_field(self) -> dict[str, str]: """Return ``{"yaml_source": }`` if the workflow file is readable.""" if self.workflow_path is None: @@ -2038,8 +2093,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: for_each_group.name, count=for_each_output.count ) - # Check timeout after for-each group + # Check timeout and budget after for-each group self.limits.check_timeout() + self._check_budget() # Evaluate routes from for-each group route_result = self._evaluate_for_each_routes( @@ -2109,8 +2165,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: agent_count = len(parallel_group.agents) self.limits.record_execution(parallel_group.name, count=agent_count) - # Check timeout after parallel group + # Check timeout and budget after parallel group self.limits.check_timeout() + self._check_budget() # Evaluate routes from parallel group route_result = self._evaluate_parallel_routes( @@ -2355,6 +2412,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: self.context.store(agent.name, output_content) self.limits.record_execution(agent.name) 
self.limits.check_timeout() + self._check_budget() route_result = self._evaluate_routes(agent, output_content) @@ -2446,6 +2504,7 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: self.context.store(agent.name, sub_output) self.limits.record_execution(agent.name) self.limits.check_timeout() + self._check_budget() route_result = self._evaluate_routes(agent, sub_output) @@ -2576,8 +2635,9 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: # Record successful execution self.limits.record_execution(agent.name) - # Check timeout after each agent + # Check timeout and budget after each agent self.limits.check_timeout() + self._check_budget() # Evaluate routes using the Router route_result = self._evaluate_routes(agent, output.content) @@ -2624,6 +2684,10 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: fail_data["elapsed_seconds"] = e.elapsed_seconds fail_data["timeout_seconds"] = e.timeout_seconds fail_data["current_agent"] = e.current_agent + elif isinstance(e, BudgetExceededError): + fail_data["budget_usd"] = e.budget_usd + fail_data["spent_usd"] = e.spent_usd + fail_data["current_agent"] = e.current_agent self._emit("workflow_failed", fail_data) # Execute on_error hook with error information self._execute_hook("on_error", error=e) diff --git a/src/conductor/exceptions.py b/src/conductor/exceptions.py index d3bac88..a39c965 100644 --- a/src/conductor/exceptions.py +++ b/src/conductor/exceptions.py @@ -484,6 +484,44 @@ def __init__( self.agent_name = agent_name +class BudgetExceededError(ExecutionError): + """Raised when a workflow exceeds its cost budget in enforce mode. + + This is a safety mechanism to prevent runaway spending in agentic + workflows. Only raised when ``budget_mode`` is ``enforce``. + + Attributes: + budget_usd: The configured budget limit. + spent_usd: The actual amount spent when the limit was exceeded. 
+ current_agent: The agent that was executing when the budget was exceeded. + """ + + def __init__( + self, + message: str, + *, + budget_usd: float, + spent_usd: float, + current_agent: str | None = None, + suggestion: str | None = None, + file_path: str | None = None, + line_number: int | None = None, + ) -> None: + self.budget_usd = budget_usd + self.spent_usd = spent_usd + self.current_agent = current_agent + + if suggestion is None: + suggestion = ( + f"Increase limits.budget_usd (currently ${budget_usd:.2f}) " + f"or switch to budget_mode: audit to continue without enforcement" + ) + if current_agent: + suggestion += f". Budget exceeded after agent '{current_agent}'" + + super().__init__(message, suggestion, file_path, line_number) + + class HumanGateError(ExecutionError): """Raised when a human gate encounters an error. diff --git a/tests/test_engine/test_budget.py b/tests/test_engine/test_budget.py new file mode 100644 index 0000000..6aef6e9 --- /dev/null +++ b/tests/test_engine/test_budget.py @@ -0,0 +1,411 @@ +"""Tests for budget enforcement. + +Tests cover the graduation path: +1. No budget set (default) — no tracking, no warnings, no errors. +2. Budget set in audit mode — event emitted on overshoot, workflow continues. +3. Budget set in enforce mode — event emitted, workflow stops with BudgetExceededError. +4. LimitEnforcer.check_budget() unit tests (first-time flag, repeated calls). +5. Schema validation for budget fields. 
+""" + +from __future__ import annotations + +import pytest + +from conductor.config.schema import ( + AgentDef, + LimitsConfig, + OutputField, + RouteDef, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.limits import LimitEnforcer +from conductor.engine.usage import UsageTracker +from conductor.engine.workflow import WorkflowEngine +from conductor.events import WorkflowEventEmitter +from conductor.exceptions import BudgetExceededError +from conductor.providers.base import AgentOutput +from conductor.providers.copilot import CopilotProvider + + +# --------------------------------------------------------------------------- +# Schema / Config Tests +# --------------------------------------------------------------------------- + + +class TestBudgetSchema: + """Test LimitsConfig budget fields.""" + + def test_default_no_budget(self) -> None: + """No budget by default — graduation step 0.""" + limits = LimitsConfig() + assert limits.budget_usd is None + assert limits.budget_mode == "audit" + + def test_budget_usd_set(self) -> None: + limits = LimitsConfig(budget_usd=5.0) + assert limits.budget_usd == 5.0 + assert limits.budget_mode == "audit" + + def test_budget_enforce_mode(self) -> None: + limits = LimitsConfig(budget_usd=5.0, budget_mode="enforce") + assert limits.budget_usd == 5.0 + assert limits.budget_mode == "enforce" + + def test_budget_usd_zero_allowed(self) -> None: + limits = LimitsConfig(budget_usd=0.0) + assert limits.budget_usd == 0.0 + + def test_budget_usd_negative_rejected(self) -> None: + with pytest.raises(ValueError): + LimitsConfig(budget_usd=-1.0) + + def test_budget_mode_invalid_rejected(self) -> None: + with pytest.raises(ValueError): + LimitsConfig(budget_mode="invalid") # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- +# LimitEnforcer Unit Tests +# --------------------------------------------------------------------------- + + +class TestLimitEnforcerBudget: + """Unit tests for 
LimitEnforcer.check_budget().""" + + def test_no_budget_set_returns_not_exceeded(self) -> None: + """When budget_usd is None, check_budget always returns (False, False).""" + enforcer = LimitEnforcer() + exceeded, first_time = enforcer.check_budget(100.0) + assert exceeded is False + assert first_time is False + + def test_under_budget_returns_not_exceeded(self) -> None: + enforcer = LimitEnforcer(budget_usd=10.0) + exceeded, first_time = enforcer.check_budget(5.0) + assert exceeded is False + assert first_time is False + + def test_at_budget_returns_not_exceeded(self) -> None: + """Exactly at budget is not exceeded (> not >=).""" + enforcer = LimitEnforcer(budget_usd=10.0) + exceeded, first_time = enforcer.check_budget(10.0) + assert exceeded is False + assert first_time is False + + def test_over_budget_returns_exceeded_first_time(self) -> None: + enforcer = LimitEnforcer(budget_usd=10.0) + exceeded, first_time = enforcer.check_budget(10.01) + assert exceeded is True + assert first_time is True + + def test_over_budget_second_call_not_first_time(self) -> None: + """After first overshoot detection, subsequent calls return first_time=False.""" + enforcer = LimitEnforcer(budget_usd=10.0) + _, first1 = enforcer.check_budget(10.01) + assert first1 is True + _, first2 = enforcer.check_budget(15.0) + assert first2 is False + + def test_budget_mode_stored(self) -> None: + enforcer = LimitEnforcer(budget_usd=5.0, budget_mode="enforce") + assert enforcer.budget_mode == "enforce" + + def test_default_budget_mode_is_audit(self) -> None: + enforcer = LimitEnforcer(budget_usd=5.0) + assert enforcer.budget_mode == "audit" + + +# --------------------------------------------------------------------------- +# WorkflowEngine Integration Tests +# --------------------------------------------------------------------------- + + +def _make_config( + budget_usd: float | None = None, + budget_mode: str = "audit", + max_iterations: int = 10, +) -> WorkflowConfig: + """Build a minimal 
single-agent workflow config for budget tests.""" + return WorkflowConfig( + workflow=WorkflowDef( + name="budget-test", + entry_point="agent1", + limits=LimitsConfig( + max_iterations=max_iterations, + budget_usd=budget_usd, + budget_mode=budget_mode, + ), + ), + agents=[ + AgentDef( + name="agent1", + prompt="Say hello", + output={"result": OutputField(type="string")}, + ), + ], + output={"result": "{{ agent1.output.result }}"}, + ) + + +def _make_expensive_output() -> AgentOutput: + """Build an AgentOutput that costs roughly $18 (1M input + 1M output on claude-sonnet-4).""" + return AgentOutput( + content={"result": "expensive"}, + raw_response="{}", + input_tokens=1_000_000, + output_tokens=1_000_000, + model="claude-sonnet-4", + ) + + +def _make_cheap_output() -> AgentOutput: + """Build an AgentOutput that costs a fraction of a cent.""" + return AgentOutput( + content={"result": "cheap"}, + raw_response="{}", + input_tokens=100, + output_tokens=50, + model="claude-sonnet-4", + ) + + +class TestBudgetGraduationStep0: + """Graduation Step 0: No budget set — default behavior unchanged.""" + + @pytest.mark.asyncio + async def test_no_budget_workflow_completes(self) -> None: + """Without budget_usd, expensive agents run without any budget checks.""" + config = _make_config(budget_usd=None) + expensive = _make_expensive_output() + + def mock_handler(agent, prompt, context): + return expensive.content + + provider = CopilotProvider(mock_handler=mock_handler) + emitter = WorkflowEventEmitter() + events: list = [] + emitter.subscribe(lambda e: events.append(e)) + + engine = WorkflowEngine(config, provider, event_emitter=emitter) + # Patch the output so usage_tracker records the expensive tokens + original_execute = provider.execute + + async def patched_execute(*args, **kwargs): + return expensive + + provider.execute = patched_execute # type: ignore[assignment] + + result = await engine.run({}) + assert result is not None + + # No budget_exceeded event + budget_events 
= [e for e in events if e.type == "budget_exceeded"] + assert len(budget_events) == 0 + + +class TestBudgetGraduationStep1: + """Graduation Step 1: Budget in audit mode — warn but continue.""" + + @pytest.mark.asyncio + async def test_audit_mode_emits_event_and_continues(self) -> None: + """In audit mode, budget overshoot emits event but workflow completes.""" + config = _make_config(budget_usd=0.001, budget_mode="audit") + expensive = _make_expensive_output() + + def mock_handler(agent, prompt, context): + return expensive.content + + provider = CopilotProvider(mock_handler=mock_handler) + emitter = WorkflowEventEmitter() + events: list = [] + emitter.subscribe(lambda e: events.append(e)) + + engine = WorkflowEngine(config, provider, event_emitter=emitter) + original_execute = provider.execute + + async def patched_execute(*args, **kwargs): + return expensive + + provider.execute = patched_execute # type: ignore[assignment] + + result = await engine.run({}) + assert result is not None # Workflow completed despite overshoot + + # budget_exceeded event was emitted + budget_events = [e for e in events if e.type == "budget_exceeded"] + assert len(budget_events) == 1 + assert budget_events[0].data["budget_mode"] == "audit" + assert budget_events[0].data["budget_usd"] == 0.001 + assert budget_events[0].data["spent_usd"] > 0.001 + + @pytest.mark.asyncio + async def test_audit_mode_emits_event_only_once(self) -> None: + """In audit mode with looping workflow, event emits only on first overshoot.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="budget-loop-test", + entry_point="agent1", + limits=LimitsConfig( + max_iterations=5, + budget_usd=0.001, + budget_mode="audit", + ), + ), + agents=[ + AgentDef( + name="agent1", + prompt="Say hello", + output={"result": OutputField(type="string")}, + routes=[ + RouteDef( + to="agent1", + when="{{ context.iteration < 3 }}", + ), + RouteDef(to="$end"), + ], + ), + ], + output={"result": "{{ agent1.output.result }}"}, + ) 
+ expensive = _make_expensive_output() + + call_count = 0 + + def mock_handler(agent, prompt, context): + return expensive.content + + provider = CopilotProvider(mock_handler=mock_handler) + emitter = WorkflowEventEmitter() + events: list = [] + emitter.subscribe(lambda e: events.append(e)) + + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + async def patched_execute(*args, **kwargs): + nonlocal call_count + call_count += 1 + return expensive + + provider.execute = patched_execute # type: ignore[assignment] + + result = await engine.run({}) + assert result is not None + assert call_count >= 2 + + # budget_exceeded emitted exactly once despite multiple over-budget agents + budget_events = [e for e in events if e.type == "budget_exceeded"] + assert len(budget_events) == 1 + + +class TestBudgetGraduationStep2: + """Graduation Step 2: Budget in enforce mode — stop on overshoot.""" + + @pytest.mark.asyncio + async def test_enforce_mode_raises_budget_exceeded(self) -> None: + """In enforce mode, budget overshoot raises BudgetExceededError.""" + config = _make_config(budget_usd=0.001, budget_mode="enforce") + expensive = _make_expensive_output() + + def mock_handler(agent, prompt, context): + return expensive.content + + provider = CopilotProvider(mock_handler=mock_handler) + emitter = WorkflowEventEmitter() + events: list = [] + emitter.subscribe(lambda e: events.append(e)) + + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + async def patched_execute(*args, **kwargs): + return expensive + + provider.execute = patched_execute # type: ignore[assignment] + + with pytest.raises(BudgetExceededError) as exc_info: + await engine.run({}) + + assert exc_info.value.budget_usd == 0.001 + assert exc_info.value.spent_usd > 0.001 + + # budget_exceeded event was emitted before the error + budget_events = [e for e in events if e.type == "budget_exceeded"] + assert len(budget_events) == 1 + assert budget_events[0].data["budget_mode"] == "enforce" 
+ + # workflow_failed event was also emitted + fail_events = [e for e in events if e.type == "workflow_failed"] + assert len(fail_events) == 1 + assert fail_events[0].data["error_type"] == "BudgetExceededError" + assert "budget_usd" in fail_events[0].data + assert "spent_usd" in fail_events[0].data + + @pytest.mark.asyncio + async def test_enforce_mode_under_budget_completes(self) -> None: + """In enforce mode, a cheap workflow under budget completes normally.""" + config = _make_config(budget_usd=100.0, budget_mode="enforce") + cheap = _make_cheap_output() + + def mock_handler(agent, prompt, context): + return cheap.content + + provider = CopilotProvider(mock_handler=mock_handler) + emitter = WorkflowEventEmitter() + events: list = [] + emitter.subscribe(lambda e: events.append(e)) + + engine = WorkflowEngine(config, provider, event_emitter=emitter) + + async def patched_execute(*args, **kwargs): + return cheap + + provider.execute = patched_execute # type: ignore[assignment] + + result = await engine.run({}) + assert result is not None + + # No budget_exceeded events + budget_events = [e for e in events if e.type == "budget_exceeded"] + assert len(budget_events) == 0 + + +# --------------------------------------------------------------------------- +# BudgetExceededError Tests +# --------------------------------------------------------------------------- + + +class TestBudgetExceededError: + """Test BudgetExceededError exception.""" + + def test_error_attributes(self) -> None: + error = BudgetExceededError( + "over budget", + budget_usd=5.0, + spent_usd=7.50, + current_agent="agent1", + ) + assert error.budget_usd == 5.0 + assert error.spent_usd == 7.50 + assert error.current_agent == "agent1" + assert "5.00" in error.suggestion + assert "agent1" in error.suggestion + + def test_error_auto_suggestion(self) -> None: + error = BudgetExceededError( + "over budget", + budget_usd=10.0, + spent_usd=15.0, + ) + assert "limits.budget_usd" in error.suggestion + assert 
"audit" in error.suggestion + + def test_error_is_execution_error(self) -> None: + from conductor.exceptions import ExecutionError + + error = BudgetExceededError( + "over budget", + budget_usd=1.0, + spent_usd=2.0, + ) + assert isinstance(error, ExecutionError)