66import json
77import json .decoder
88import asyncio
9+ import os
910
1011import warnings
1112from fastapi import Request , Response
2930SCOPE_UNIQUE_ID = 'trace_unique_id'
3031SCOPE_CONTAINER_METADATA_COLLECTED = 'container_metadata'
3132SCOPE_IGNORE_REQUEST = 'ignore_request'
33+ IS_ASYNC_MODE = False
34+
35+ def _initialize_async_mode (mode ):
36+ global IS_ASYNC_MODE # pylint: disable=global-statement
37+ IS_ASYNC_MODE = mode
38+
39+ _initialize_async_mode (os .getenv (
40+ 'EPSAGON_FASTAPI_ASYNC_MODE' , 'FALSE' ) == 'TRUE' )
3241
3342def _handle_wrapper_params (_args , kwargs , original_request_param_name ):
34- """
43+ """f
3544 Handles the sync/async given parameters - gets the request object
3645 If original handler is set to get the Request object, then getting the
3746 request using this param. Otherwise, trying to get the Request object using
@@ -222,6 +231,71 @@ def _fastapi_handler(
222231 raised_err
223232 )
224233
234+ async def _async_fastapi_handler (
235+ original_handler ,
236+ request ,
237+ status_code ,
238+ args ,
239+ kwargs
240+ ):
241+ """
242+ FastAPI generic handler - for callbacks executed by a threadpool
243+ :param original_handler: the wrapped original handler
244+ :param request: the given handler request
245+ :param status_code: the default configured response status code.
246+ Can be None when called by exception handlers wrapper, as there's
247+ no status code configuration for exception handlers.
248+ """
249+ has_setup_succeeded = False
250+ should_ignore_request = False
251+
252+ try :
253+ epsagon_scope , trace = _setup_handler (request )
254+ if epsagon_scope and trace :
255+ has_setup_succeeded = True
256+ if (
257+ ignore_request ('' , request .url .path .lower ())
258+ or
259+ is_ignored_endpoint (request .url .path .lower ())
260+ ):
261+ should_ignore_request = True
262+ epsagon_scope [SCOPE_IGNORE_REQUEST ] = True
263+
264+ except Exception : # pylint: disable=broad-except
265+ has_setup_succeeded = False
266+
267+ if not has_setup_succeeded or should_ignore_request :
268+ return await original_handler (* args , ** kwargs )
269+
270+ created_runner = False
271+ response = None
272+ if not trace .runner :
273+ if not _setup_trace_runner (epsagon_scope , trace , request ):
274+ return await original_handler (* args , ** kwargs )
275+
276+ raised_err = None
277+ try :
278+ response = await original_handler (* args , ** kwargs )
279+ except Exception as exception : # pylint: disable=W0703
280+ raised_err = exception
281+ finally :
282+ try :
283+ epsagon .trace .trace_factory .unset_thread_local_unique_id ()
284+ except Exception : # pylint: disable=broad-except
285+ pass
286+ # no need to update request body if runner already created before
287+ if created_runner :
288+ _extract_request_body (trace , request )
289+
290+ return _handle_response (
291+ epsagon_scope ,
292+ response ,
293+ status_code ,
294+ trace ,
295+ raised_err
296+ )
297+
298+
225299
226300# pylint: disable=too-many-statements
227301def _wrap_handler (dependant , status_code ):
@@ -230,9 +304,6 @@ def _wrap_handler(dependant, status_code):
230304 """
231305 original_handler = dependant .call
232306 is_async = asyncio .iscoroutinefunction (original_handler )
233- if is_async :
234- # async endpoints are not supported
235- return
236307
237308 original_request_param_name = dependant .request_param_name
238309 if not original_request_param_name :
@@ -249,7 +320,23 @@ def wrapped_handler(*args, **kwargs):
249320 original_handler , request , status_code , args , kwargs
250321 )
251322
252- dependant .call = wrapped_handler
323+ async def async_wrapped_handler (* args , ** kwargs ):
324+ """
325+ Asynchronous wrapper handler
326+ """
327+ request : Request = _handle_wrapper_params (
328+ args , kwargs , original_request_param_name
329+ )
330+ return await _async_fastapi_handler (
331+ original_handler , request , status_code , args , kwargs
332+ )
333+
334+ if is_async and IS_ASYNC_MODE :
335+ # async endpoints
336+ dependant .call = async_wrapped_handler
337+
338+ elif not is_async and not IS_ASYNC_MODE :
339+ dependant .call = wrapped_handler
253340
254341
255342def route_class_wrapper (wrapped , instance , args , kwargs ):
@@ -280,7 +367,7 @@ def exception_handler_wrapper(original_handler):
280367 Wraps an exception handler
281368 """
282369 is_async = asyncio .iscoroutinefunction (original_handler )
283- if is_async :
370+ if is_async or IS_ASYNC_MODE :
284371 # async handlers are not supported
285372 return original_handler
286373
@@ -323,12 +410,16 @@ async def server_call_wrapper(wrapped, _instance, args, kwargs):
323410
324411 trace = None
325412 try :
326- epsagon .trace .trace_factory .switch_to_multiple_traces ()
413+ if IS_ASYNC_MODE :
414+ epsagon .trace .trace_factory .switch_to_async_tracer ()
415+ else :
416+ epsagon .trace .trace_factory .switch_to_multiple_traces ()
327417 unique_id = str (uuid .uuid4 ())
328418 trace = epsagon .trace .trace_factory .get_or_create_trace (
329419 unique_id = unique_id
330420 )
331421 trace .prepare ()
422+
332423 scope [EPSAGON_MARKER ] = {
333424 SCOPE_UNIQUE_ID : unique_id ,
334425 }
0 commit comments