-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathacp.py.j2
More file actions
138 lines (108 loc) · 4.32 KB
/
acp.py.j2
File metadata and controls
138 lines (108 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
from typing import AsyncGenerator, List
from agentex.lib import adk
from agentex.lib.adk.providers._modules.sync_provider import SyncStreamingProvider, convert_openai_to_agentex_events
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.types.acp import SendMessageParams
from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config
from agentex.lib.types.tracing import SGPTracingProcessorConfig
from agentex.lib.utils.model_utils import BaseModel
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
from agentex.types.task_message_content import TaskMessageContent
from agentex.types.text_content import TextContent
from agentex.lib.utils.logging import make_logger
from agents import Agent, Runner, RunConfig, function_tool
# Module-level logger used by the tool and the message handler in this file.
logger = make_logger(__name__)

# SGP tracing credentials; an unset variable reads as the empty string.
SGP_API_KEY = os.getenv("SGP_API_KEY", "")
SGP_ACCOUNT_ID = os.getenv("SGP_ACCOUNT_ID", "")

# Register the SGP tracing processor only when both credentials are non-empty.
if SGP_API_KEY and SGP_ACCOUNT_ID:
    add_tracing_processor_config(
        SGPTracingProcessorConfig(sgp_api_key=SGP_API_KEY, sgp_account_id=SGP_ACCOUNT_ID)
    )

# Model name handed to the Agent constructed in the message handler.
MODEL = "gpt-4o-mini"

# Placeholder substituted when this Jinja2 template is rendered.
AGENT_NAME = "{{ agent_name }}"

# Instructions given to the agent on every run.
SYSTEM_PROMPT = """
<role>
You are a helpful assistant. Use your tools to help the user.
</role>
<communication_style>
Communicate in a witty and friendly manner
</communication_style>
"""
# NOTE(review): function_tool presumably exposes the docstring below to the
# model as the tool description, so keep its wording accurate — confirm
# against the Agents SDK documentation before rewording it.
@function_tool
async def get_weather() -> str:
    """
    Get the current weather.

    This is a dummy activity that returns a hardcoded string for demo purposes.
    Replace this with a real weather API call in your implementation.

    Returns:
        A string describing the current weather conditions.
    """
    logger.info("get_weather activity called")
    # Fixed demo value; nothing is fetched from an external service.
    current_conditions = "Sunny, 72°F"
    return current_conditions
# Build the ACP server with acp_type="sync"; the message handler in this file
# is registered on this object through its on_message_send decorator.
acp = FastACP.create(acp_type="sync")
class StateModel(BaseModel):
    """Conversation state stored per task and agent between message turns."""

    # Conversation history passed to the agent runner. Each user message is
    # appended as {"role": "user", "content": ...}, and after a run the list
    # is replaced by the runner's full input list.
    input_list: list[dict]
    # Starts at 0 when the state is created and is incremented by the handler.
    turn_number: int
@acp.on_message_send
async def handle_message_send(
    params: SendMessageParams,
) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]:
    """Handle one incoming user message and stream the agent's reply.

    The handler loads (or creates) the conversation state stored for the
    task/agent pair, appends the new user message, runs the agent in
    streaming mode, forwards every converted event to the client, and finally
    saves the updated conversation history back to the state store.

    Args:
        params: The message-send request. This handler reads
            ``params.content.content`` (the user's prompt), ``params.task.id``
            and ``params.agent.id``.

    Yields:
        Task message updates: either a single full message explaining that
        ``OPENAI_API_KEY`` is missing, or the events produced by the agent run.
    """
    if not os.environ.get("OPENAI_API_KEY"):
        yield StreamTaskMessageFull(
            index=0,
            type="full",
            content=TextContent(
                author="agent",
                content="Hey, sorry I'm unable to respond to your message because you're running this example without an OpenAI API key. Please set the OPENAI_API_KEY environment variable to run this example. Do this by either by adding a .env file to the project/ directory or by setting the environment variable in your terminal.",
            ),
        )
        # Stop here: without a key the agent run below cannot succeed, and
        # falling through would also touch the stored state for a turn that
        # was never answered.
        return

    user_prompt = params.content.content

    # Retrieve the task state. Each event is handled as a new turn, so we need
    # to get the state for the current turn.
    task_state = await adk.state.get_by_task_and_agent(
        task_id=params.task.id,
        agent_id=params.agent.id,
    )
    if not task_state:
        # If the state doesn't exist, create it.
        state = StateModel(input_list=[], turn_number=0)
        task_state = await adk.state.create(
            task_id=params.task.id,
            agent_id=params.agent.id,
            state=state,
        )
    else:
        state = StateModel.model_validate(task_state.state)

    # Record this turn and add the new user message to the history.
    state.turn_number += 1
    state.input_list.append({"role": "user", "content": user_prompt})

    # Initialize the sync provider and run config to allow for tracing.
    provider = SyncStreamingProvider(
        trace_id=params.task.id,
    )
    run_config = RunConfig(
        model_provider=provider,
    )

    # Initialize the agent.
    agent = Agent(
        name=AGENT_NAME,
        instructions=SYSTEM_PROMPT,
        model=MODEL,
        tools=[get_weather],
    )

    # Run the agent with the conversation history from state.
    result = Runner.run_streamed(
        agent,
        state.input_list,
        run_config=run_config,
    )

    # Convert the OpenAI events to Agentex events and stream them back to the
    # client.
    async for agentex_event in convert_openai_to_agentex_events(result.stream_events()):
        yield agentex_event

    # After streaming is complete, persist the full conversation history
    # (including the agent's output) so the next turn starts from it.
    state.input_list = result.to_input_list()
    await adk.state.update(
        state_id=task_state.id,
        task_id=params.task.id,
        agent_id=params.agent.id,
        state=state,
        trace_id=params.task.id,
    )