From af4ec5dd607aeb3a7a2de2a74125289c17d97e44 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 3 Jun 2026 10:12:36 -0800 Subject: [PATCH 1/4] changes cost & latency optimization to post-process --- .../optimization/src/ldai_optimizer/client.py | 274 +++++++++++++++--- .../src/ldai_optimizer/dataclasses.py | 1 + .../src/ldai_optimizer/prompts.py | 123 +++++++- packages/optimization/tests/test_client.py | 138 +++++---- 4 files changed, 409 insertions(+), 127 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 7a901660..9affb1f0 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -53,6 +53,7 @@ build_message_history_text, build_new_variation_prompt, build_reasoning_history, + build_token_latency_variation_prompt, ) from ldai_optimizer.util import ( RedactionFilter, @@ -222,6 +223,7 @@ def __init__(self, ldClient: LDAIClient) -> None: self._total_token_usage: int = 0 self._model_configs: List[Dict[str, Any]] = [] self._last_batch_size: int = 1 + self._in_cost_latency_phase: bool = False if os.environ.get("LAUNCHDARKLY_API_KEY"): self._has_api_key = True @@ -904,7 +906,8 @@ async def _evaluate_acceptance_judge( ) if ( - agent_duration_ms is not None + self._in_cost_latency_phase + and agent_duration_ms is not None and bool(self._options.latency_optimization) ): baseline_ms = self._baseline_duration_ms @@ -922,13 +925,12 @@ async def _evaluate_acceptance_judge( instructions += ( "In your rationale, state the duration and any change from baseline. " "If the latency goal is not yet met, include specific, actionable suggestions " - "for how the agent's instructions or model choice could be changed to reduce " - "response time — for example: switching to a faster model, shortening the " - "system prompt, or removing instructions that cause multi-step reasoning. " + "for how the model choice or parameters could be changed to reduce " + "response time — for example: switching to a faster model or reducing max_tokens. " "These suggestions will be used directly to generate the next variation." ) - if bool(self._options.token_optimization): + if self._in_cost_latency_phase and bool(self._options.token_optimization): current_cost = estimate_cost( agent_usage, _find_model_config(self._current_model or "", self._model_configs), @@ -954,10 +956,8 @@ async def _evaluate_acceptance_judge( instructions += ( "In your rationale, state the token usage and cost, and any change from baseline. " "If the cost goal is not yet met, include specific, actionable suggestions " - "for how the agent's instructions or model choice could be changed to reduce " - "cost — for example: switching to a cheaper model, shortening the system prompt " - "to reduce input tokens, removing unnecessary output instructions, or tightening " - "response length constraints. " + "for how the model choice or parameters could be changed to reduce " + "cost — for example: switching to a cheaper model or reducing max_tokens. " "These suggestions will be used directly to generate the next variation." ) @@ -1366,13 +1366,6 @@ async def _run_ground_truth_optimization( else: sample_passed = self._evaluate_response(optimize_context) - sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context) - sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context) - - # Flush gate scores to the API for this sample. Without this, - # the next sample's "generating" event closes out this record - # with a status-only PATCH before gate scores are sent, so only - # the last sample would ever show latency/cost gate entries. self._safe_status_update("evaluating", optimize_context, linear_iter) if not sample_passed: @@ -1462,6 +1455,19 @@ async def _run_ground_truth_optimization( logger.exception( "[GT Attempt %d] -> on_passing_result callback failed", attempt ) + # Phase 2: optimize model/params on the frozen winning variation. + if ( + self._options.latency_optimization + or self._options.token_optimization + ) and not self._is_token_limit_exceeded(): + phase1_winner = self._last_succeeded_context + await self._run_cost_latency_phase( + last_ctx, + last_ctx.iteration, + ) + if self._last_succeeded_context is None: + self._last_run_succeeded = True + self._last_succeeded_context = phase1_winner return attempt_results # We've hit max attempts for the batches, bail at this point @@ -1561,7 +1567,18 @@ def _apply_new_variation_response( f"Received fields: {list(response_data.keys())}" ) - self._current_instructions = response_data["current_instructions"] + new_instructions = response_data["current_instructions"] + + if self._in_cost_latency_phase: + if new_instructions != self._current_instructions: + logger.warning( + "[Iteration %d] -> Phase 2 (cost/latency): LLM attempted to change instructions; " + "restoring frozen winning variation instructions to enforce content lock.", + iteration, + ) + new_instructions = self._current_instructions + + self._current_instructions = new_instructions # Post-process: replace any leaked variable values back to {{key}} form. # This is a deterministic safety net for when the LLM ignores the prompt @@ -1690,22 +1707,24 @@ async def _generate_new_variation( ) self._safe_status_update("generating variation", status_ctx, iteration) - optimize_for_duration = bool(self._options.latency_optimization) - optimize_for_cost = bool(self._options.token_optimization) - quality_already_passing = self._all_judges_passing() - instructions = build_new_variation_prompt( - self._history, - self._options.judges, - self._current_model, - self._current_instructions, - self._current_parameters, - self._options.model_choices, - self._options.variable_choices, - self._initial_instructions, - optimize_for_duration=optimize_for_duration, - optimize_for_cost=optimize_for_cost, - quality_already_passing=quality_already_passing, - ) + if self._in_cost_latency_phase: + instructions = build_token_latency_variation_prompt( + self._history, + self._options.model_choices, + optimize_for_latency=bool(self._options.latency_optimization), + optimize_for_cost=bool(self._options.token_optimization), + ) + else: + instructions = build_new_variation_prompt( + self._history, + self._options.judges, + self._current_model, + self._current_instructions, + self._current_parameters, + self._options.model_choices, + self._options.variable_choices, + self._initial_instructions, + ) # Create a flat history list (without nested history) to avoid exponential growth flat_history = [prev_ctx.copy_without_history() for prev_ctx in self._history] @@ -2523,6 +2542,175 @@ def _handle_failure( ) return optimize_context + def _pick_best_candidate( + self, candidates: List[OptimizationContext] + ) -> OptimizationContext: + """Select the best Phase 2 candidate by normalized combined cost/latency score. + + Ranks all candidates using the sum of their normalized metrics: + score = (duration_ms / baseline_duration_ms) + (estimated_cost_usd / baseline_cost_usd) + + Terms whose baseline or measurement is unavailable are omitted from the sum. + The candidate with the lowest score wins. If scores are equal, the first + candidate (earliest iteration) is returned. + + :param candidates: Non-empty list of passing Phase 2 OptimizationContexts. + :return: The best-scoring candidate. + """ + def _score(ctx: OptimizationContext) -> float: + total = 0.0 + if ( + ctx.duration_ms is not None + and self._baseline_duration_ms is not None + and self._baseline_duration_ms > 0 + ): + total += ctx.duration_ms / self._baseline_duration_ms + if ( + ctx.estimated_cost_usd is not None + and self._baseline_cost_usd is not None + and self._baseline_cost_usd > 0 + ): + total += ctx.estimated_cost_usd / self._baseline_cost_usd + return total + + return min(candidates, key=_score) + + async def _run_cost_latency_phase( + self, + winning_ctx: OptimizationContext, + last_iteration: int, + ) -> None: + """Run Phase 2: optimize model and parameters for cost/latency on the frozen winning variation. + + The agent's content (instructions, tools) is frozen from the Phase 1 winner. + Only model and parameters may be adjusted. One turn is executed per model + choice (or at least two turns total), collecting passing candidates, then + the best is selected by combined normalized cost/latency score. + + Phase 2 always uses a single turn per iteration regardless of whether the + Phase 1 run was in GT mode — the winning user_input and variables from the + Phase 1 winner are reused for every Phase 2 turn. + + :param winning_ctx: The Phase 1 winning OptimizationContext. + :param last_iteration: The last iteration number from Phase 1; Phase 2 + continues from last_iteration + 1. + """ + self._in_cost_latency_phase = True + self._history = [winning_ctx] + self._current_instructions = winning_ctx.current_instructions + self._current_parameters = winning_ctx.current_parameters.copy() + self._current_model = winning_ctx.current_model + + frozen_variables = winning_ctx.current_variables + frozen_user_input = winning_ctx.user_input + + # Build a deterministic, deduplicated list of models to evaluate: + # start with the Phase 1 winner's model, then add each model_choice + # that hasn't been seen yet. This guarantees every user-selected model + # is tried exactly once, in a predictable order. + phase1_model = winning_ctx.current_model or "" + seen_models: set = {phase1_model} + ordered_models: List[str] = [phase1_model] + for m in self._options.model_choices or []: + if m not in seen_models: + seen_models.add(m) + ordered_models.append(m) + # Ensure at least 2 iterations + while len(ordered_models) < 2: + ordered_models.append(ordered_models[-1]) + + candidates: List[OptimizationContext] = [] + non_candidates: List[OptimizationContext] = [] + max_iters = len(ordered_models) + iteration = last_iteration + + for i in range(max_iters): + iteration += 1 + + # Cycle to the next scheduled model. Instructions and parameters + # are always reset to the Phase 1 winner's frozen values so only + # the model varies — Phase 2 verifies the winning content still + # passes under each candidate model. + self._current_model = ordered_models[i] + self._current_parameters = winning_ctx.current_parameters.copy() + logger.info( + "[Phase 2 Iter %d] -> Evaluating model '%s' (%d/%d)", + iteration, + self._current_model, + i + 1, + max_iters, + ) + + ctx = self._create_optimization_context( + iteration=iteration, + variables=frozen_variables, + user_input=frozen_user_input, + ) + self._safe_status_update("generating", ctx, iteration) + try: + ctx = await self._execute_agent_turn(ctx, iteration) + except Exception: + logger.warning( + "[Phase 2 Iter %d] -> Agent call failed (model=%s); " + "skipping this model and trying the next", + iteration, + self._current_model, + ) + non_candidates.append(ctx) + continue + self._accumulate_tokens(ctx) + ctx = dataclasses.replace( + ctx, accumulated_token_usage=self._total_token_usage + ) + + quality_passed = self._evaluate_response(ctx) + quality_passed, ctx = self._apply_duration_gate(quality_passed, ctx) + quality_passed, ctx = self._apply_cost_gate(quality_passed, ctx) + + if self._is_token_limit_exceeded(): + logger.warning( + "[Phase 2 Iter %d] -> Token limit exceeded; stopping Phase 2 early", + iteration, + ) + non_candidates.append(ctx) + break + + if quality_passed: + logger.info( + "[Phase 2 Iter %d] -> Passed quality + gates — added to candidates", + iteration, + ) + candidates.append(ctx) + else: + logger.info( + "[Phase 2 Iter %d] -> Failed quality or gates", iteration + ) + non_candidates.append(ctx) + + if i < max_iters - 1: + self._safe_status_update("turn completed", ctx, iteration) + + # Report results: fail non-winners first (preserving _last_succeeded_context + # as the Phase 1 winner until the very end), then succeed the best candidate. + for failed_ctx in non_candidates: + self._handle_failure(failed_ctx, failed_ctx.iteration) + + if candidates: + best = self._pick_best_candidate(candidates) + self._handle_success(best, best.iteration) + logger.info( + "[Phase 2] -> Best candidate selected: model=%s, duration_ms=%s, cost=%s", + best.current_model, + f"{best.duration_ms:.0f}ms" if best.duration_ms is not None else "N/A", + f"${best.estimated_cost_usd:.6f}" if best.estimated_cost_usd is not None else "N/A", + ) + else: + logger.info( + "[Phase 2] -> No candidates passed; keeping Phase 1 winner as final result" + ) + + self._in_cost_latency_phase = False + def _commit_variation( self, optimize_context: OptimizationContext, @@ -2595,6 +2783,8 @@ def _commit_variation( "instructions": optimize_context.current_instructions, "modelConfigKey": model_config_key, } + if optimize_context.current_parameters: + payload["parameters"] = optimize_context.current_parameters if self._initial_tool_keys: payload["toolKeys"] = list(self._initial_tool_keys) if self._initial_model_custom: @@ -2741,9 +2931,6 @@ async def _run_validation_phase( else: sample_passed = self._evaluate_response(val_ctx) - sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx) - sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx) - if self._is_token_limit_exceeded(): logger.error( "[Validation %d/%d] -> Token limit exceeded (total=%d)", @@ -2869,9 +3056,6 @@ async def _run_optimization( iteration, ) - initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context) - initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context) - # Token limit check after pass/fail evaluation so the persisted record # correctly reflects whether the iteration passed before stopping the run. if self._is_token_limit_exceeded(): @@ -2887,7 +3071,17 @@ async def _run_optimization( optimize_context, iteration ) if all_valid: - return self._handle_success(optimize_context, iteration) + self._handle_success(optimize_context, iteration) + phase1_winner = self._last_succeeded_context + if ( + self._options.latency_optimization + or self._options.token_optimization + ) and not self._is_token_limit_exceeded(): + await self._run_cost_latency_phase(optimize_context, iteration) + if self._last_succeeded_context is None: + self._last_run_succeeded = True + self._last_succeeded_context = phase1_winner + return self._last_succeeded_context if self._is_token_limit_exceeded(): return self._handle_failure(last_ctx, iteration) # Validation failed — treat as a normal failed attempt. diff --git a/packages/optimization/src/ldai_optimizer/dataclasses.py b/packages/optimization/src/ldai_optimizer/dataclasses.py index 2d45b909..9a308265 100644 --- a/packages/optimization/src/ldai_optimizer/dataclasses.py +++ b/packages/optimization/src/ldai_optimizer/dataclasses.py @@ -316,6 +316,7 @@ class OptimizationJudgeContext: "turn completed", "success", "failure", + "optimizing cost/latency", ] diff --git a/packages/optimization/src/ldai_optimizer/prompts.py b/packages/optimization/src/ldai_optimizer/prompts.py index 9e42ca49..028a51f0 100644 --- a/packages/optimization/src/ldai_optimizer/prompts.py +++ b/packages/optimization/src/ldai_optimizer/prompts.py @@ -83,9 +83,6 @@ def build_new_variation_prompt( model_choices: List[str], variable_choices: List[Dict[str, Any]], initial_instructions: str, - optimize_for_duration: bool = False, - optimize_for_cost: bool = False, - quality_already_passing: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. @@ -103,13 +100,6 @@ def build_new_variation_prompt( :param model_choices: List of model IDs the LLM may select from :param variable_choices: List of variable dicts (used to derive placeholder names) :param initial_instructions: The original unmodified instructions template - :param optimize_for_duration: When True, appends a duration optimization section - instructing the LLM to prefer faster models and simpler instructions. - :param optimize_for_cost: When True, appends a cost optimization section - instructing the LLM to prefer cheaper models and reduce token usage. - :param quality_already_passing: When True, signals that all judge criteria are - currently passing and the cost optimization section should instruct the LLM - to preserve existing behavior while only reducing cost. :return: The assembled prompt string """ sections = [ @@ -123,15 +113,120 @@ def build_new_variation_prompt( variation_prompt_improvement_instructions( history, model_choices, variable_choices, initial_instructions ), - variation_prompt_duration_optimization(model_choices) if optimize_for_duration else "", - variation_prompt_cost_optimization( - model_choices, quality_already_passing=quality_already_passing - ) if optimize_for_cost else "", ] return "\n\n".join(s for s in sections if s) +def build_token_latency_variation_prompt( + history: List[OptimizationContext], + model_choices: List[str], + optimize_for_latency: bool = False, + optimize_for_cost: bool = False, +) -> str: + """ + Build the Phase 2 LLM prompt for generating a cost/latency-optimized configuration. + + The agent's content (instructions, tools) is frozen after Phase 1 quality + iterations. Only the model and parameters may be adjusted in this phase. + The LLM is strongly directed to try different models from model_choices across + iterations to ensure each model is evaluated at least once. + + :param history: Phase 2 OptimizationContexts, oldest first (starts with Phase 1 winner). + :param model_choices: List of model IDs the LLM may select from. + :param optimize_for_latency: When True, includes a latency reduction section. + :param optimize_for_cost: When True, includes a cost reduction section. + :return: The assembled Phase 2 prompt string. + """ + # Collect models already tried in Phase 2 history (skip index 0 = Phase 1 winner) + tried_models = [ctx.current_model for ctx in history[1:] if ctx.current_model] + untried_models = [m for m in model_choices if m not in tried_models] + + preamble = "\n".join([ + "You are an assistant that helps optimize an agent's model and parameters for cost and latency.", + "", + "## *** CONTENT LOCK — CRITICAL ***", + "The agent's winning configuration has been selected and its content is now FROZEN.", + "You MUST return the instructions exactly as provided below — character for character.", + "You MUST NOT modify, paraphrase, shorten, or expand the instructions in any way.", + "The ONLY things you may change are:", + " 1. The model (choose from the available model list below)", + " 2. The parameters (e.g. temperature, max_tokens — but NOT tools or instructions)", + "", + "Any change to instructions will be silently reverted. Only model and parameter changes matter here.", + ]) + + model_section = "\n".join([ + "## Model Selection:", + f"Available models: {model_choices}", + "IMPORTANT: You MUST try a DIFFERENT model from the one used in the most recent iteration.", + "The goal is to evaluate each available model at least once to find the fastest and/or cheapest option.", + ]) + if untried_models: + model_section += f"\nModels not yet tried: {untried_models} — strongly prefer one of these." + if tried_models: + model_section += f"\nModels already tried: {tried_models} — avoid repeating these unless all options are exhausted." + + config_section = variation_prompt_configuration( + history, + history[-1].current_model if history else None, + history[-1].current_instructions if history else "", + history[-1].current_parameters if history else {}, + ) + + feedback_section = _build_cost_latency_feedback(history) + + output_format = "\n".join([ + "## Output Format:", + "Return a JSON object with exactly these keys: current_instructions, current_parameters, model.", + "The current_instructions value MUST be identical to the instructions shown above.", + "Example:", + "{", + ' "current_instructions": "",', + ' "current_parameters": { "temperature": 0.5, "max_tokens": 256 },', + ' "model": "gpt-4o-mini"', + "}", + ]) + + sections = [preamble, model_section, config_section, feedback_section] + + if optimize_for_latency: + sections.append(variation_prompt_duration_optimization(model_choices)) + if optimize_for_cost: + sections.append(variation_prompt_cost_optimization(model_choices)) + + sections.append(output_format) + + return "\n\n".join(s for s in sections if s) + + +def _build_cost_latency_feedback(history: List[OptimizationContext]) -> str: + """ + Build a feedback section showing cost/latency metrics from Phase 2 history. + + :param history: Phase 2 history (index 0 = Phase 1 winner baseline). + :return: Formatted feedback string, or empty string if no history. + """ + if not history: + return "" + + lines = ["## Iteration History (Cost/Latency):"] + for i, ctx in enumerate(history): + label = "Phase 1 winner (baseline)" if i == 0 else f"Phase 2 iteration {i}" + lines.append(f"\n### {label}:") + lines.append(f"Model: {ctx.current_model}") + if ctx.duration_ms is not None: + lines.append(f"Duration: {ctx.duration_ms:.0f}ms") + if ctx.estimated_cost_usd is not None: + lines.append(f"Estimated cost: ${ctx.estimated_cost_usd:.6f}") + if ctx.scores: + for judge_key, result in ctx.scores.items(): + if not judge_key.startswith("_"): + lines.append(f"Judge [{judge_key}]: score={result.score:.3f}") + + return "\n".join(lines) + + def variation_prompt_preamble() -> str: """Static opening section for the variation generation prompt.""" return "\n".join( diff --git a/packages/optimization/tests/test_client.py b/packages/optimization/tests/test_client.py index 5a1d9895..178ad5ea 100644 --- a/packages/optimization/tests/test_client.py +++ b/packages/optimization/tests/test_client.py @@ -36,6 +36,7 @@ ) from ldai_optimizer.prompts import ( build_new_variation_prompt, + build_token_latency_variation_prompt, variation_prompt_acceptance_criteria, variation_prompt_cost_optimization, variation_prompt_feedback, @@ -560,11 +561,12 @@ async def test_variables_in_context(self): assert ctx.current_variables == variables async def test_duration_context_added_when_latency_optimization_true_and_duration_provided(self): - """When latency_optimization=True and agent_duration_ms is provided, - the judge instructions mention the duration.""" + """In Phase 2 (cost/latency), when latency_optimization=True and agent_duration_ms is + provided, the judge instructions mention the duration.""" self.client._options = _make_options( handle_judge_call=self.handle_judge_call, latency_optimization=True ) + self.client._in_cost_latency_phase = True judge = OptimizationJudge(threshold=0.8, acceptance_statement="Be accurate.") await self.client._evaluate_acceptance_judge( judge_key="speed", @@ -580,10 +582,11 @@ async def test_duration_context_added_when_latency_optimization_true_and_duratio assert "state the duration" in config.instructions async def test_duration_context_includes_baseline_comparison_when_history_present(self): - """When a baseline duration is captured, the judge instructions include a baseline comparison.""" + """In Phase 2, when a baseline duration is captured, instructions include a baseline comparison.""" self.client._options = _make_options( handle_judge_call=self.handle_judge_call, latency_optimization=True ) + self.client._in_cost_latency_phase = True self.client._history = [ OptimizationContext( scores={}, @@ -612,10 +615,11 @@ async def test_duration_context_includes_baseline_comparison_when_history_presen assert "faster" in config.instructions async def test_duration_context_says_slower_when_candidate_is_slower(self): - """When the candidate is slower than baseline, the instructions say 'slower'.""" + """In Phase 2, when the candidate is slower than baseline, instructions say 'slower'.""" self.client._options = _make_options( handle_judge_call=self.handle_judge_call, latency_optimization=True ) + self.client._in_cost_latency_phase = True self.client._history = [ OptimizationContext( scores={}, @@ -3975,18 +3979,17 @@ def _ctx_with(self, duration_ms, score=1.0, iteration=1): duration_ms=duration_ms, ) - async def test_duration_gate_triggers_variation_when_not_fast_enough(self): - """Judge passes but duration fails threshold → variation generated → second attempt succeeds.""" + async def test_phase1_succeeds_regardless_of_duration_when_quality_passes(self): + """Phase 1 quality loop ignores duration — slow iterations still pass if quality judges pass.""" client = _make_client(self.mock_ldai) - # Iter 1: judge fails → history[0].duration_ms = 2000 - # Iter 2: judge passes, duration 1800ms ≥ 2000 * 0.80 = 1600ms → duration fails → variation - # Iter 3: judge passes, duration 1500ms < 1600ms → passes → validation → success + # Iter 1: judge fails → variation generated + # Iter 2: judge passes even with slow duration → validation → Phase 1 success + # (duration gate runs only in Phase 2, not Phase 1) execute_side_effects = [ - self._ctx_with(duration_ms=2000, score=0.2, iteration=1), # iter 1: judge fails - self._ctx_with(duration_ms=1800, score=1.0, iteration=2), # iter 2: judge passes, duration fails - self._ctx_with(duration_ms=1500, score=1.0, iteration=3), # iter 3: both pass - self._ctx_with(duration_ms=1500, score=1.0, iteration=4), # validation + self._ctx_with(duration_ms=9000, score=0.2, iteration=1), # iter 1: judge fails + self._ctx_with(duration_ms=9000, score=1.0, iteration=2), # iter 2: quality passes despite slow + self._ctx_with(duration_ms=9000, score=1.0, iteration=3), # validation passes ] handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) @@ -3998,21 +4001,23 @@ async def test_duration_gate_triggers_variation_when_not_fast_enough(self): with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - result = await client.optimize_from_options("test-agent", opts) + # Phase 2 tries to run after Phase 1 success; mock additional calls for it + # (Phase 2 uses variation-generation + agent-turn pairs, so we need more mocks) + # To keep this test focused on Phase 1 gate removal, mock _run_cost_latency_phase + with patch.object(client, "_run_cost_latency_phase", new_callable=AsyncMock): + result = await client.optimize_from_options("test-agent", opts) - assert result.duration_ms == 1500 - # 2 variations generated (after iter 1 judge fail, after iter 2 duration fail) - assert handle_agent_call.call_count == 2 - assert mock_execute.call_count == 4 + # Phase 1 succeeded despite high duration — 1 variation generated (after iter 1 judge fail) + assert result is not None + assert handle_agent_call.call_count == 1 async def test_duration_check_skipped_on_first_iteration_no_baseline(self): - """First iteration has no history → duration check always skipped → succeeds even if slow.""" + """Phase 1 never applies duration gate — first quality pass succeeds regardless of duration.""" client = _make_client(self.mock_ldai) - # Iter 1 (no history): judge passes, duration check skipped → validation - # Validation: judge passes, duration check still uses history[0] = None since nothing appended yet + # Iter 1: judge passes with high duration → validation → Phase 1 success (no gate in Phase 1) execute_side_effects = [ - self._ctx_with(duration_ms=9999, score=1.0, iteration=1), # iter 1: would fail if checked + self._ctx_with(duration_ms=9999, score=1.0, iteration=1), self._ctx_with(duration_ms=9999, score=1.0, iteration=2), # validation ] @@ -4024,10 +4029,11 @@ async def test_duration_check_skipped_on_first_iteration_no_baseline(self): with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - result = await client.optimize_from_options("test-agent", opts) + with patch.object(client, "_run_cost_latency_phase", new_callable=AsyncMock): + result = await client.optimize_from_options("test-agent", opts) - # Succeeds because history is empty and duration check is skipped - assert result.duration_ms == 9999 + # Phase 1 succeeds despite 9999ms — no duration gate in Phase 1 + assert result is not None async def test_no_duration_gate_when_latency_optimization_is_none(self): """latency_optimization=None → duration gate never applied.""" @@ -4054,21 +4060,16 @@ async def test_no_duration_gate_when_latency_optimization_is_none(self): assert result is not None - async def test_evaluate_duration_called_in_validation_phase(self): - """Duration gate also runs on validation samples, not just the primary turn.""" + async def test_phase1_validation_succeeds_regardless_of_duration(self): + """Phase 1 validation phase also does not apply the duration gate — quality only.""" client = _make_client(self.mock_ldai) - # Iter 1: judge fails → history[0].duration_ms = 2000 - # Iter 2: judge passes, duration 1500ms → primary passes - # Validation sample: judge passes, duration 1800ms ≥ 1600ms → validation fails → variation - # Iter 3: judge passes, duration 1500ms → primary passes - # Validation: judge passes, duration 1500ms → validation passes → success + # Iter 1: judge fails → variation + # Iter 2: judge passes → validation: judge passes (high duration, but no gate in Phase 1) → success execute_side_effects = [ self._ctx_with(duration_ms=2000, score=0.2, iteration=1), # iter 1: judge fails - self._ctx_with(duration_ms=1500, score=1.0, iteration=2), # iter 2: passes - self._ctx_with(duration_ms=1800, score=1.0, iteration=3), # validation: duration fails - self._ctx_with(duration_ms=1500, score=1.0, iteration=4), # iter 3: passes - self._ctx_with(duration_ms=1500, score=1.0, iteration=5), # validation: passes + self._ctx_with(duration_ms=1500, score=1.0, iteration=2), # iter 2: quality passes + self._ctx_with(duration_ms=9999, score=1.0, iteration=3), # validation: passes despite slow ] handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) @@ -4080,10 +4081,12 @@ async def test_evaluate_duration_called_in_validation_phase(self): with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - result = await client.optimize_from_options("test-agent", opts) + with patch.object(client, "_run_cost_latency_phase", new_callable=AsyncMock): + result = await client.optimize_from_options("test-agent", opts) - assert result.duration_ms == 1500 - assert mock_execute.call_count == 5 + # Validation passes even though duration is 9999ms — no gate in Phase 1 + assert result is not None + assert mock_execute.call_count == 3 # --------------------------------------------------------------------------- @@ -4107,35 +4110,19 @@ def _gt_ctx(self, duration_ms, score=1.0, iteration=1, user_input="q"): user_input=user_input, ) - async def test_duration_gate_applied_per_sample_in_ground_truth_mode(self): - """In GT mode, the duration check fires per sample, not just once per attempt.""" + async def test_phase1_gt_succeeds_regardless_of_duration_when_quality_passes(self): + """In GT Phase 1, duration gate does not apply — all-quality-passing attempts succeed.""" client = _make_client(self.mock_ldai) - # Attempt 1: - # Sample 1: judge fails (score 0.2) → all_passed = False - # Sample 2: judge passes → duration skipped (history empty for sample 2) - # → history extended with attempt 1 results → variation generated - # Attempt 2: - # Sample 1: judge passes, duration 1800ms vs baseline history[0].duration_ms = 2000ms - # → 1800 >= 1600 → duration fails → sample_passed = False → all_passed = False - # (attempt 2 fails due to duration on sample 1) - # → variation generated - # Attempt 3: - # Sample 1: judge passes, duration 1500ms < 1600ms → passes - # Sample 2: judge passes, duration 1500ms (history[0] still 2000ms) → passes - # → all_passed = True → success + # Attempt 1: judge fails on sample 1 → all_passed = False → variation + # Attempt 2: both samples pass quality despite high duration → success (no gate in Phase 1) execute_side_effects = [ # Attempt 1 self._gt_ctx(duration_ms=2000, score=0.2, iteration=1, user_input="q1"), self._gt_ctx(duration_ms=2000, score=1.0, iteration=2, user_input="q2"), - # Variation (not from _execute_agent_turn, from handle_agent_call) - # Attempt 2 - self._gt_ctx(duration_ms=1800, score=1.0, iteration=3, user_input="q1"), - self._gt_ctx(duration_ms=1800, score=1.0, iteration=4, user_input="q2"), - # Variation - # Attempt 3 - self._gt_ctx(duration_ms=1500, score=1.0, iteration=5, user_input="q1"), - self._gt_ctx(duration_ms=1500, score=1.0, iteration=6, user_input="q2"), + # Attempt 2 — passes even with slow duration because no gate in Phase 1 + self._gt_ctx(duration_ms=9000, score=1.0, iteration=3, user_input="q1"), + self._gt_ctx(duration_ms=9000, score=1.0, iteration=4, user_input="q2"), ] handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) @@ -4147,14 +4134,13 @@ async def test_duration_gate_applied_per_sample_in_ground_truth_mode(self): with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: mock_execute.side_effect = execute_side_effects - results = await client.optimize_from_ground_truth_options("test-agent", opts) + with patch.object(client, "_run_cost_latency_phase", new_callable=AsyncMock): + results = await client.optimize_from_ground_truth_options("test-agent", opts) assert isinstance(results, list) - for ctx in results: - assert ctx.duration_ms == 1500 - # 2 variations generated - assert handle_agent_call.call_count == 2 - assert mock_execute.call_count == 6 + # 1 variation generated (after attempt 1 failed on quality) + assert handle_agent_call.call_count == 1 + assert mock_execute.call_count == 4 async def test_no_duration_gate_in_gt_mode_when_latency_optimization_not_set(self): """In GT mode, duration gate is not applied when latency_optimization is None.""" @@ -5770,16 +5756,18 @@ def test_cost_section_absent_by_default(self): assert "Cost Optimization" not in result def test_cost_section_included_when_flag_set(self): - result = build_new_variation_prompt( - self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", + """Phase 2 prompt includes Cost Optimization section when optimize_for_cost=True.""" + result = build_token_latency_variation_prompt( + self._make_history(), ["gpt-4o"], optimize_for_cost=True, ) assert "Cost Optimization" in result def test_duration_and_cost_sections_both_present(self): - result = build_new_variation_prompt( - self._make_history(), None, "gpt-4o", "inst", {}, ["gpt-4o"], [{}], "inst", - optimize_for_duration=True, + """Phase 2 prompt includes both Duration and Cost Optimization sections when both flags set.""" + result = build_token_latency_variation_prompt( + self._make_history(), ["gpt-4o"], + optimize_for_latency=True, optimize_for_cost=True, ) assert "Duration Optimization" in result @@ -5856,6 +5844,7 @@ def _set_pricing(self): ] async def test_cost_context_injected_when_token_optimization_true(self): + """In Phase 2 (cost/latency), token usage is injected into judge instructions.""" self._set_pricing() usage = TokenUsage(total=100, input=60, output=40) captured: list = [] @@ -5867,6 +5856,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): self.client._options = _make_options( handle_judge_call=_capture_judge_call, token_optimization=True ) + self.client._in_cost_latency_phase = True await self.client._evaluate_acceptance_judge( judge_key="cost-judge", optimization_judge=self._cost_judge(), @@ -5907,6 +5897,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): assert "cost/token-usage goal" not in instructions async def test_baseline_cost_shown_when_history_present(self): + """In Phase 2, when a baseline cost is set, judge instructions include baseline comparison.""" self._set_pricing() usage = TokenUsage(total=100, input=60, output=40) captured: list = [] @@ -5929,6 +5920,7 @@ async def _capture_judge_call(judge_key, judge_config, ctx, is_judge): self.client._options = _make_options( handle_judge_call=_capture_judge_call, token_optimization=True ) + self.client._in_cost_latency_phase = True await self.client._evaluate_acceptance_judge( judge_key="cost-judge", optimization_judge=self._cost_judge(), From de5f24f8ad8db24dc6e16abe31a4893297b60aab Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 3 Jun 2026 12:16:07 -0800 Subject: [PATCH 2/4] cursor feedback --- .../optimization/src/ldai_optimizer/client.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 9affb1f0..cd665abb 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -1466,8 +1466,14 @@ async def _run_ground_truth_optimization( last_ctx.iteration, ) if self._last_succeeded_context is None: + # No Phase 2 candidate won; restore the Phase 1 winner. self._last_run_succeeded = True self._last_succeeded_context = phase1_winner + elif self._last_succeeded_context is not phase1_winner: + # Phase 2 selected a better model; return that context so + # callers (including auto_commit) see the actual final winner + # rather than the stale Phase 1 GT batch results. + return [self._last_succeeded_context] return attempt_results # We've hit max attempts for the batches, bail at this point @@ -2690,10 +2696,13 @@ async def _run_cost_latency_phase( if i < max_iters - 1: self._safe_status_update("turn completed", ctx, iteration) - # Report results: fail non-winners first (preserving _last_succeeded_context - # as the Phase 1 winner until the very end), then succeed the best candidate. + # Send terminal FAILED status for each non-winning model attempt. + # We use _safe_status_update directly rather than _handle_failure so that + # exploratory Phase 2 misses don't corrupt _last_run_succeeded, + # _last_succeeded_context, or trigger on_failing_result — those are + # run-level signals that should only fire if the whole optimization fails. for failed_ctx in non_candidates: - self._handle_failure(failed_ctx, failed_ctx.iteration) + self._safe_status_update("failure", failed_ctx, failed_ctx.iteration) if candidates: best = self._pick_best_candidate(candidates) From aa0a77f5181d9f934a2512d8197fee5c46fbe46b Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 3 Jun 2026 13:16:02 -0800 Subject: [PATCH 3/4] more cursor feedback --- .../optimization/src/ldai_optimizer/client.py | 91 +++++++++++++------ 1 file changed, 64 insertions(+), 27 deletions(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index cd665abb..9b964731 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -1445,21 +1445,16 @@ async def _run_ground_truth_optimization( attempt, n, ) - self._last_run_succeeded = True - self._last_succeeded_context = last_ctx - self._safe_status_update("success", last_ctx, last_ctx.iteration) - if self._options.on_passing_result: - try: - self._options.on_passing_result(last_ctx) - except Exception: - logger.exception( - "[GT Attempt %d] -> on_passing_result callback failed", attempt - ) # Phase 2: optimize model/params on the frozen winning variation. if ( self._options.latency_optimization or self._options.token_optimization ) and not self._is_token_limit_exceeded(): + # Record Phase 1 success without firing on_passing_result yet; + # we fire it once below with the true final winner. + self._last_run_succeeded = True + self._last_succeeded_context = last_ctx + self._safe_status_update("success", last_ctx, last_ctx.iteration) phase1_winner = self._last_succeeded_context await self._run_cost_latency_phase( last_ctx, @@ -1469,11 +1464,28 @@ async def _run_ground_truth_optimization( # No Phase 2 candidate won; restore the Phase 1 winner. self._last_run_succeeded = True self._last_succeeded_context = phase1_winner - elif self._last_succeeded_context is not phase1_winner: - # Phase 2 selected a better model; return that context so - # callers (including auto_commit) see the actual final winner - # rather than the stale Phase 1 GT batch results. - return [self._last_succeeded_context] + else: + self._last_run_succeeded = True + self._last_succeeded_context = last_ctx + self._safe_status_update("success", last_ctx, last_ctx.iteration) + + # Fire on_passing_result exactly once with the true final winner. + final_winner = self._last_succeeded_context + if final_winner and self._options.on_passing_result: + try: + self._options.on_passing_result(final_winner) + except Exception: + logger.exception( + "[GT Attempt %d] -> on_passing_result callback failed", attempt + ) + + if ( + self._last_succeeded_context is not None + and self._last_succeeded_context is not last_ctx + ): + # Phase 2 selected a better model; return that context so + # callers (including auto_commit) see the actual final winner. + return [self._last_succeeded_context] return attempt_results # We've hit max attempts for the batches, bail at this point @@ -2193,8 +2205,6 @@ async def _execute_agent_turn( ) except Exception: logger.exception("[Iteration %d] -> Agent call failed", iteration) - if self._options.on_failing_result: - self._options.on_failing_result(optimize_context) raise scores: Dict[str, JudgeResult] = {} @@ -2495,23 +2505,29 @@ def _apply_cost_gate( return passed_so_far and passed, ctx def _handle_success( - self, optimize_context: OptimizationContext, iteration: int + self, + optimize_context: OptimizationContext, + iteration: int, + suppress_user_callbacks: bool = False, ) -> Any: """ Handle a successful optimization result. - Fires the "success" status update, invokes on_passing_result if set, - and returns the winning OptimizationContext. + Fires the "success" status update and (unless suppressed) invokes + on_passing_result. Pass suppress_user_callbacks=True from Phase 2 so + the API record is updated without firing on_passing_result a second time + — the caller is responsible for firing it once with the true final winner. :param optimize_context: The context from the passing iteration :param iteration: Current iteration number for logging + :param suppress_user_callbacks: When True, skip on_passing_result. :return: The passing OptimizationContext """ logger.info("[Iteration %d] -> Optimization succeeded", iteration) self._last_run_succeeded = True self._last_succeeded_context = optimize_context self._safe_status_update("success", optimize_context, iteration) - if self._options.on_passing_result: + if not suppress_user_callbacks and self._options.on_passing_result: try: self._options.on_passing_result(optimize_context) except Exception: @@ -2566,13 +2582,15 @@ def _pick_best_candidate( def _score(ctx: OptimizationContext) -> float: total = 0.0 if ( - ctx.duration_ms is not None + self._options.latency_optimization + and ctx.duration_ms is not None and self._baseline_duration_ms is not None and self._baseline_duration_ms > 0 ): total += ctx.duration_ms / self._baseline_duration_ms if ( - ctx.estimated_cost_usd is not None + self._options.token_optimization + and ctx.estimated_cost_usd is not None and self._baseline_cost_usd is not None and self._baseline_cost_usd > 0 ): @@ -2706,7 +2724,9 @@ async def _run_cost_latency_phase( if candidates: best = self._pick_best_candidate(candidates) - self._handle_success(best, best.iteration) + # Suppress on_passing_result here — the caller fires it once with the + # true final winner after Phase 2 returns, so it is never double-fired. + self._handle_success(best, best.iteration, suppress_user_callbacks=True) logger.info( "[Phase 2] -> Best candidate selected: model=%s, duration_ms=%s, cost=%s", best.current_model, @@ -3080,17 +3100,34 @@ async def _run_optimization( optimize_context, iteration ) if all_valid: - self._handle_success(optimize_context, iteration) - phase1_winner = self._last_succeeded_context + # Suppress on_passing_result in _handle_success; we fire it + # exactly once below with the true final winner so that Phase 2 + # (if it runs) cannot cause a double callback. + self._handle_success( + optimize_context, iteration, suppress_user_callbacks=True + ) if ( self._options.latency_optimization or self._options.token_optimization ) and not self._is_token_limit_exceeded(): + phase1_winner = self._last_succeeded_context await self._run_cost_latency_phase(optimize_context, iteration) if self._last_succeeded_context is None: self._last_run_succeeded = True self._last_succeeded_context = phase1_winner - return self._last_succeeded_context + # Fire on_passing_result exactly once with the true final winner + # (Phase 1 winner if Phase 2 was skipped/found nothing better, + # or the Phase 2 best candidate otherwise). + final_winner = self._last_succeeded_context + if final_winner and self._options.on_passing_result: + try: + self._options.on_passing_result(final_winner) + except Exception: + logger.exception( + "[Iteration %d] -> on_passing_result callback failed", + iteration, + ) + return final_winner if self._is_token_limit_exceeded(): return self._handle_failure(last_ctx, iteration) # Validation failed — treat as a normal failed attempt. From 4eb0bb0c634e40539d9ae4b7f125eb6729c28c74 Mon Sep 17 00:00:00 2001 From: Andrew Klatzke Date: Wed, 3 Jun 2026 14:11:56 -0800 Subject: [PATCH 4/4] fix: ensure cost data is persisted --- packages/optimization/src/ldai_optimizer/client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/optimization/src/ldai_optimizer/client.py b/packages/optimization/src/ldai_optimizer/client.py index 9b964731..0885f812 100644 --- a/packages/optimization/src/ldai_optimizer/client.py +++ b/packages/optimization/src/ldai_optimizer/client.py @@ -2229,10 +2229,17 @@ async def _execute_agent_turn( agent_response.usage, _find_model_config(self._current_model or "", self._model_configs), ) + # Build a _meta entry capturing raw cost and latency telemetry for every + # iteration, regardless of which optimization goals are enabled. This + # surfaces the measurements in the API/UI even when the gates are inactive. result_ctx = dataclasses.replace( optimize_context, completion_response=completion_response, - scores=scores, + scores={**scores, "_meta": JudgeResult( + score=0.0, + duration_ms=agent_duration_ms, + estimated_cost_usd=agent_cost, + )}, duration_ms=agent_duration_ms, usage=agent_response.usage, estimated_cost_usd=agent_cost,