Skip to content

Commit 1f81521

Browse files
committed
fix(temporal): allowing-ACP-temporal-telemetry
1 parent 6509be1 commit 1f81521

1 file changed

Lines changed: 37 additions & 13 deletions

File tree

  • src/agentex/lib/core/temporal/workers

src/agentex/lib/core/temporal/workers/worker.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,17 @@ def _validate_interceptors(interceptors: list) -> None:
8989
)
9090

9191

92-
async def get_temporal_client(temporal_address: str, metrics_url: str | None = None, plugins: list = []) -> Client:
92+
async def get_temporal_client(
93+
temporal_address: str, metrics_url: str | None = None, plugins: list = []
94+
) -> Client:
9395
if plugins != []: # We don't need to validate the plugins if they are empty
9496
_validate_plugins(plugins)
9597

9698
# Check if OpenAI plugin is present - it needs to configure its own data converter
9799
# Lazy import to avoid pulling in opentelemetry.sdk for non-Temporal agents
98100
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
99-
has_openai_plugin = any(
100-
isinstance(p, OpenAIAgentsPlugin) for p in (plugins or [])
101-
)
101+
102+
has_openai_plugin = any(isinstance(p, OpenAIAgentsPlugin) for p in (plugins or []))
102103

103104
# Build connection kwargs
104105
connect_kwargs = {
@@ -113,7 +114,9 @@ async def get_temporal_client(temporal_address: str, metrics_url: str | None = N
113114
if not metrics_url:
114115
client = await Client.connect(**connect_kwargs)
115116
else:
116-
runtime = Runtime(telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url)))
117+
runtime = Runtime(
118+
telemetry=TelemetryConfig(metrics=OpenTelemetryConfig(url=metrics_url))
119+
)
117120
connect_kwargs["runtime"] = runtime
118121
client = await Client.connect(**connect_kwargs)
119122
return client
@@ -128,16 +131,22 @@ def __init__(
128131
health_check_port: int | None = None,
129132
plugins: list = [],
130133
interceptors: list = [],
134+
metrics_url: str | None = None,
131135
):
132136
self.task_queue = task_queue
133137
self.activity_handles = []
134138
self.max_workers = max_workers
135139
self.max_concurrent_activities = max_concurrent_activities
136140
self.health_check_server_running = False
137141
self.healthy = False
138-
self.health_check_port = health_check_port if health_check_port is not None else EnvironmentVariables.refresh().HEALTH_CHECK_PORT
142+
self.health_check_port = (
143+
health_check_port
144+
if health_check_port is not None
145+
else EnvironmentVariables.refresh().HEALTH_CHECK_PORT
146+
)
139147
self.plugins = plugins
140148
self.interceptors = interceptors
149+
self.metrics_url = metrics_url
141150

142151
@overload
143152
async def run(
@@ -172,12 +181,17 @@ async def run(
172181
temporal_client = await get_temporal_client(
173182
temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
174183
plugins=self.plugins,
184+
metrics_url=self.metrics_url,
175185
)
176186

177187
# Enable debug mode if AgentEx debug is enabled (disables deadlock detection)
178-
debug_enabled = os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true"
188+
debug_enabled = (
189+
os.environ.get("AGENTEX_DEBUG_ENABLED", "false").lower() == "true"
190+
)
179191
if debug_enabled:
180-
logger.info("🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled")
192+
logger.info(
193+
"🐛 [WORKER] Temporal debug mode enabled - deadlock detection disabled"
194+
)
181195

182196
if workflow is None and workflows is None:
183197
raise ValueError("Either workflow or workflows must be provided")
@@ -207,7 +221,9 @@ async def _health_check(self):
207221
async def start_health_check_server(self):
208222
if not self.health_check_server_running:
209223
app = web.Application()
210-
app.router.add_get("/readyz", lambda request: self._health_check()) # noqa: ARG005
224+
app.router.add_get(
225+
"/readyz", lambda request: self._health_check()
226+
) # noqa: ARG005
211227

212228
# Disable access logging
213229
runner = web.AppRunner(app, access_log=None)
@@ -216,19 +232,27 @@ async def start_health_check_server(self):
216232
try:
217233
site = web.TCPSite(runner, "0.0.0.0", self.health_check_port)
218234
await site.start()
219-
logger.info(f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz")
235+
logger.info(
236+
f"Health check server running on http://0.0.0.0:{self.health_check_port}/readyz"
237+
)
220238
self.health_check_server_running = True
221239
except OSError as e:
222-
logger.error(f"Failed to start health check server on port {self.health_check_port}: {e}")
240+
logger.error(
241+
f"Failed to start health check server on port {self.health_check_port}: {e}"
242+
)
223243
# Try alternative port if default fails
224244
try:
225245
alt_port = self.health_check_port + 1
226246
site = web.TCPSite(runner, "0.0.0.0", alt_port)
227247
await site.start()
228-
logger.info(f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz")
248+
logger.info(
249+
f"Health check server running on alternative port http://0.0.0.0:{alt_port}/readyz"
250+
)
229251
self.health_check_server_running = True
230252
except OSError as e:
231-
logger.error(f"Failed to start health check server on alternative port {alt_port}: {e}")
253+
logger.error(
254+
f"Failed to start health check server on alternative port {alt_port}: {e}"
255+
)
232256
raise
233257

234258
"""

0 commit comments

Comments
 (0)