2828import traceback
2929import sys
3030import warnings
31+ import weakref
3132
3233from . import compat
3334from . import coroutines
@@ -242,6 +243,17 @@ def __init__(self):
242243 self ._task_factory = None
243244 self ._coroutine_wrapper_set = False
244245
246+ if hasattr (sys , 'get_asyncgen_hooks' ):
247+ # Python >= 3.6
248+ # A weak set of all asynchronous generators that are
249+ # being iterated by the loop.
250+ self ._asyncgens = weakref .WeakSet ()
251+ else :
252+ self ._asyncgens = None
253+
254+ # Set to True when `loop.shutdown_asyncgens` is called.
255+ self ._asyncgens_shutdown_called = False
256+
245257 def __repr__ (self ):
246258 return ('<%s running=%s closed=%s debug=%s>'
247259 % (self .__class__ .__name__ , self .is_running (),
@@ -333,13 +345,59 @@ def _check_closed(self):
333345 if self ._closed :
334346 raise RuntimeError ('Event loop is closed' )
335347
348+ def _asyncgen_finalizer_hook (self , agen ):
349+ self ._asyncgens .discard (agen )
350+ if not self .is_closed ():
351+ self .create_task (agen .aclose ())
352+
353+ def _asyncgen_firstiter_hook (self , agen ):
354+ if self ._asyncgens_shutdown_called :
355+ warnings .warn (
356+ "asynchronous generator {!r} was scheduled after "
357+ "loop.shutdown_asyncgens() call" .format (agen ),
358+ ResourceWarning , source = self )
359+
360+ self ._asyncgens .add (agen )
361+
362+ @coroutine
363+ def shutdown_asyncgens (self ):
364+ """Shutdown all active asynchronous generators."""
365+ self ._asyncgens_shutdown_called = True
366+
367+ if self ._asyncgens is None or not len (self ._asyncgens ):
368+ # If Python version is <3.6 or we don't have any asynchronous
369+ # generators alive.
370+ return
371+
372+ closing_agens = list (self ._asyncgens )
373+ self ._asyncgens .clear ()
374+
375+ shutdown_coro = tasks .gather (
376+ * [ag .aclose () for ag in closing_agens ],
377+ return_exceptions = True ,
378+ loop = self )
379+
380+ results = yield from shutdown_coro
381+ for result , agen in zip (results , closing_agens ):
382+ if isinstance (result , Exception ):
383+ self .call_exception_handler ({
384+ 'message' : 'an error occurred during closing of '
385+ 'asynchronous generator {!r}' .format (agen ),
386+ 'exception' : result ,
387+ 'asyncgen' : agen
388+ })
389+
336390 def run_forever (self ):
337391 """Run until stop() is called."""
338392 self ._check_closed ()
339393 if self .is_running ():
340394 raise RuntimeError ('Event loop is running.' )
341395 self ._set_coroutine_wrapper (self ._debug )
342396 self ._thread_id = threading .get_ident ()
397+ if self ._asyncgens is not None :
398+ old_agen_hooks = sys .get_asyncgen_hooks ()
399+ sys .set_asyncgen_hooks (firstiter = self ._asyncgen_firstiter_hook ,
400+ finalizer = self ._asyncgen_finalizer_hook )
343401 try :
344402 while True :
345403 self ._run_once ()
@@ -349,6 +407,8 @@ def run_forever(self):
349407 self ._stopping = False
350408 self ._thread_id = None
351409 self ._set_coroutine_wrapper (False )
410+ if self ._asyncgens is not None :
411+ sys .set_asyncgen_hooks (* old_agen_hooks )
352412
353413 def run_until_complete (self , future ):
354414 """Run until the Future is done.
0 commit comments