-
Notifications
You must be signed in to change notification settings - Fork 94
LCORE-: Wired compaction turn persistence into agentic query flows #1953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -338,6 +338,7 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals | |
| responses_params=responses_params, | ||
| context=context, | ||
| endpoint_path=endpoint_path, | ||
| original_input=None, | ||
| ) | ||
|
|
||
| # Combine inline RAG results (BYOK + Solr) with tool-based results | ||
|
|
@@ -353,6 +354,8 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals | |
| responses_params=responses_params, | ||
| turn_summary=turn_summary, | ||
| background_topic_summary_tasks=_background_topic_summary_tasks, | ||
| emit_start=True, | ||
| original_input=None, | ||
| ), | ||
| media_type=response_media_type, | ||
| ) | ||
|
|
@@ -387,7 +390,6 @@ async def retrieve_response_generator( | |
| if context.moderation_result.decision == "blocked": | ||
| turn_summary.llm_response = context.moderation_result.message | ||
| turn_summary.id = context.moderation_result.moderation_id | ||
| turn_summary.output_items = [context.moderation_result.refusal_response] | ||
| # In compacted mode the conversation parameter was omitted, so the | ||
| # refusal turn (with the original input) is persisted by | ||
| # generate_response; storing it here too would duplicate it. | ||
|
|
@@ -506,6 +508,7 @@ async def generate_response_with_compaction( | |
| responses_params=responses_params, | ||
| context=context, | ||
| endpoint_path=endpoint_path, | ||
| original_input=compacted_original_input, | ||
| ) | ||
| except HTTPException as e: | ||
| yield http_exception_stream_event(e) | ||
|
|
@@ -705,7 +708,7 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi | |
| if original_input is not None | ||
| else context.query_request.query | ||
| ), | ||
| turn_summary.output_items, | ||
| [], # field was removed from TurnSummary | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is inside a deprecated helper function, so it has no effect. |
||
| ) | ||
| except Exception: # pylint: disable=broad-except | ||
| logger.exception( | ||
|
|
@@ -884,10 +887,6 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat | |
| getattr(chunk, "response"), # noqa: B009 | ||
| ) | ||
| turn_summary.llm_response = turn_summary.llm_response or "".join(text_parts) | ||
| # Capture structured output items for compacted-mode turn storage | ||
| # (LCORE-1572), so the persisted turn keeps non-text output items | ||
| # rather than being flattened to the response text. | ||
| turn_summary.output_items = list(latest_response_object.output or []) | ||
| event_id = chunk_id | ||
| chunk_id += 1 | ||
| turn_summary.next_chunk_id = chunk_id | ||
|
|
@@ -906,9 +905,6 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat | |
| OpenAIResponseObject, | ||
| getattr(chunk, "response"), # noqa: B009 | ||
| ) | ||
| # Capture any partial output items so a compacted-mode turn is not | ||
| # persisted with empty output on these terminals (LCORE-1572). | ||
| turn_summary.output_items = list(latest_response_object.output or []) | ||
| error_message = ( | ||
| latest_response_object.error.message | ||
| if latest_response_object.error | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,6 @@ | |
|
|
||
| from typing import Any, Optional | ||
|
|
||
| from llama_stack_api import OpenAIResponseOutput | ||
| from pydantic import AnyUrl, BaseModel, Field | ||
|
|
||
| from utils.token_counter import TokenCounter | ||
|
|
@@ -109,11 +108,6 @@ class TurnSummary(BaseModel): | |
| rag_chunks: list[RAGChunk] = Field(default_factory=list) | ||
| referenced_documents: list[ReferencedDocument] = Field(default_factory=list) | ||
| token_usage: TokenCounter = Field(default_factory=TokenCounter) | ||
| output_items: list[OpenAIResponseOutput] = Field( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This attribute is unnecessary now; we capture the output inside the agentic loop. |
||
| default_factory=list, | ||
| description="Structured response output items, captured for compacted-mode " | ||
| "turn persistence (LCORE-1572). Empty on the non-compacted path.", | ||
| ) | ||
| partial_tokens: list[str] = Field( | ||
| default_factory=list, | ||
| description="Accumulated text deltas during streaming, used to reconstruct " | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,13 @@ | ||
| """Pydantic AI provider for Llama Stack.""" | ||
|
|
||
| from pydantic_ai_lightspeed.llamastack._model import LlamaStackResponsesModel | ||
| from pydantic_ai_lightspeed.llamastack._model import ( | ||
| CompactionTurnContext, | ||
| LlamaStackResponsesModel, | ||
| ) | ||
| from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider | ||
|
|
||
| __all__ = ["LlamaStackProvider", "LlamaStackResponsesModel"] | ||
| __all__ = [ | ||
| "CompactionTurnContext", | ||
| "LlamaStackProvider", | ||
| "LlamaStackResponsesModel", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,23 +10,30 @@ | |
| deltas must be replayed with the matching suffix so pydantic_ai can append the | ||
| streamed ``tool_args`` content to the correct part. | ||
|
|
||
| When compaction omits the ``conversation`` parameter from inference requests, | ||
| ``LlamaStackResponsesModel`` appends completed turns to the conversation via | ||
| ``CompactionTurnContext`` (in :meth:`_responses_create` for non-streaming rounds, | ||
| and on ``response.completed`` for streaming rounds in :meth:`request_stream`). | ||
|
|
||
| This module provides ``LlamaStackResponsesModel`` which wraps the event stream to | ||
| buffer those early delta events and replay them correctly once the item is announced. | ||
| """ | ||
|
|
||
| from __future__ import annotations as _annotations | ||
|
|
||
| from collections import defaultdict | ||
| from collections.abc import AsyncIterator | ||
| from collections.abc import AsyncIterator, Sequence | ||
| from contextlib import asynccontextmanager | ||
| from typing import Any, cast | ||
| from dataclasses import dataclass | ||
| from typing import Any, Literal, Optional, cast, overload | ||
|
|
||
| from llama_stack_client import AsyncLlamaStackClient | ||
| from openai import AsyncStream | ||
| from openai.types import responses | ||
| from pydantic_ai import UnexpectedModelBehavior | ||
| from pydantic_ai._run_context import RunContext | ||
| from pydantic_ai._utils import PeekableAsyncStream, Unset, number_to_datetime | ||
| from pydantic_ai.messages import ModelMessage, ModelResponse | ||
| from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse | ||
| from pydantic_ai.models import ( | ||
| ModelRequestParameters, | ||
| StreamedResponse, | ||
|
|
@@ -38,13 +45,38 @@ | |
| OpenAIResponsesStreamedResponse, | ||
| _map_api_errors, | ||
| ) | ||
| from pydantic_ai.profiles import ModelProfileSpec | ||
| from pydantic_ai.settings import ModelSettings | ||
|
|
||
| from log import get_logger | ||
| from models.common.responses.types import ResponseInput | ||
| from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider | ||
| from utils.conversations import append_turn_items_to_conversation | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
|
|
||
| @dataclass | ||
| class CompactionTurnContext: | ||
| """Mutable state for manually persisting compacted agent turns. | ||
|
|
||
| ``latest_round_input`` is initialized to the real user query. The create patch | ||
| leaves it unchanged on the first LLM round, then records pydantic-ai input | ||
| for follow-up rounds after that turn is persisted. | ||
|
|
||
| Attributes: | ||
| client: Llama Stack client used to append conversation items. | ||
| conversation_id: Conversation to store turns against. | ||
| latest_round_input: Input stored for the current or next inference round. | ||
| original_input_persisted: Whether the first compacted round was appended. | ||
| """ | ||
|
|
||
| client: AsyncLlamaStackClient | ||
| conversation_id: str | ||
| latest_round_input: ResponseInput | ||
| original_input_persisted: bool = False | ||
|
|
||
|
|
||
| class _FilteredResponseStream: | ||
| """Wraps an OpenAI AsyncStream to reorder spurious events from Llama Stack. | ||
|
|
||
|
|
@@ -58,13 +90,19 @@ class _FilteredResponseStream: | |
| a closing ``}`` to complete the outer JSON object that pydantic_ai opens. | ||
| """ | ||
|
|
||
| def __init__(self, source: AsyncStream[responses.ResponseStreamEvent]) -> None: | ||
| def __init__( | ||
| self, | ||
| source: AsyncStream[responses.ResponseStreamEvent], | ||
| compaction: Optional[CompactionTurnContext] = None, | ||
| ) -> None: | ||
| """Wrap an existing stream with reordering logic. | ||
|
|
||
| Args: | ||
| source: The raw OpenAI AsyncStream to reorder. | ||
| compaction: Compaction state for turn persistence, if active. | ||
| """ | ||
| self._source = source | ||
| self._compaction = compaction | ||
| self._announced_item_ids: set[str] = set() | ||
| self._buffered_deltas: dict[ | ||
| str, list[responses.ResponseFunctionCallArgumentsDeltaEvent] | ||
|
|
@@ -112,6 +150,19 @@ async def _filtered_iter( | |
| self._buffered_deltas[event.item_id].append(event) | ||
| continue | ||
|
|
||
| if ( | ||
| isinstance(event, responses.ResponseCompletedEvent) | ||
| and self._compaction is not None | ||
| ): | ||
| compaction = self._compaction | ||
| await append_turn_items_to_conversation( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Store the streaming compacted turn from the final event. |
||
| compaction.client, | ||
| compaction.conversation_id, | ||
| compaction.latest_round_input, | ||
| cast(Sequence[Any], event.response.output), | ||
| ) | ||
| compaction.original_input_persisted = True | ||
|
|
||
| yield event | ||
|
|
||
| def _replay_buffered_deltas( | ||
|
|
@@ -179,8 +230,108 @@ class LlamaStackResponsesModel(OpenAIResponsesModel): | |
| Overrides the streaming response processing to buffer and replay | ||
| ``ResponseFunctionCallArgumentsDeltaEvent`` events that Llama Stack emits | ||
| before the corresponding ``McpCall`` or ``ResponseFunctionToolCall`` item. | ||
|
|
||
| When ``compaction`` is set, completed inference rounds are appended to the | ||
| conversation because compacted mode omits the ``conversation`` parameter. | ||
| """ | ||
|
|
||
| def __init__( # pylint: disable=too-many-arguments | ||
| self, | ||
| model_name: str, | ||
| provider: LlamaStackProvider, | ||
| profile: ModelProfileSpec | None = None, | ||
| settings: ModelSettings | None = None, | ||
| compaction: Optional[CompactionTurnContext] = None, | ||
| ) -> None: | ||
| """Initialize the model. | ||
|
|
||
| Args: | ||
| model_name: Model identifier passed to pydantic-ai. | ||
| provider: Pydantic AI provider or provider name. | ||
| profile: Optional model profile override. | ||
| settings: Optional pydantic-ai model settings. | ||
| compaction: Compaction state when turns must be stored manually. | ||
| """ | ||
| super().__init__( | ||
| model_name, | ||
| provider=provider, | ||
| profile=profile, | ||
| settings=settings, | ||
| ) | ||
| self.compaction = compaction | ||
|
|
||
| @overload | ||
| async def _responses_create( | ||
| self, | ||
| messages: list[ModelRequest | ModelResponse], | ||
| stream: Literal[False], | ||
| model_settings: OpenAIResponsesModelSettings, | ||
| model_request_parameters: ModelRequestParameters, | ||
| ) -> responses.Response: ... | ||
|
|
||
| @overload | ||
| async def _responses_create( | ||
| self, | ||
| messages: list[ModelRequest | ModelResponse], | ||
| stream: Literal[True], | ||
| model_settings: OpenAIResponsesModelSettings, | ||
| model_request_parameters: ModelRequestParameters, | ||
| ) -> AsyncStream[responses.ResponseStreamEvent]: ... | ||
|
|
||
| async def _responses_create( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Patching the _responses_create function in order to intercept the input and output of each Responses API call. |
||
| self, | ||
| messages: list[ModelRequest | ModelResponse], | ||
| stream: bool, | ||
| model_settings: OpenAIResponsesModelSettings, | ||
| model_request_parameters: ModelRequestParameters, | ||
| ) -> ( | ||
| responses.Response | AsyncStream[responses.ResponseStreamEvent] | ModelResponse | ||
| ): | ||
| """Create a Responses API request with compacted turn persistence. | ||
|
|
||
| After the first compacted round is persisted, records pydantic-ai input | ||
| for follow-up tool-loop rounds. Non-streaming responses are appended | ||
| immediately; streaming persistence is handled in :meth:`request_stream`. | ||
| """ | ||
| compaction = self.compaction | ||
| if compaction is not None and compaction.original_input_persisted: | ||
| request_params = await self._build_responses_request_params( | ||
| messages, | ||
| model_settings, | ||
| model_request_parameters, | ||
| self.profile, | ||
| ) | ||
| compaction.latest_round_input = cast(ResponseInput, request_params.input) | ||
|
|
||
| result: ( | ||
| responses.Response | ||
| | AsyncStream[responses.ResponseStreamEvent] | ||
| | ModelResponse | ||
| ) | ||
| if stream: | ||
| result = await super()._responses_create( | ||
| messages, True, model_settings, model_request_parameters | ||
| ) | ||
| else: | ||
| result = await super()._responses_create( | ||
| messages, False, model_settings, model_request_parameters | ||
| ) | ||
|
|
||
| if ( | ||
| compaction is not None | ||
| and not stream | ||
| and isinstance(result, responses.Response) | ||
| ): | ||
| await append_turn_items_to_conversation( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Store the turn here for non-streaming queries. |
||
| compaction.client, | ||
| compaction.conversation_id, | ||
| compaction.latest_round_input, | ||
| cast(Sequence[Any], result.output), | ||
| ) | ||
| compaction.original_input_persisted = True | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The first LLM call contains the whole compacted history, so we need to store only the original input; subsequent turns are stored normally. |
||
|
|
||
| return result | ||
|
|
||
| async def request( # pylint: disable=unused-argument | ||
| self, | ||
| messages: list[ModelMessage], | ||
|
|
@@ -274,7 +425,7 @@ async def request_stream( # pylint: disable=unused-argument | |
| messages, True, model_settings_cast, model_request_parameters | ||
| ) | ||
|
|
||
| filtered_stream = _FilteredResponseStream(response) | ||
| filtered_stream = _FilteredResponseStream(response, self.compaction) | ||
|
|
||
| async with response: | ||
| peekable: PeekableAsyncStream[ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -282,7 +282,7 @@ async def retrieve_agent_response( | |
| responses_params: ResponsesApiParams, | ||
| moderation_result: ShieldModerationResult, | ||
| endpoint_path: str, | ||
| _original_input: Optional[ResponseInput] = None, | ||
| original_input: Optional[ResponseInput] = None, | ||
| ) -> TurnSummary: | ||
| """Retrieve a turn summary from a blocking agent run. | ||
|
|
||
|
|
@@ -293,7 +293,7 @@ async def retrieve_agent_response( | |
| responses_params: Prepared Responses API parameters. | ||
| moderation_result: Shield moderation outcome for the turn. | ||
| endpoint_path: Endpoint path used for metric labeling. | ||
| _original_input: Original user input before the explicit-input rewrite. | ||
| original_input: Original user input before the explicit-input rewrite. | ||
|
|
||
| Returns: | ||
| Turn summary for the completed agent run. | ||
|
|
@@ -305,15 +305,17 @@ async def retrieve_agent_response( | |
| await append_turn_items_to_conversation( | ||
| client, | ||
| responses_params.conversation, | ||
| responses_params.input, | ||
| original_input or responses_params.input, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If original_input is set, store the original prompt, because responses_params.input contains the whole compacted history. |
||
| [moderation_result.refusal_response], | ||
| ) | ||
| return TurnSummary( | ||
| id=moderation_result.moderation_id, | ||
| llm_response=moderation_result.message, | ||
| ) | ||
| try: | ||
| agent = build_agent(client, responses_params, configuration.skills) | ||
| agent = build_agent( | ||
| client, responses_params, configuration.skills, original_input | ||
| ) | ||
| logger.debug("Starting agent non-streaming response processing") | ||
| run_result = await agent.run(cast(str, responses_params.input)) | ||
| except (AgentRunError, APIStatusError, APIConnectionError, RuntimeError) as exc: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`compaction.compacted` is True iff `compaction.original_input` is not None