diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py
index bccd32d..1fb418f 100644
--- a/packages/optimization/src/ldai_optimizer/client.py
+++ b/packages/optimization/src/ldai_optimizer/client.py
@@ -49,6 +49,7 @@
     LDApiClient,
 )
 from ldai_optimizer.prompts import (
+    _acceptance_criteria_implies_cost_optimization,
     _acceptance_criteria_implies_duration_optimization,
     build_message_history_text,
     build_new_variation_prompt,
@@ -57,6 +58,7 @@
 from ldai_optimizer.util import (
     RedactionFilter,
     await_if_needed,
+    estimate_cost,
     extract_json_from_response,
     generate_slug,
     interpolate_variables,
@@ -128,6 +130,11 @@ def _compute_validation_count(pool_size: int) -> int:
 # under 80% of the baseline — i.e. at least 20% improvement.
 _DURATION_TOLERANCE = 0.80
 
+# Cost gate: a candidate must cost less than this fraction of the baseline
+# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a
+# cost reduction goal. 0.80 means at least 20% cheaper than the baseline.
+_COST_TOLERANCE = 0.80
+
 # Maps SDK status strings to the API status/activity values expected by
 # agent_optimization_result records. Defined at module level to avoid
 # allocating the dict on every on_status_update invocation.
@@ -160,6 +167,7 @@ def __init__(self, ldClient: LDAIClient) -> None:
         self._last_optimization_result_id: Optional[str] = None
         self._initial_tool_keys: List[str] = []
         self._total_token_usage: int = 0
+        self._model_configs: List[Dict[str, Any]] = []
 
         if os.environ.get("LAUNCHDARKLY_API_KEY"):
             self._has_api_key = True
@@ -392,6 +400,7 @@ async def _call_judges(
         agent_tools: Optional[List[ToolDefinition]] = None,
         expected_response: Optional[str] = None,
         agent_duration_ms: Optional[float] = None,
+        agent_usage: Optional[Any] = None,
     ) -> Dict[str, JudgeResult]:
         """
         Call all judges in parallel (auto-path).
@@ -411,6 +420,8 @@
         :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
             Forwarded to acceptance judges whose statement implies a latency goal so they
             can mention the duration change in their rationale.
+        :param agent_usage: Token usage from the agent call. Forwarded to acceptance judges
+            whose statement implies a cost goal so they can mention token usage in their rationale.
         :return: Dictionary of judge results (score and rationale)
         """
         if not self._options.judges:
@@ -464,6 +475,7 @@
                 agent_tools=resolved_agent_tools,
                 expected_response=expected_response,
                 agent_duration_ms=agent_duration_ms,
+                agent_usage=agent_usage,
             )
             judge_results[judge_key] = result
@@ -682,6 +694,7 @@ async def _evaluate_acceptance_judge(
         agent_tools: Optional[List[ToolDefinition]] = None,
         expected_response: Optional[str] = None,
         agent_duration_ms: Optional[float] = None,
+        agent_usage: Optional[Any] = None,
     ) -> JudgeResult:
         """
         Evaluate using an acceptance statement judge.
@@ -699,6 +712,8 @@
         :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
             When the acceptance statement implies a latency goal, the judge is instructed
             to mention the duration change in its rationale.
+        :param agent_usage: Token usage from the agent call. When the acceptance statement
+            implies a cost goal, the judge is instructed to mention token usage and cost.
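+            Illustrative example (hypothetical numbers, mirroring the pricing used in
+            the tests below): with 60 input and 40 output tokens priced at $0.000005
+            and $0.000015 per token respectively, the judge instructions gain the
+            fragment ``(estimated cost: $0.000900)``.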
        :return: The judge result with score and rationale
         """
         if not optimization_judge.acceptance_statement:
@@ -757,8 +772,50 @@
                     f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. "
                 )
             instructions += (
-                "Please mention the duration and any change from baseline in your rationale."
+                "In your rationale, state the duration and any change from baseline. "
+                "If the latency goal is not yet met, include specific, actionable suggestions "
+                "for how the agent's instructions or model choice could be changed to reduce "
+                "response time — for example: switching to a faster model, shortening the "
+                "system prompt, or removing instructions that cause multi-step reasoning. "
+                "These suggestions will be used directly to generate the next variation."
             )
+
+        if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}):
+            current_cost = estimate_cost(
+                agent_usage,
+                _find_model_config(self._current_model or "", self._model_configs),
+            )
+            baseline_cost = (
+                self._history[0].estimated_cost_usd
+                if self._history and self._history[0].estimated_cost_usd is not None
+                else None
+            )
+            if current_cost is not None:
+                instructions += (
+                    "\n\nThe acceptance criteria for this judge include a cost/token-usage goal. "
+                )
+                if agent_usage is not None:
+                    instructions += (
+                        f"The agent's response used {agent_usage.input} input tokens "
+                        f"and {agent_usage.output} output tokens "
+                        f"(estimated cost: ${current_cost:.6f}). "
+                    )
+                if baseline_cost is not None:
+                    delta = current_cost - baseline_cost
+                    direction = "less" if delta < 0 else "more"
+                    instructions += (
+                        f"The baseline cost (first iteration) was ${baseline_cost:.6f}. "
+                        f"This response cost ${abs(delta):.6f} {direction} than the baseline. "
+                    )
+                instructions += (
+                    "In your rationale, state the token usage and cost, and any change from baseline. "
+                    "If the cost goal is not yet met, include specific, actionable suggestions "
+                    "for how the agent's instructions or model choice could be changed to reduce "
+                    "cost — for example: switching to a cheaper model, shortening the system prompt "
+                    "to reduce input tokens, removing unnecessary output instructions, or tightening "
+                    "response length constraints. "
+                    "These suggestions will be used directly to generate the next variation."
+                )
 
         if resolved_variables:
             instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}"
@@ -1082,6 +1139,11 @@ async def _run_ground_truth_optimization(
             ):
                 sample_passed = self._evaluate_duration(optimize_context)
 
+            if sample_passed and _acceptance_criteria_implies_cost_optimization(
+                self._options.judges
+            ):
+                sample_passed = self._evaluate_cost(optimize_context)
+
             if not sample_passed:
                 logger.info(
                     "[GT Attempt %d] -> Sample %d/%d FAILED",
@@ -1227,12 +1289,19 @@ def _apply_new_variation_response(
         # This is a deterministic safety net for when the LLM ignores the prompt
         # instructions and hardcodes a concrete value (e.g. "user-123") instead
        # of the placeholder ("{{user_id}}").
+        # Only check the variables that were actually used for this invocation so
+        # we don't spuriously replace values that happen to appear in other choices.
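+        # A minimal sketch of the intended behaviour (hypothetical values, matching
+        # the test updated below):
+        #     instructions = "Greet user-123 by name."
+        #     active_variables = [{"user_id": "user-123"}]
+        #     restored, warnings = restore_variable_placeholders(instructions, active_variables)
+        #     # restored == "Greet {{user_id}} by name."; one warning records the substitution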
+ active_variables = ( + [variation_ctx.current_variables] + if variation_ctx.current_variables + else self._options.variable_choices + ) self._current_instructions, placeholder_warnings = restore_variable_placeholders( self._current_instructions, - self._options.variable_choices, + active_variables, ) for msg in placeholder_warnings: - logger.warning("[Iteration %d] -> %s", iteration, msg) + logger.debug("[Iteration %d] -> %s", iteration, msg) self._current_parameters = response_data["current_parameters"] @@ -1321,6 +1390,9 @@ async def _generate_new_variation( optimize_for_duration = _acceptance_criteria_implies_duration_optimization( self._options.judges ) + optimize_for_cost = _acceptance_criteria_implies_cost_optimization( + self._options.judges + ) instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1331,6 +1403,7 @@ async def _generate_new_variation( self._options.variable_choices, self._initial_instructions, optimize_for_duration=optimize_for_duration, + optimize_for_cost=optimize_for_cost, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1424,6 +1497,7 @@ async def optimize_from_config( model_configs = api_client.get_model_configs(options.project_key) except Exception as exc: logger.debug("Could not pre-fetch model configs: %s", exc) + self._model_configs = model_configs context = random.choice(options.context_choices) # _get_agent_config calls _initialize_class_members_from_config internally; @@ -1793,18 +1867,24 @@ async def _execute_agent_turn( agent_tools=agent_tools, expected_response=expected_response, agent_duration_ms=agent_duration_ms, + agent_usage=agent_response.usage, ) # Build the fully-populated result context before firing the evaluating event so # the PATCH includes scores, generationLatency, and completionResponse. This is # particularly important for non-final GT samples which receive no further status # events — without this, those fields would never be written to their API records. + agent_cost = estimate_cost( + agent_response.usage, + _find_model_config(self._current_model or "", self._model_configs), + ) result_ctx = dataclasses.replace( optimize_context, completion_response=completion_response, scores=scores, duration_ms=agent_duration_ms, usage=agent_response.usage, + estimated_cost_usd=agent_cost, ) if self._options.judges: @@ -1829,13 +1909,13 @@ def _accumulate_tokens(self, optimize_context: OptimizationContext) -> None: def _is_token_limit_exceeded(self) -> bool: """Return True if the accumulated token usage has met or exceeded the configured limit. - Returns False when no token limit is set so callers can use this as a - simple guard without needing to check for ``None`` themselves. + Returns False when no token limit is set, or when the limit is 0 (which is + treated as "no limit" — a sentinel value meaning the field was left unset). - :return: True if token limit is set and ``_total_token_usage >= token_limit``. + :return: True if a positive token limit is set and ``_total_token_usage >= token_limit``. 
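+
+        For example (illustrative): with ``token_limit=0`` the guard never trips, even
+        at 100_000 accumulated tokens, while ``token_limit=1000`` trips once
+        ``_total_token_usage`` reaches exactly 1000.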
""" limit: Optional[int] = getattr(self._options, "token_limit", None) - return limit is not None and self._total_token_usage >= limit + return bool(limit) and self._total_token_usage >= limit def _evaluate_response(self, optimize_context: OptimizationContext) -> bool: """ @@ -1896,6 +1976,42 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: ) return passed + def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool: + """ + Check whether the candidate's estimated cost meets the improvement target vs. the baseline. + + The baseline is history[0].estimated_cost_usd — the very first completed iteration, + representing the original unoptimized configuration's cost. The candidate must be + at least _COST_TOLERANCE cheaper (default: 20% improvement). + + The cost value is in USD when model pricing data is available, or raw total token + count as a proxy when pricing is absent. Both are comparable relative to their + own baselines. + + Returns True without blocking when no baseline is available (empty history or + history[0].estimated_cost_usd is None), or when the candidate's cost was not + captured. This avoids penalising configurations when cost data is missing. + + :param optimize_context: The completed turn context containing estimated_cost_usd + :return: True if the cost requirement is met or cannot be checked + """ + if not self._history or self._history[0].estimated_cost_usd is None: + return True + if optimize_context.estimated_cost_usd is None: + return True + baseline = self._history[0].estimated_cost_usd + passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE + if not passed: + logger.warning( + "[Iteration %d] -> Cost check failed: %.6f >= baseline %.6f * %.0f%% (%.6f)", + optimize_context.iteration, + optimize_context.estimated_cost_usd, + baseline, + _COST_TOLERANCE * 100, + baseline * _COST_TOLERANCE, + ) + return passed + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -2174,6 +2290,11 @@ async def _run_validation_phase( ): sample_passed = self._evaluate_duration(val_ctx) + if sample_passed and _acceptance_criteria_implies_cost_optimization( + self._options.judges + ): + sample_passed = self._evaluate_cost(val_ctx) + last_ctx = val_ctx if not sample_passed: @@ -2298,6 +2419,11 @@ async def _run_optimization( ): initial_passed = self._evaluate_duration(optimize_context) + if initial_passed and _acceptance_criteria_implies_cost_optimization( + self._options.judges + ): + initial_passed = self._evaluate_cost(optimize_context) + if initial_passed: all_valid, last_ctx = await self._run_validation_phase( optimize_context, iteration diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index fab3ed7..1f5e28c 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -217,6 +217,7 @@ class OptimizationContext: iteration: int = 0 # current iteration number duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration + estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens def copy_without_history(self) -> OptimizationContext: """ @@ -236,6 +237,7 @@ def copy_without_history(self) -> OptimizationContext: iteration=self.iteration, duration_ms=self.duration_ms, 
usage=self.usage, + estimated_cost_usd=self.estimated_cost_usd, ) def to_json(self) -> Dict[str, Any]: @@ -261,6 +263,7 @@ def to_json(self) -> Dict[str, Any]: "history": history_list, "iteration": self.iteration, "duration_ms": self.duration_ms, + "estimated_cost_usd": self.estimated_cost_usd, } if self.usage is not None: result["usage"] = { diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index 4aae5fe..eadf35d 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -16,6 +16,13 @@ re.IGNORECASE, ) +_COST_KEYWORDS = re.compile( + r"\b(cheap|cheaper|cheapest|costs?|costly|expensive|budget|affordable|" + r"spend|spending|economical|cost-effective|frugal|" + r"price|pricing|bill|billing)\b", + re.IGNORECASE, +) + def _acceptance_criteria_implies_duration_optimization( judges: Optional[Dict[str, OptimizationJudge]], @@ -39,6 +46,28 @@ def _acceptance_criteria_implies_duration_optimization( return False +def _acceptance_criteria_implies_cost_optimization( + judges: Optional[Dict[str, OptimizationJudge]], +) -> bool: + """Return True if any judge acceptance statement implies a cost reduction goal. + + Scans each judge's acceptance_statement for cost-related keywords. The + check is case-insensitive. Returns False when judges is None or no judge + carries an acceptance statement. + + :param judges: Judge configuration dict from OptimizationOptions, or None. + :return: True if cost optimization should be applied. + """ + if not judges: + return False + for judge in judges.values(): + if judge.acceptance_statement and _COST_KEYWORDS.search( + judge.acceptance_statement + ): + return True + return False + + def build_message_history_text( history: List[OptimizationContext], input_text: str, @@ -114,6 +143,7 @@ def build_new_variation_prompt( variable_choices: List[Dict[str, Any]], initial_instructions: str, optimize_for_duration: bool = False, + optimize_for_cost: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. @@ -133,6 +163,8 @@ def build_new_variation_prompt( :param initial_instructions: The original unmodified instructions template :param optimize_for_duration: When True, appends a duration optimization section instructing the LLM to prefer faster models and simpler instructions. + :param optimize_for_cost: When True, appends a cost optimization section + instructing the LLM to prefer cheaper models and reduce token usage. 
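+        (The prompt then ends with the ``## Cost Optimization:`` section produced
+        by ``variation_prompt_cost_optimization`` below, as exercised by the
+        ``optimize_for_cost=True`` tests added in this diff.)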
:return: The assembled prompt string
     """
     sections = [
@@ -147,6 +179,7 @@
             history, model_choices, variable_choices, initial_instructions
         ),
         variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "",
+        variation_prompt_cost_optimization(model_choices) if optimize_for_cost else "",
     ]
     return "\n\n".join(s for s in sections if s)
@@ -248,6 +281,8 @@ def variation_prompt_configuration(
             lines.append(f"Agent response: {previous_ctx.completion_response}")
         if previous_ctx.duration_ms is not None:
             lines.append(f"Agent duration: {previous_ctx.duration_ms:.0f}ms")
+        if previous_ctx.estimated_cost_usd is not None:
+            lines.append(f"Estimated agent cost: ${previous_ctx.estimated_cost_usd:.6f}")
         return "\n".join(lines)
     else:
         return "\n".join(
@@ -301,6 +336,8 @@ def variation_prompt_feedback(
         lines.append(feedback_line)
         if ctx.duration_ms is not None:
             lines.append(f"Agent duration: {ctx.duration_ms:.0f}ms")
+        if ctx.estimated_cost_usd is not None:
+            lines.append(f"Estimated agent cost: ${ctx.estimated_cost_usd:.6f}")
 
     return "\n".join(lines)
@@ -556,3 +593,49 @@ def variation_prompt_duration_optimization(model_choices: List[str]) -> str:
             "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower latency.",
         ]
     )
+
+
+def variation_prompt_cost_optimization(model_choices: List[str]) -> str:
+    """
+    Cost optimization section of the variation prompt.
+
+    Included when acceptance criteria imply a cost reduction goal. Instructs
+    the LLM to treat token usage as a secondary objective — quality criteria
+    must still be met first — and provides concrete guidance on how to reduce
+    cost through model selection and instruction simplification.
+
+    :param model_choices: List of model IDs the LLM may select from, so it can
+        apply its own knowledge of which models tend to be cheaper.
+    :return: The cost optimization prompt block.
+    """
+    return "\n".join(
+        [
+            "## Cost Optimization:",
+            "The acceptance criteria for this optimization imply that token usage / cost should be reduced.",
+            "In addition to improving quality, generate a variation that aims to reduce the agent's cost.",
+            "Cost is driven by two factors: (1) the number of tokens processed, and (2) the per-token price of the model.",
+            "Target both factors with the strategies below.",
+            "",
+            "### Reducing token usage (input tokens):",
+            "- Remove redundant, verbose, or repeated phrasing from the instructions.",
+            "- Collapse multi-sentence explanations into a single concise directive.",
+            "- Remove examples or few-shot demonstrations unless they are essential for accuracy.",
+            "- Eliminate instructional scaffolding that the model does not need (e.g. 'You are a helpful assistant that...').",
+            "- Use bullet points instead of prose where possible — they are more token-efficient.",
+            "",
+            "### Reducing token usage (output tokens):",
+            "- Instruct the agent to be concise and avoid unnecessary elaboration.",
+            "- Specify the exact format and length of the expected response (e.g. 
'Respond in one sentence.').", + "- Set or reduce max_tokens if the current value allows longer responses than needed.", + "- Avoid instructions that encourage the agent to 'explain its reasoning' unless required by the acceptance criteria.", + "", + "### Reducing per-token cost via model selection:", + "- Consider switching to a cheaper model from the available choices if quality requirements can still be met.", + f" Available models: {model_choices}", + " Use your knowledge of relative model pricing to prefer lower-cost options.", + " Only switch models if the cheaper model is capable of satisfying the acceptance criteria.", + "", + "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower cost.", + "Apply cost-reduction changes incrementally: prefer the smallest change that measurably reduces cost.", + ] + ) diff --git a/packages/optimization/src/ldai_optimizer/util.py b/packages/optimization/src/ldai_optimizer/util.py index 46429e5..a3671e2 100644 --- a/packages/optimization/src/ldai_optimizer/util.py +++ b/packages/optimization/src/ldai_optimizer/util.py @@ -5,7 +5,10 @@ import logging import random import re -from typing import Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union + +if TYPE_CHECKING: + from ldai.tracker import TokenUsage from ldai_optimizer._slug_words import _ADJECTIVES, _NOUNS @@ -313,3 +316,40 @@ def judge_passed(score: float, threshold: float, is_inverted: bool) -> bool: the score must stay at or below the threshold: ``score <= threshold``. """ return score <= threshold if is_inverted else score >= threshold + + +def estimate_cost( + usage: Optional["TokenUsage"], + model_config: Optional[Dict[str, Any]], +) -> Optional[float]: + """Estimate the monetary cost of a single agent call in USD. + + Uses ``costPerInputToken`` and ``costPerOutputToken`` from the model config. + Returns ``None`` when either ``usage`` is ``None`` or no pricing fields are + present on the model config — ensuring the return value is always in USD or + absent, never a raw token count. This prevents unit-mismatch bugs when + comparing costs across iterations where the model (and its pricing + availability) may differ. + + ``costPerCachedInputToken`` is intentionally ignored — the estimate uses + input/output tokens only. + + :param usage: Token usage from the agent call. When ``None``, returns ``None``. + :param model_config: Model config dict from ``get_model_configs()``, or ``None``. + :return: Estimated cost in USD, or ``None`` if usage or pricing data is absent. 
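+
+    Example (illustrative numbers)::
+
+        estimate_cost(
+            TokenUsage(total=100, input=60, output=40),
+            {"costPerInputToken": 5e-06, "costPerOutputToken": 1.5e-05},
+        )
+        # -> 60 * 5e-06 + 40 * 1.5e-05 == 0.0009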
+ """ + if usage is None: + return None + + input_price = model_config.get("costPerInputToken") if model_config else None + output_price = model_config.get("costPerOutputToken") if model_config else None + + if input_price is None and output_price is None: + return None + + cost = 0.0 + if input_price is not None and usage.input is not None: + cost += usage.input * input_price + if output_price is not None and usage.output is not None: + cost += usage.output * output_price + return cost diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 46f2d87..82e10d2 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -26,15 +26,17 @@ ToolDefinition, ) from ldai_optimizer.prompts import ( + _acceptance_criteria_implies_cost_optimization, _acceptance_criteria_implies_duration_optimization, build_new_variation_prompt, variation_prompt_acceptance_criteria, + variation_prompt_cost_optimization, variation_prompt_feedback, variation_prompt_improvement_instructions, variation_prompt_overfit_warning, variation_prompt_preamble, ) -from ldai_optimizer.util import interpolate_variables +from ldai_optimizer.util import estimate_cost, interpolate_variables from ldai_optimizer.util import ( restore_variable_placeholders, ) @@ -235,6 +237,43 @@ def test_converts_object_with_to_dict(self): # --------------------------------------------------------------------------- +class TestIsTokenLimitExceeded: + def _client_with_limit(self, limit): + client = _make_client() + client._options = _make_options(token_limit=limit) + return client + + def test_no_limit_returns_false(self): + client = self._client_with_limit(None) + client._total_token_usage = 9999 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_treated_as_no_limit(self): + client = self._client_with_limit(0) + client._total_token_usage = 0 + assert client._is_token_limit_exceeded() is False + + def test_zero_limit_with_high_usage_returns_false(self): + client = self._client_with_limit(0) + client._total_token_usage = 100_000 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_not_yet_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 999 + assert client._is_token_limit_exceeded() is False + + def test_positive_limit_exactly_reached(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1000 + assert client._is_token_limit_exceeded() is True + + def test_positive_limit_exceeded(self): + client = self._client_with_limit(1000) + client._total_token_usage = 1001 + assert client._is_token_limit_exceeded() is True + + class TestEvaluateResponse: def setup_method(self): self.client = _make_client() @@ -496,7 +535,7 @@ async def test_duration_context_added_to_instructions_when_latency_keyword_prese ) _, config, _, _ = self.handle_judge_call.call_args.args assert "1500ms" in config.instructions - assert "mention the duration" in config.instructions + assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): """When history[0] has a duration, the judge instructions include a baseline comparison.""" @@ -1803,11 +1842,11 @@ async def test_apply_variation_response_calls_restore_and_logs_warning(self): with patch("ldai_optimizer.client.logger") as mock_logger: await client._generate_new_variation(iteration=1, variables={}) - warning_calls = [ - call for call in 
mock_logger.warning.call_args_list + debug_calls = [ + call for call in mock_logger.debug.call_args_list if "user-123" in str(call) or "business" in str(call) ] - assert len(warning_calls) >= 1 + assert len(debug_calls) >= 1 assert "{{user_id}}" in client._current_instructions assert "user-123" not in client._current_instructions @@ -4656,3 +4695,385 @@ def test_mixed_judges_feedback_reflects_correct_pass_fail(self): # Both should be PASSED — relevance high enough, toxicity low enough assert result.count("PASSED") == 2 assert "FAILED" not in result + + +# --------------------------------------------------------------------------- +# estimate_cost helper +# --------------------------------------------------------------------------- + + +class TestEstimateCost: + def _usage(self, total=100, inp=60, out=40) -> TokenUsage: + return TokenUsage(total=total, input=inp, output=out) + + def test_returns_none_when_usage_is_none(self): + assert estimate_cost(None, {"costPerInputToken": 0.001}) is None + + def test_uses_pricing_when_available(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_uses_only_input_price_when_output_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerInputToken": 0.001} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001) + + def test_uses_only_output_price_when_input_absent(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = {"costPerOutputToken": 0.002} + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(40 * 0.002) + + def test_returns_none_when_no_pricing_in_config(self): + usage = self._usage(total=100) + assert estimate_cost(usage, {}) is None + + def test_returns_none_when_model_config_none(self): + usage = self._usage(total=250) + assert estimate_cost(usage, None) is None + + def test_ignores_cached_input_token_price(self): + usage = self._usage(total=100, inp=60, out=40) + model_config = { + "costPerInputToken": 0.001, + "costPerOutputToken": 0.002, + "costPerCachedInputToken": 0.0005, + } + cost = estimate_cost(usage, model_config) + assert cost == pytest.approx(60 * 0.001 + 40 * 0.002) + + def test_zero_usage_with_pricing_returns_zero(self): + usage = TokenUsage(total=0, input=0, output=0) + model_config = {"costPerInputToken": 0.001, "costPerOutputToken": 0.002} + assert estimate_cost(usage, model_config) == pytest.approx(0.0) + + +# --------------------------------------------------------------------------- +# _acceptance_criteria_implies_cost_optimization +# --------------------------------------------------------------------------- + + +class TestAcceptanceCriteriaImpliesCostOptimization: + def _judge(self, statement: str) -> Dict[str, OptimizationJudge]: + return {"j": OptimizationJudge(threshold=0.9, acceptance_statement=statement)} + + def test_returns_false_when_judges_none(self): + assert _acceptance_criteria_implies_cost_optimization(None) is False + + def test_returns_false_when_no_acceptance_statements(self): + judges = {"j": OptimizationJudge(threshold=0.9, judge_key="some-judge")} + assert _acceptance_criteria_implies_cost_optimization(judges) is False + + def test_detects_cheap(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Keep it cheap.")) + + def test_detects_cost(self): + assert 
_acceptance_criteria_implies_cost_optimization(self._judge("Reduce overall cost.")) + + def test_detects_costs_plural(self): + assert _acceptance_criteria_implies_cost_optimization( + self._judge("Keep the costs stable or lower them.") + ) + + def test_detects_budget(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Stay within budget.")) + + def test_does_not_detect_token_to_avoid_false_positives(self): + assert not _acceptance_criteria_implies_cost_optimization(self._judge("Generate a valid authentication token.")) + + def test_detects_billing(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Minimize billing.")) + + def test_detects_spend(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("Reduce spend on API calls.")) + + def test_case_insensitive(self): + assert _acceptance_criteria_implies_cost_optimization(self._judge("BUDGET FRIENDLY response")) + + def test_no_match_on_unrelated_statement(self): + assert not _acceptance_criteria_implies_cost_optimization( + self._judge("Respond accurately and concisely.") + ) + + def test_multiple_judges_one_matches(self): + judges = { + "j1": OptimizationJudge(threshold=0.9, acceptance_statement="Be accurate."), + "j2": OptimizationJudge(threshold=0.9, acceptance_statement="Keep costs low."), + } + assert _acceptance_criteria_implies_cost_optimization(judges) + + +# --------------------------------------------------------------------------- +# _evaluate_cost +# --------------------------------------------------------------------------- + + +class TestEvaluateCost: + def setup_method(self): + self.client = _make_client() + self.client._agent_key = "test-agent" + self.client._initialize_class_members_from_config(_make_agent_config()) + self.client._options = _make_options() + + def _ctx(self, cost: float, iteration: int = 2) -> OptimizationContext: + return OptimizationContext( + scores={}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def _seed_history(self, baseline_cost: float): + self.client._history = [self._ctx(baseline_cost, iteration=1)] + + def test_passes_when_cost_improved_beyond_tolerance(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.007)) is True + + def test_fails_when_cost_not_improved_enough(self): + self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(0.009)) is False + + def test_passes_at_exact_tolerance_boundary(self): + self._seed_history(0.010) + # 0.010 * 0.80 = 0.008; must be strictly less than 0.008 + assert self.client._evaluate_cost(self._ctx(0.0079)) is True + assert self.client._evaluate_cost(self._ctx(0.008)) is False + + def test_skips_gracefully_when_history_empty(self): + self.client._history = [] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_baseline_cost_none(self): + self.client._history = [self._ctx(None)] # type: ignore[arg-type] + assert self.client._evaluate_cost(self._ctx(0.005)) is True + + def test_skips_gracefully_when_candidate_cost_none(self): + self._seed_history(0.010) + ctx = self._ctx(None) # type: ignore[arg-type] + assert self.client._evaluate_cost(ctx) is True + + def test_skips_gracefully_when_units_differ_across_model_switch(self): + # If baseline was captured with pricing (USD) but candidate has no pricing, + # candidate cost is None and the gate skips rather than comparing incompatible units. 
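+        # As a quick illustration: estimate_cost(usage, {}) returns None whenever the
+        # model config carries no pricing fields (see TestEstimateCost above), which
+        # is exactly the None this test feeds in as the candidate cost.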
+ self._seed_history(0.010) + assert self.client._evaluate_cost(self._ctx(None)) is True + + +# --------------------------------------------------------------------------- +# variation_prompt_cost_optimization +# --------------------------------------------------------------------------- + + +class TestVariationPromptCostOptimization: + def test_section_header_present(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "## Cost Optimization:" in result + + def test_mentions_available_models(self): + result = variation_prompt_cost_optimization(["gpt-4o", "gpt-4o-mini"]) + assert "gpt-4o" in result + + def test_mentions_quality_primary(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "primary objective" in result.lower() + + def test_mentions_token_reduction(self): + result = variation_prompt_cost_optimization(["gpt-4o"]) + assert "token" in result.lower() + + +class TestBuildNewVariationPromptCost: + def _make_history(self) -> list: + return [ + OptimizationContext( + scores={}, + completion_response="response", + current_instructions="instructions", + current_parameters={}, + current_variables={}, + iteration=1, + ) + ] + + def test_cost_section_absent_by_default(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst" + ) + assert "Cost Optimization" not in result + + def test_cost_section_included_when_flag_set(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_cost=True, + ) + assert "Cost Optimization" in result + + def test_duration_and_cost_sections_both_present(self): + result = build_new_variation_prompt( + self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + optimize_for_duration=True, + optimize_for_cost=True, + ) + assert "Duration Optimization" in result + assert "Cost Optimization" in result + + +# --------------------------------------------------------------------------- +# variation_prompt_feedback shows estimated_cost_usd +# --------------------------------------------------------------------------- + + +class TestVariationPromptFeedbackCost: + def _make_ctx(self, cost: float | None, iteration: int = 1) -> OptimizationContext: + return OptimizationContext( + scores={"judge": JudgeResult(score=0.9)}, + completion_response="ok", + current_instructions="inst", + current_parameters={}, + current_variables={}, + iteration=iteration, + estimated_cost_usd=cost, + ) + + def test_cost_shown_when_present(self): + ctx = self._make_ctx(0.001234) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost: $0.001234" in result + + def test_cost_omitted_when_none(self): + ctx = self._make_ctx(None) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx], judges) + assert "Estimated agent cost" not in result + + def test_cost_shown_per_iteration(self): + ctx1 = self._make_ctx(0.001, iteration=1) + ctx2 = self._make_ctx(0.0007, iteration=2) + judges = {"judge": OptimizationJudge(threshold=0.8, acceptance_statement="Be good.")} + result = variation_prompt_feedback([ctx1, ctx2], judges) + assert "$0.001000" in result + assert "$0.000700" in result + + +# --------------------------------------------------------------------------- +# _evaluate_acceptance_judge cost 
augmentation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestEvaluateAcceptanceJudgeCostAugmentation: + def setup_method(self): + self.mock_ldai = _make_ldai_client() + self.client = _make_client(self.mock_ldai) + agent_config = _make_agent_config() + self.client._agent_key = "test-agent" + self.client._agent_config = agent_config + self.client._initialize_class_members_from_config(agent_config) + handle_judge_call = AsyncMock(return_value=OptimizationResponse(output=JUDGE_PASS_RESPONSE)) + self.client._options = _make_options(handle_judge_call=handle_judge_call) + self.client._model_configs = [] + + def _cost_judge(self) -> OptimizationJudge: + return OptimizationJudge( + threshold=0.9, + acceptance_statement="Keep costs low and stay within budget.", + ) + + def _set_pricing(self): + """Give the client a model config with pricing so estimate_cost returns USD.""" + self.client._current_model = "gpt-4o" + self.client._model_configs = [ + {"id": "gpt-4o", "costPerInputToken": 0.000005, "costPerOutputToken": 0.000015} + ] + + async def test_cost_context_injected_into_instructions(self): + self._set_pricing() + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured, "handle_judge_call was not called" + instructions = captured[0] + assert "60 input tokens" in instructions + assert "40 output tokens" in instructions + + async def test_cost_context_not_injected_for_non_cost_judge(self): + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + non_cost_judge = OptimizationJudge( + threshold=0.9, + acceptance_statement="Be accurate and concise.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="quality-judge", + optimization_judge=non_cost_judge, + completion_response="response", + iteration=1, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + # The cost-specific augmentation phrase should not appear + assert "cost/token-usage goal" not in instructions + + async def test_baseline_cost_shown_when_history_present(self): + self._set_pricing() + usage = TokenUsage(total=100, input=60, output=40) + captured: list = [] + + async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): + captured.append(judge_config.instructions) + return OptimizationResponse(output=JUDGE_PASS_RESPONSE) + + baseline_ctx = OptimizationContext( + scores={}, + completion_response="", + current_instructions="", + current_parameters={}, + current_variables={}, + iteration=1, + estimated_cost_usd=500.0, + ) + self.client._history = [baseline_ctx] + self.client._options = _make_options(handle_judge_call=_capture_judge_call) + await self.client._evaluate_acceptance_judge( + 
judge_key="cost-judge", + optimization_judge=self._cost_judge(), + completion_response="response", + iteration=2, + reasoning_history="", + user_input="question", + agent_usage=usage, + ) + assert captured + instructions = captured[0] + assert "baseline" in instructions.lower()