Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 133 additions & 7 deletions packages/optimization/src/ldai_optimizer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
LDApiClient,
)
from ldai_optimizer.prompts import (
_acceptance_criteria_implies_cost_optimization,
_acceptance_criteria_implies_duration_optimization,
build_message_history_text,
build_new_variation_prompt,
Expand All @@ -57,6 +58,7 @@
from ldai_optimizer.util import (
RedactionFilter,
await_if_needed,
estimate_cost,
extract_json_from_response,
generate_slug,
interpolate_variables,
Expand Down Expand Up @@ -128,6 +130,11 @@ def _compute_validation_count(pool_size: int) -> int:
# under 80% of the baseline — i.e. at least 20% improvement.
_DURATION_TOLERANCE = 0.80

# Cost gate: a candidate must cost at most this fraction of the baseline
# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a
# cost reduction goal. 0.80 means at least 20% cheaper than the baseline.
_COST_TOLERANCE = 0.80

# Maps SDK status strings to the API status/activity values expected by
# agent_optimization_result records. Defined at module level to avoid
# allocating the dict on every on_status_update invocation.
Expand Down Expand Up @@ -160,6 +167,7 @@ def __init__(self, ldClient: LDAIClient) -> None:
self._last_optimization_result_id: Optional[str] = None
self._initial_tool_keys: List[str] = []
self._total_token_usage: int = 0
self._model_configs: List[Dict[str, Any]] = []

if os.environ.get("LAUNCHDARKLY_API_KEY"):
self._has_api_key = True
Expand Down Expand Up @@ -392,6 +400,7 @@ async def _call_judges(
agent_tools: Optional[List[ToolDefinition]] = None,
expected_response: Optional[str] = None,
agent_duration_ms: Optional[float] = None,
agent_usage: Optional[Any] = None,
) -> Dict[str, JudgeResult]:
"""
Call all judges in parallel (auto-path).
Expand All @@ -411,6 +420,8 @@ async def _call_judges(
:param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
Forwarded to acceptance judges whose statement implies a latency goal so they
can mention the duration change in their rationale.
:param agent_usage: Token usage from the agent call. Forwarded to acceptance judges
whose statement implies a cost goal so they can mention token usage in their rationale.
:return: Dictionary of judge results (score and rationale)
"""
if not self._options.judges:
Expand Down Expand Up @@ -464,6 +475,7 @@ async def _call_judges(
agent_tools=resolved_agent_tools,
expected_response=expected_response,
agent_duration_ms=agent_duration_ms,
agent_usage=agent_usage,
)
judge_results[judge_key] = result

Expand Down Expand Up @@ -682,6 +694,7 @@ async def _evaluate_acceptance_judge(
agent_tools: Optional[List[ToolDefinition]] = None,
expected_response: Optional[str] = None,
agent_duration_ms: Optional[float] = None,
agent_usage: Optional[Any] = None,
) -> JudgeResult:
"""
Evaluate using an acceptance statement judge.
Expand All @@ -699,6 +712,8 @@ async def _evaluate_acceptance_judge(
:param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
When the acceptance statement implies a latency goal, the judge is instructed
to mention the duration change in its rationale.
:param agent_usage: Token usage from the agent call. When the acceptance statement
implies a cost goal, the judge is instructed to mention token usage and cost.
:return: The judge result with score and rationale
"""
if not optimization_judge.acceptance_statement:
Expand Down Expand Up @@ -757,8 +772,50 @@ async def _evaluate_acceptance_judge(
f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. "
)
instructions += (
"Please mention the duration and any change from baseline in your rationale."
"In your rationale, state the duration and any change from baseline. "
"If the latency goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"response time — for example: switching to a faster model, shortening the "
"system prompt, or removing instructions that cause multi-step reasoning. "
"These suggestions will be used directly to generate the next variation."
)

if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}):
current_cost = estimate_cost(
agent_usage,
_find_model_config(self._current_model or "", self._model_configs),
)
baseline_cost = (
self._history[0].estimated_cost_usd
if self._history and self._history[0].estimated_cost_usd is not None
else None
)
if current_cost is not None:
instructions += (
f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. "
)
if agent_usage is not None:
instructions += (
f"The agent's response used {agent_usage.input} input tokens "
f"and {agent_usage.output} output tokens "
f"(estimated cost: ${current_cost:.6f}). "
)
if baseline_cost is not None:
delta = current_cost - baseline_cost
direction = "less" if delta < 0 else "more"
instructions += (
f"The baseline cost (first iteration) was ${baseline_cost:.6f}. "
f"This response cost ${abs(delta):.6f} {direction} than the baseline. "
)
instructions += (
"In your rationale, state the token usage and cost, and any change from baseline. "
"If the cost goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"cost — for example: switching to a cheaper model, shortening the system prompt "
"to reduce input tokens, removing unnecessary output instructions, or tightening "
"response length constraints. "
"These suggestions will be used directly to generate the next variation."
)

