Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/endpoints/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ async def query_endpoint_handler(
responses_params,
moderation_result,
endpoint_path,
compaction.original_input if compaction.compacted else None,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compaction.compacted is True if and only if compaction.original_input is not None, so the conditional is redundant and original_input can be passed directly.

compaction.original_input,
)

if moderation_result.decision == "passed":
Expand Down
14 changes: 5 additions & 9 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals
responses_params=responses_params,
context=context,
endpoint_path=endpoint_path,
original_input=None,
)

# Combine inline RAG results (BYOK + Solr) with tool-based results
Expand All @@ -353,6 +354,8 @@ async def streaming_query_endpoint_handler( # pylint: disable=too-many-locals
responses_params=responses_params,
turn_summary=turn_summary,
background_topic_summary_tasks=_background_topic_summary_tasks,
emit_start=True,
original_input=None,
),
media_type=response_media_type,
)
Expand Down Expand Up @@ -387,7 +390,6 @@ async def retrieve_response_generator(
if context.moderation_result.decision == "blocked":
turn_summary.llm_response = context.moderation_result.message
turn_summary.id = context.moderation_result.moderation_id
turn_summary.output_items = [context.moderation_result.refusal_response]
# In compacted mode the conversation parameter was omitted, so the
# refusal turn (with the original input) is persisted by
# generate_response; storing it here too would duplicate it.
Expand Down Expand Up @@ -506,6 +508,7 @@ async def generate_response_with_compaction(
responses_params=responses_params,
context=context,
endpoint_path=endpoint_path,
original_input=compacted_original_input,
)
except HTTPException as e:
yield http_exception_stream_event(e)
Expand Down Expand Up @@ -705,7 +708,7 @@ async def generate_response( # pylint: disable=too-many-arguments,too-many-posi
if original_input is not None
else context.query_request.query
),
turn_summary.output_items,
[], # field was removed from TurnSummary

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is inside a deprecated helper function, so it has no effect.

)
except Exception: # pylint: disable=broad-except
logger.exception(
Expand Down Expand Up @@ -884,10 +887,6 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
getattr(chunk, "response"), # noqa: B009
)
turn_summary.llm_response = turn_summary.llm_response or "".join(text_parts)
# Capture structured output items for compacted-mode turn storage
# (LCORE-1572), so the persisted turn keeps non-text output items
# rather than being flattened to the response text.
turn_summary.output_items = list(latest_response_object.output or [])
event_id = chunk_id
chunk_id += 1
turn_summary.next_chunk_id = chunk_id
Expand All @@ -906,9 +905,6 @@ async def response_generator( # pylint: disable=too-many-branches,too-many-stat
OpenAIResponseObject,
getattr(chunk, "response"), # noqa: B009
)
# Capture any partial output items so a compacted-mode turn is not
# persisted with empty output on these terminals (LCORE-1572).
turn_summary.output_items = list(latest_response_object.output or [])
error_message = (
latest_response_object.error.message
if latest_response_object.error
Expand Down
6 changes: 0 additions & 6 deletions src/models/common/turn_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from typing import Any, Optional

from llama_stack_api import OpenAIResponseOutput
from pydantic import AnyUrl, BaseModel, Field

from utils.token_counter import TokenCounter
Expand Down Expand Up @@ -109,11 +108,6 @@ class TurnSummary(BaseModel):
rag_chunks: list[RAGChunk] = Field(default_factory=list)
referenced_documents: list[ReferencedDocument] = Field(default_factory=list)
token_usage: TokenCounter = Field(default_factory=TokenCounter)
output_items: list[OpenAIResponseOutput] = Field(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This attribute is no longer necessary; we now capture the output inside the agentic loop.

default_factory=list,
description="Structured response output items, captured for compacted-mode "
"turn persistence (LCORE-1572). Empty on the non-compacted path.",
)
partial_tokens: list[str] = Field(
default_factory=list,
description="Accumulated text deltas during streaming, used to reconstruct "
Expand Down
11 changes: 9 additions & 2 deletions src/pydantic_ai_lightspeed/llamastack/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""Pydantic AI provider for Llama Stack."""

from pydantic_ai_lightspeed.llamastack._model import LlamaStackResponsesModel
from pydantic_ai_lightspeed.llamastack._model import (
CompactionTurnContext,
LlamaStackResponsesModel,
)
from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider

__all__ = ["LlamaStackProvider", "LlamaStackResponsesModel"]
__all__ = [
"CompactionTurnContext",
"LlamaStackProvider",
"LlamaStackResponsesModel",
]
161 changes: 156 additions & 5 deletions src/pydantic_ai_lightspeed/llamastack/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@
deltas must be replayed with the matching suffix so pydantic_ai can append the
streamed ``tool_args`` content to the correct part.

When compaction omits the ``conversation`` parameter from inference requests,
``LlamaStackResponsesModel`` appends completed turns to the conversation via
``CompactionTurnContext`` (in :meth:`_responses_create` for non-streaming rounds,
and on ``response.completed`` for streaming rounds in :meth:`request_stream`).

This module provides ``LlamaStackResponsesModel`` which wraps the event stream to
buffer those early delta events and replay them correctly once the item is announced.
"""

from __future__ import annotations as _annotations

from collections import defaultdict
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from typing import Any, cast
from dataclasses import dataclass
from typing import Any, Literal, Optional, cast, overload

from llama_stack_client import AsyncLlamaStackClient
from openai import AsyncStream
from openai.types import responses
from pydantic_ai import UnexpectedModelBehavior
from pydantic_ai._run_context import RunContext
from pydantic_ai._utils import PeekableAsyncStream, Unset, number_to_datetime
from pydantic_ai.messages import ModelMessage, ModelResponse
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse
from pydantic_ai.models import (
ModelRequestParameters,
StreamedResponse,
Expand All @@ -38,13 +45,38 @@
OpenAIResponsesStreamedResponse,
_map_api_errors,
)
from pydantic_ai.profiles import ModelProfileSpec
from pydantic_ai.settings import ModelSettings

from log import get_logger
from models.common.responses.types import ResponseInput
from pydantic_ai_lightspeed.llamastack._provider import LlamaStackProvider
from utils.conversations import append_turn_items_to_conversation

logger = get_logger(__name__)


@dataclass
class CompactionTurnContext:
    """Mutable state for manually persisting compacted agent turns.

    ``latest_round_input`` is initialized to the real user query. While
    ``original_input_persisted`` is False, ``_responses_create`` leaves it
    unchanged, so the first LLM round is stored with that query. After a
    round has been appended to the conversation the flag is set to True, and
    ``_responses_create`` then overwrites ``latest_round_input`` with the
    pydantic-ai request input before each follow-up round.

    Attributes:
        client: Llama Stack client used to append conversation items.
        conversation_id: Conversation to store turns against.
        latest_round_input: Input stored for the current or next inference round.
        original_input_persisted: Whether the first compacted round was appended.
    """

    client: AsyncLlamaStackClient
    conversation_id: str
    latest_round_input: ResponseInput
    original_input_persisted: bool = False


class _FilteredResponseStream:
"""Wraps an OpenAI AsyncStream to reorder spurious events from Llama Stack.

Expand All @@ -58,13 +90,19 @@ class _FilteredResponseStream:
a closing ``}`` to complete the outer JSON object that pydantic_ai opens.
"""

def __init__(self, source: AsyncStream[responses.ResponseStreamEvent]) -> None:
def __init__(
self,
source: AsyncStream[responses.ResponseStreamEvent],
compaction: Optional[CompactionTurnContext] = None,
) -> None:
"""Wrap an existing stream with reordering logic.

Args:
source: The raw OpenAI AsyncStream to reorder.
compaction: Compaction state for turn persistence, if active.
"""
self._source = source
self._compaction = compaction
self._announced_item_ids: set[str] = set()
self._buffered_deltas: dict[
str, list[responses.ResponseFunctionCallArgumentsDeltaEvent]
Expand Down Expand Up @@ -112,6 +150,19 @@ async def _filtered_iter(
self._buffered_deltas[event.item_id].append(event)
continue

if (
isinstance(event, responses.ResponseCompletedEvent)
and self._compaction is not None
):
compaction = self._compaction
await append_turn_items_to_conversation(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store the streaming compacted turn from the final (response.completed) event.

compaction.client,
compaction.conversation_id,
compaction.latest_round_input,
cast(Sequence[Any], event.response.output),
)
compaction.original_input_persisted = True

yield event

def _replay_buffered_deltas(
Expand Down Expand Up @@ -179,8 +230,108 @@ class LlamaStackResponsesModel(OpenAIResponsesModel):
Overrides the streaming response processing to buffer and replay
``ResponseFunctionCallArgumentsDeltaEvent`` events that Llama Stack emits
before the corresponding ``McpCall`` or ``ResponseFunctionToolCall`` item.

When ``compaction`` is set, completed inference rounds are appended to the
conversation because compacted mode omits the ``conversation`` parameter.
"""

def __init__(  # pylint: disable=too-many-arguments
    self,
    model_name: str,
    provider: LlamaStackProvider,
    profile: ModelProfileSpec | None = None,
    settings: ModelSettings | None = None,
    compaction: Optional[CompactionTurnContext] = None,
) -> None:
    """Initialize the model.

    Args:
        model_name: Model identifier passed to pydantic-ai.
        provider: Llama Stack provider instance forwarded to the base model.
        profile: Optional model profile override.
        settings: Optional pydantic-ai model settings.
        compaction: Compaction state when turns must be stored manually.
            ``None`` (the default) disables manual turn persistence.
    """
    # The base class receives everything except ``compaction``, which is
    # specific to this subclass.
    super().__init__(
        model_name,
        provider=provider,
        profile=profile,
        settings=settings,
    )
    # Kept as a public attribute; read by ``_responses_create`` and handed to
    # ``_FilteredResponseStream`` for streaming rounds.
    self.compaction = compaction

@overload
async def _responses_create(
self,
messages: list[ModelRequest | ModelResponse],
stream: Literal[False],
model_settings: OpenAIResponsesModelSettings,
model_request_parameters: ModelRequestParameters,
) -> responses.Response: ...

@overload
async def _responses_create(
self,
messages: list[ModelRequest | ModelResponse],
stream: Literal[True],
model_settings: OpenAIResponsesModelSettings,
model_request_parameters: ModelRequestParameters,
) -> AsyncStream[responses.ResponseStreamEvent]: ...

async def _responses_create(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patching the _responses_create function in order to intercept the input and output of each Responses API call.

self,
messages: list[ModelRequest | ModelResponse],
stream: bool,
model_settings: OpenAIResponsesModelSettings,
model_request_parameters: ModelRequestParameters,
) -> (
responses.Response | AsyncStream[responses.ResponseStreamEvent] | ModelResponse
):
"""Create a Responses API request with compacted turn persistence.

After the first compacted round is persisted, records pydantic-ai input
for follow-up tool-loop rounds. Non-streaming responses are appended
immediately; streaming persistence is handled in :meth:`request_stream`.
"""
compaction = self.compaction
if compaction is not None and compaction.original_input_persisted:
request_params = await self._build_responses_request_params(
messages,
model_settings,
model_request_parameters,
self.profile,
)
compaction.latest_round_input = cast(ResponseInput, request_params.input)

result: (
responses.Response
| AsyncStream[responses.ResponseStreamEvent]
| ModelResponse
)
if stream:
result = await super()._responses_create(
messages, True, model_settings, model_request_parameters
)
else:
result = await super()._responses_create(
messages, False, model_settings, model_request_parameters
)

if (
compaction is not None
and not stream
and isinstance(result, responses.Response)
):
await append_turn_items_to_conversation(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store the turn here for non-streaming queries.

compaction.client,
compaction.conversation_id,
compaction.latest_round_input,
cast(Sequence[Any], result.output),
)
compaction.original_input_persisted = True

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first LLM call contains the whole compacted history; therefore we need to store only the original input. Subsequent turns are stored normally.


return result

async def request( # pylint: disable=unused-argument
self,
messages: list[ModelMessage],
Expand Down Expand Up @@ -274,7 +425,7 @@ async def request_stream( # pylint: disable=unused-argument
messages, True, model_settings_cast, model_request_parameters
)

filtered_stream = _FilteredResponseStream(response)
filtered_stream = _FilteredResponseStream(response, self.compaction)

async with response:
peekable: PeekableAsyncStream[
Expand Down
10 changes: 6 additions & 4 deletions src/utils/agents/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async def retrieve_agent_response(
responses_params: ResponsesApiParams,
moderation_result: ShieldModerationResult,
endpoint_path: str,
_original_input: Optional[ResponseInput] = None,
original_input: Optional[ResponseInput] = None,
) -> TurnSummary:
"""Retrieve a turn summary from a blocking agent run.

Expand All @@ -293,7 +293,7 @@ async def retrieve_agent_response(
responses_params: Prepared Responses API parameters.
moderation_result: Shield moderation outcome for the turn.
endpoint_path: Endpoint path used for metric labeling.
_original_input: Original user input before the explicit-input rewrite.
original_input: Original user input before the explicit-input rewrite.

Returns:
Turn summary for the completed agent run.
Expand All @@ -305,15 +305,17 @@ async def retrieve_agent_response(
await append_turn_items_to_conversation(
client,
responses_params.conversation,
responses_params.input,
original_input or responses_params.input,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If original_input is not None, store the original prompt, because responses_params.input contains the whole compacted history.

[moderation_result.refusal_response],
)
return TurnSummary(
id=moderation_result.moderation_id,
llm_response=moderation_result.message,
)
try:
agent = build_agent(client, responses_params, configuration.skills)
agent = build_agent(
client, responses_params, configuration.skills, original_input
)
logger.debug("Starting agent non-streaming response processing")
run_result = await agent.run(cast(str, responses_params.input))
except (AgentRunError, APIStatusError, APIConnectionError, RuntimeError) as exc:
Expand Down
Loading
Loading