5555from typing_extensions import deprecated
5656from typing_extensions import override
5757from watchdog .observers import Observer
58+ import yaml
5859
5960from . import agent_graph
6061from ..agents .base_agent import BaseAgent
8990from ..events .event import Event
9091from ..memory .base_memory_service import BaseMemoryService
9192from ..plugins .base_plugin import BasePlugin
93+ from ..plugins .bigquery_agent_analytics_plugin import BigQueryAgentAnalyticsPlugin
9294from ..runners import Runner
9395from ..sessions .base_session_service import BaseSessionService
9496from ..sessions .session import Session
@@ -697,17 +699,55 @@ async def get_runner_async(self, app_name: str) -> Runner:
697699 # Instantiate extra plugins if configured
698700 extra_plugins_instances = self ._instantiate_extra_plugins ()
699701
702+ plugins_yaml_path = os .path .join (self .agents_dir , app_name , "plugins.yaml" )
703+ bq_analytics_config = None
704+ if os .path .exists (plugins_yaml_path ):
705+ with open (plugins_yaml_path , "r" , encoding = "utf-8" ) as f :
706+ plugins_config = yaml .safe_load (f )
707+ if plugins_config and isinstance (plugins_config , dict ):
708+ bq_analytics_config = plugins_config .get ("bigquery_agent_analytics" )
709+
710+ # Determine if the agent was loaded from YAML based on the agent loader info
711+ is_visual_builder = False
712+ detailed_agents = self .agent_loader .list_agents_detailed ()
713+ for agent_info in detailed_agents :
714+ if agent_info .get ("name" ) == app_name :
715+ if agent_info .get ("language" ) == "yaml" :
716+ is_visual_builder = True
717+ break
718+
700719 if isinstance (agent_or_app , BaseAgent ):
720+ plugins = extra_plugins_instances
721+
722+ # Handle BigQuery Analytics Plugin injection
723+ if bq_analytics_config and all ([
724+ bq_analytics_config .get ("project_id" ),
725+ bq_analytics_config .get ("dataset_id" ),
726+ bq_analytics_config .get ("dataset_location" ),
727+ ]):
728+ plugins .append (
729+ BigQueryAgentAnalyticsPlugin (
730+ project_id = bq_analytics_config .get ("project_id" ),
731+ dataset_id = bq_analytics_config .get ("dataset_id" ),
732+ table_id = bq_analytics_config .get ("table_id" ),
733+ location = bq_analytics_config .get ("dataset_location" ),
734+ )
735+ )
736+
701737 agentic_app = App (
702738 name = app_name ,
703739 root_agent = agent_or_app ,
704- plugins = extra_plugins_instances ,
740+ plugins = plugins ,
705741 )
706742 else :
707743 # Combine existing plugins with extra plugins
708744 agent_or_app .plugins = agent_or_app .plugins + extra_plugins_instances
709745 agentic_app = agent_or_app
710746
747+ # If the root agent was loaded from YAML, we treat it as being from Visual Builder
748+ if is_visual_builder :
749+ object .__setattr__ (agentic_app , "_is_visual_builder_app" , True )
750+
711751 runner = self ._create_runner (agentic_app )
712752 self .runner_dict [app_name ] = runner
713753 return runner
@@ -1840,9 +1880,20 @@ async def patch_memory(
18401880 raise HTTPException (status_code = 404 , detail = "Session not found" )
18411881 await self .memory_service .add_session_to_memory (session )
18421882
1883+ def _set_telemetry_context_if_needed (runner : Runner ):
1884+ """Helper to set contextvars for the current request task."""
1885+ app = getattr (runner , "app" , None )
1886+ from ..utils ._telemetry_context import _is_visual_builder
1887+
1888+ if app and getattr (app , "_is_visual_builder_app" , False ):
1889+ _is_visual_builder .set (True )
1890+ else :
1891+ _is_visual_builder .set (False )
1892+
18431893 @app .post ("/run" , response_model_exclude_none = True )
18441894 async def run_agent (req : RunAgentRequest ) -> list [Event ]:
18451895 runner = await self .get_runner_async (req .app_name )
1896+ _set_telemetry_context_if_needed (runner )
18461897 try :
18471898 async with Aclosing (
18481899 runner .run_async (
@@ -1864,6 +1915,7 @@ async def run_agent(req: RunAgentRequest) -> list[Event]:
18641915 async def run_agent_sse (req : RunAgentRequest ) -> StreamingResponse :
18651916 stream_mode = StreamingMode .SSE if req .streaming else StreamingMode .NONE
18661917 runner = await self .get_runner_async (req .app_name )
1918+ _set_telemetry_context_if_needed (runner )
18671919
18681920 # Validate session existence before starting the stream.
18691921 # We check directly here instead of eagerly advancing the
@@ -2039,6 +2091,8 @@ async def run_agent_live(
20392091 return
20402092
20412093 await websocket .accept ()
2094+ runner_for_context = await self .get_runner_async (app_name )
2095+ _set_telemetry_context_if_needed (runner_for_context )
20422096
20432097 session = await self .session_service .get_session (
20442098 app_name = app_name , user_id = user_id , session_id = session_id
0 commit comments