if resolved_variables:
instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}"
Expand Down Expand Up @@ -1082,6 +1139,11 @@ async def _run_ground_truth_optimization(
):
sample_passed = self._evaluate_duration(optimize_context)

if sample_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
sample_passed = self._evaluate_cost(optimize_context)

if not sample_passed:
logger.info(
"[GT Attempt %d] -> Sample %d/%d FAILED",
Expand Down Expand Up @@ -1227,12 +1289,19 @@ def _apply_new_variation_response(
# This is a deterministic safety net for when the LLM ignores the prompt
# instructions and hardcodes a concrete value (e.g. "user-123") instead
# of the placeholder ("{{user_id}}").
# Only check the variables that were actually used for this invocation so
# we don't spuriously replace values that happen to appear in other choices.
active_variables = (
[variation_ctx.current_variables]
if variation_ctx.current_variables
else self._options.variable_choices
)
self._current_instructions, placeholder_warnings = restore_variable_placeholders(
self._current_instructions,
self._options.variable_choices,
active_variables,
)
for msg in placeholder_warnings:
logger.warning("[Iteration %d] -> %s", iteration, msg)
logger.debug("[Iteration %d] -> %s", iteration, msg)

self._current_parameters = response_data["current_parameters"]

Expand Down Expand Up @@ -1321,6 +1390,9 @@ async def _generate_new_variation(
optimize_for_duration = _acceptance_criteria_implies_duration_optimization(
self._options.judges
)
optimize_for_cost = _acceptance_criteria_implies_cost_optimization(
self._options.judges
)
instructions = build_new_variation_prompt(
self._history,
self._options.judges,
Expand All @@ -1331,6 +1403,7 @@ async def _generate_new_variation(
self._options.variable_choices,
self._initial_instructions,
optimize_for_duration=optimize_for_duration,
optimize_for_cost=optimize_for_cost,
)

# Create a flat history list (without nested history) to avoid exponential growth
Expand Down Expand Up @@ -1424,6 +1497,7 @@ async def optimize_from_config(
model_configs = api_client.get_model_configs(options.project_key)
except Exception as exc:
logger.debug("Could not pre-fetch model configs: %s", exc)
self._model_configs = model_configs

context = random.choice(options.context_choices)
# _get_agent_config calls _initialize_class_members_from_config internally;
Expand Down Expand Up @@ -1793,18 +1867,24 @@ async def _execute_agent_turn(
agent_tools=agent_tools,
expected_response=expected_response,
agent_duration_ms=agent_duration_ms,
agent_usage=agent_response.usage,
)

# Build the fully-populated result context before firing the evaluating event so
# the PATCH includes scores, generationLatency, and completionResponse. This is
# particularly important for non-final GT samples which receive no further status
# events — without this, those fields would never be written to their API records.
agent_cost = estimate_cost(
agent_response.usage,
_find_model_config(self._current_model or "", self._model_configs),
)
result_ctx = dataclasses.replace(
optimize_context,
completion_response=completion_response,
scores=scores,
duration_ms=agent_duration_ms,
usage=agent_response.usage,
estimated_cost_usd=agent_cost,
)

if self._options.judges:
Expand All @@ -1829,13 +1909,13 @@ def _accumulate_tokens(self, optimize_context: OptimizationContext) -> None:
def _is_token_limit_exceeded(self) -> bool:
"""Return True if the accumulated token usage has met or exceeded the configured limit.

Returns False when no token limit is set so callers can use this as a
simple guard without needing to check for ``None`` themselves.
Returns False when no token limit is set, or when the limit is 0 (which is
treated as "no limit" — a sentinel value meaning the field was left unset).

:return: True if token limit is set and ``_total_token_usage >= token_limit``.
:return: True if a positive token limit is set and ``_total_token_usage >= token_limit``.
"""
limit: Optional[int] = getattr(self._options, "token_limit", None)
return limit is not None and self._total_token_usage >= limit
return bool(limit) and self._total_token_usage >= limit

def _evaluate_response(self, optimize_context: OptimizationContext) -> bool:
"""
Expand Down Expand Up @@ -1896,6 +1976,42 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool:
)
return passed

def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool:
    """
    Check whether the candidate's estimated cost meets the improvement target vs. the baseline.

    The baseline is history[0].estimated_cost_usd — the very first completed iteration,
    representing the original unoptimized configuration's cost. The candidate must be
    at least _COST_TOLERANCE cheaper (default: 20% improvement).

    The cost value is in USD when model pricing data is available, or raw total token
    count as a proxy when pricing is absent. Both are comparable relative to their
    own baselines.

    Returns True without blocking when no baseline is available (empty history or
    history[0].estimated_cost_usd is None), or when the candidate's cost was not
    captured. This avoids penalising configurations when cost data is missing.

    :param optimize_context: The completed turn context containing estimated_cost_usd
    :return: True if the cost requirement is met or cannot be checked
    """
    # Missing data on either side of the comparison means we cannot gate
    # on cost, so we pass the sample rather than fail it spuriously.
    if not self._history or self._history[0].estimated_cost_usd is None:
        return True
    if optimize_context.estimated_cost_usd is None:
        return True
    baseline = self._history[0].estimated_cost_usd
    passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE
    if not passed:
        # %-style lazy formatting keeps the string build off the hot path
        # when WARNING is suppressed.
        logger.warning(
            "[Iteration %d] -> Cost check failed: %.6f >= baseline %.6f * %.0f%% (%.6f)",
            optimize_context.iteration,
            optimize_context.estimated_cost_usd,
            baseline,
            _COST_TOLERANCE * 100,
            baseline * _COST_TOLERANCE,
        )
    return passed

def _handle_success(
self, optimize_context: OptimizationContext, iteration: int
) -> Any:
Expand Down Expand Up @@ -2174,6 +2290,11 @@ async def _run_validation_phase(
):
sample_passed = self._evaluate_duration(val_ctx)

if sample_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
sample_passed = self._evaluate_cost(val_ctx)

last_ctx = val_ctx

if not sample_passed:
Expand Down Expand Up @@ -2298,6 +2419,11 @@ async def _run_optimization(
):
initial_passed = self._evaluate_duration(optimize_context)

if initial_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
initial_passed = self._evaluate_cost(optimize_context)

if initial_passed:
all_valid, last_ctx = await self._run_validation_phase(
optimize_context, iteration
Expand Down
3 changes: 3 additions & 0 deletions packages/optimization/src/ldai_optimizer/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class OptimizationContext:
iteration: int = 0 # current iteration number
duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds
usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration
estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens

def copy_without_history(self) -> OptimizationContext:
"""
Expand All @@ -236,6 +237,7 @@ def copy_without_history(self) -> OptimizationContext:
iteration=self.iteration,
duration_ms=self.duration_ms,
usage=self.usage,
estimated_cost_usd=self.estimated_cost_usd,
)

def to_json(self) -> Dict[str, Any]:
Expand All @@ -261,6 +263,7 @@ def to_json(self) -> Dict[str, Any]:
"history": history_list,
"iteration": self.iteration,
"duration_ms": self.duration_ms,
"estimated_cost_usd": self.estimated_cost_usd,
}
if self.usage is not None:
result["usage"] = {
Expand Down
Loading
Loading