-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathacp.py
More file actions
107 lines (82 loc) · 4.22 KB
/
acp.py
File metadata and controls
107 lines (82 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
from typing import Union, AsyncGenerator
from agents import Agent, Runner, RunConfig
from agentex.lib import adk
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.types.converters import convert_task_messages_to_oai_agents_inputs
from agentex.lib.utils.model_utils import BaseModel
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
from agentex.types.task_message_content import TextContent, TaskMessageContent
from agentex.lib.adk.providers._modules.sync_provider import (
SyncStreamingProvider,
convert_openai_to_agentex_events,
)
# Build the ACP server object in "sync" mode; the message handler in this
# module registers itself on it through the ``@acp.on_message_send`` decorator.
acp = FastACP.create(acp_type="sync")
class StateModel(BaseModel):
    """Per-task agent settings that this module stores via ``adk.state``."""

    # Instructions text passed to the agent as its ``instructions``.
    system_prompt: str

    # Model name passed to the agent as its ``model``.
    model: str
# Note: The return of this handler is required to be persisted by the Agentex Server
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> Union[TaskMessageContent, AsyncGenerator[TaskMessageUpdate, None]]:
    """
    Handle one incoming user message and stream the agent's reply back.

    The handler is an async generator: it validates the incoming content,
    loads (or creates) the per-task state, rebuilds the conversation from the
    stored task messages plus the current message, runs the agent in streamed
    mode and yields each converted event.

    Args:
        params: Request payload; this handler reads ``params.content``,
            ``params.task.id`` and ``params.agent.id``.

    Yields:
        A single ``StreamTaskMessageFull`` notice when ``OPENAI_API_KEY`` is
        not set; otherwise every event produced by
        ``convert_openai_to_agentex_events`` for the streamed run.

    Raises:
        ValueError: If the content's ``type`` is not ``"text"`` or its
            ``author`` is not ``"user"``. Because this is a generator, the
            error surfaces when the caller starts iterating.
    """
    #########################################################
    # 1-3. Validate the request and prepare state, history and agent.
    #########################################################
    message = params.content

    # Nothing to answer: finish the generator without yielding anything.
    if not message:
        return

    # A missing attribute is reported as "unknown", which also fails the check.
    message_type = getattr(message, "type", "unknown")
    if message_type != "text":
        raise ValueError(f"Expected text message, got {message_type}")

    message_author = getattr(message, "author", "unknown")
    if message_author != "user":
        raise ValueError(f"Expected user message, got {message_author}")

    # Without an API key the agent cannot run; reply with an explanation instead.
    if not os.environ.get("OPENAI_API_KEY"):
        missing_key_notice = (
            "Hey, sorry I'm unable to respond to your message because you're running this "
            "example without an OpenAI API key. Please set the OPENAI_API_KEY environment "
            "variable to run this example. Do this by either by adding a .env file to the "
            "project/ directory or by setting the environment variable in your terminal."
        )
        yield StreamTaskMessageFull(
            index=0,
            type="full",
            content=TextContent(author="agent", content=missing_key_notice),
        )
        return

    task_id = params.task.id
    agent_id = params.agent.id

    # Reuse the stored state when there is one; otherwise create the default.
    stored_state = await adk.state.get_by_task_and_agent(task_id=task_id, agent_id=agent_id)
    if stored_state:
        state = StateModel.model_validate(stored_state.state)
    else:
        state = StateModel(
            system_prompt="You are a helpful assistant that can answer questions.",
            model="gpt-4o-mini",
        )
        await adk.state.create(task_id=task_id, agent_id=agent_id, state=state)

    # API returns newest first, so flip the history into chronological order.
    newest_first = await adk.messages.list(task_id=task_id)
    history = list(reversed(newest_first))

    # The provider carries the trace id; the run config hands it to the runner
    # to allow for tracing and streaming.
    run_config = RunConfig(model_provider=SyncStreamingProvider(trace_id=task_id))

    assistant = Agent(name="assistant", instructions=state.system_prompt, model=state.model)

    # Convert the history to OpenAI Agents SDK inputs, then add the current
    # user message (not yet persisted in task history).
    conversation = convert_task_messages_to_oai_agents_inputs(history)
    conversation.append({"role": "user", "content": message.content})

    run = Runner.run_streamed(assistant, conversation, run_config=run_config)

    #########################################################
    # 4. Stream the events to the client.
    #########################################################
    # Translate each OpenAI event into an Agentex event and pass it on.
    async for update in convert_openai_to_agentex_events(run.stream_events()):
        yield update