At Quantlane, we write a lot of asyncio applications. We often find it useful to schedule tasks (usually coroutines) to run concurrently in the event loop. Asyncio provides the create_task function for that, available since Python 3.7 [1]:
task = asyncio.create_task(my_coroutine())
What happens to exceptions raised in tasks?
When we run asyncio.create_task(my_coroutine()), what happens when my_coroutine() raises an exception?
Consider this example [2]:
    import asyncio
    import logging


    async def problem() -> None:
        await asyncio.sleep(1)
        logging.warning('Going to raise an exception now!')
        raise RuntimeError('Something went wrong')


    if __name__ == '__main__':
        logging.basicConfig(
            format = '▸ %(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s',
            level = logging.INFO,
            datefmt = '%H:%M:%S',
        )
        loop = asyncio.get_event_loop()
        logging.info('Creating the problem task')
        loop.create_task(problem())
        logging.info('Running the loop')
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            logging.info('Closing the loop')
        loop.close()
        logging.info('Shutting down')
Running this, we get:

    $ python --version
    Python 3.8.2
    $ python example1.py
    ▸ 14:11:56.991 example1.py:18 INFO Creating the problem task
    ▸ 14:11:56.992 example1.py:20 INFO Running the loop
    ▸ 14:11:57.995 example1.py:7 WARNING Going to raise an exception now!
    ▸ 14:11:57.996 base_events.py:1707 ERROR Task exception was never retrieved
    future: <Task finished name='Task-1' coro=<problem() done, defined at example1.py:5> exception=RuntimeError('Something went wrong')>
    Traceback (most recent call last):
      File "example1.py", line 8, in problem
        raise RuntimeError('Something went wrong')
    RuntimeError: Something went wrong
    ^C  # <--- program interrupted by pressing Ctrl+C
    ▸ 14:12:01.361 example1.py:24 INFO Closing the loop
    ▸ 14:12:01.362 example1.py:26 INFO Shutting down
We can see that the exception was logged by asyncio (in asyncio/base_events.py, line 1707) as soon as it happened – one second after the start of the loop. This is good and in line with what we would expect from synchronous code or await'ed coroutines.
If you look at that part of the asyncio code, you will find that this is the event loop's default exception handler picking up the task exception and logging it.
There is one major problem with asyncio's event loop error handler though: it's all too easy to write your code in a way that prevents the handler being called! Let's take a look.
The problem: task exceptions are (sometimes) only logged when the program terminates
We'll introduce an innocuous change on lines 19-20: let's store the task object returned by create_task and log it.
     1  import asyncio
     2  import logging
     3
     4
     5  async def problem() -> None:
     6      await asyncio.sleep(1)
     7      logging.warning('Going to raise an exception now!')
     8      raise RuntimeError('Something went wrong')
     9
    10
    11  if __name__ == '__main__':
    12      logging.basicConfig(
    13          format = '▸ %(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s',
    14          level = logging.INFO,
    15          datefmt = '%H:%M:%S',
    16      )
    17      loop = asyncio.get_event_loop()
    18      logging.info('Creating the problem task')
    19      task = loop.create_task(problem())
    20      logging.info('Created task = %r', task)
    21      logging.info('Running the loop')
    22      try:
    23          loop.run_forever()
    24      except KeyboardInterrupt:
    25          logging.info('Closing the loop')
    26      loop.close()
    27      logging.info('Shutting down')
Observe carefully the sequence and timestamps of the logs when you run this:
    $ python example2.py
    ▸ 14:14:57.877 example2.py:18 INFO Creating the problem task
    ▸ 14:14:57.877 example2.py:20 INFO Created task = <Task pending name='Task-1' coro=<problem() running at example2.py:5>>
    ▸ 14:14:57.877 example2.py:21 INFO Running the loop
    ▸ 14:14:58.882 example2.py:7 WARNING Going to raise an exception now!
    ^C  # <--- program interrupted by pressing Ctrl+C
    ▸ 14:15:02.174 example2.py:25 INFO Closing the loop
    ▸ 14:15:02.175 example2.py:27 INFO Shutting down
    ▸ 14:15:02.180 base_events.py:1707 ERROR Task exception was never retrieved
    future: <Task finished name='Task-1' coro=<problem() done, defined at example2.py:5> exception=RuntimeError('Something went wrong')>
    Traceback (most recent call last):
      File "example2.py", line 8, in problem
        raise RuntimeError('Something went wrong')
    RuntimeError: Something went wrong
Notice that the exception was not logged when it happened, but only after the loop was closed and the program was about to terminate!
If this were a long-running program, this would be a serious issue: we would only learn about the exception long after it happened.
Why does this happen? It's a fairly subtle issue related to garbage collection. It turns out the event loop's error handler is only called for a task exception when the Task object is freed from memory [3]. By keeping a reference to the object on line 19, we prevented Python from deleting the task when it finished (with an exception). Python had to hold on to it until the very end of the program, where the task reference finally went out of scope. Only at that point was the error handler called and the exception logged.
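The effect can be reproduced without waiting for the program to exit. The following is a minimal sketch, not code from our applications: the names demo and handled are made up for illustration, and it assumes CPython, where dropping the last reference normally frees the task right away (gc.collect() is called as a safety net). A custom exception handler records how often the loop's error handler fires while the task is still referenced and after the reference is dropped.

```python
import asyncio
import gc


async def problem() -> None:
    raise RuntimeError('Something went wrong')


def demo() -> tuple:
    handled = []  # messages passed to the loop's exception handler
    loop = asyncio.new_event_loop()
    loop.set_exception_handler(lambda loop, context: handled.append(context['message']))
    try:
        task = loop.create_task(problem())
        # Give the loop time to run the task; it finishes with an exception.
        loop.run_until_complete(asyncio.sleep(0.01))
        gc.collect()
        while_referenced = len(handled)  # handler not called yet: `task` is still alive
        del task  # drop the last reference to the finished task
        gc.collect()
        after_release = len(handled)  # handler called once the task was freed
    finally:
        loop.close()
    return while_referenced, after_release
```

Calling demo() should show that the handler stays silent for as long as the task variable exists, and fires only once the task object is released.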
Somewhat surprisingly, this behaviour was reported and marked as not-a-bug back in 2016. The argument is that if you care about the exception, you should have await task somewhere in your code. That causes the exception to be raised at the place where you await your task object. We found this sometimes helps, but sometimes we expect the tasks to run for a long time and there is no obvious place to await them. (Example: a 'background' task that periodically refreshes a cache.)
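For completeness, here is a small sketch of the await task approach described above. The main coroutine and its return strings are illustrative only. Awaiting the task re-raises the task's exception at the await expression, where it can be handled like any other exception.

```python
import asyncio


async def problem() -> None:
    await asyncio.sleep(0)
    raise RuntimeError('Something went wrong')


async def main() -> str:
    task = asyncio.create_task(problem())
    try:
        # The exception raised inside the task surfaces here.
        await task
    except RuntimeError as error:
        return f'caught: {error}'
    return 'no exception'
```

This works well when there is a natural point at which the result is needed, which is exactly what a long-running background task lacks.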
How to ensure task exceptions get logged immediately
Since a Task is a Future-like object, it also supports callbacks, in particular add_done_callback. This lets us register a function that is called as soon as the task finishes, including the case when it finishes by raising an exception.
Let's try adding a handler right after we create the task:
    logging.info('Creating the problem task')
    task = loop.create_task(problem())
    task.add_done_callback(_handle_task_result)
This references a function called _handle_task_result. In this function we can test whether the task ended with an exception, and if so, log it. A convenient way to check is to call the result() method which re-raises the task exception, if there was one:
    def _handle_task_result(task: asyncio.Task) -> None:
        try:
            task.result()
        except asyncio.CancelledError:
            pass  # Task cancellation should not be logged as an error.
        except Exception:  # pylint: disable=broad-except
            logging.exception('Exception raised by task = %r', task)
Running this, we see that the exception is again logged as soon as it happens 🎉
    ▸ 16:05:41.577 example3.py:27 INFO Creating the problem task
    ▸ 16:05:41.577 example3.py:30 INFO Created task = <Task pending name='Task-1' coro=<problem() running at example3.py:5> cb=[_handle_task_result() at example3.py:11]>
    ▸ 16:05:41.577 example3.py:31 INFO Running the loop
    ▸ 16:05:42.583 example3.py:7 WARNING Going to raise an exception now!
    ▸ 16:05:42.583 example3.py:17 ERROR Exception raised by task = <Task finished name='Task-1' coro=<problem() done, defined at example3.py:5> exception=RuntimeError('Something went wrong')>
    Traceback (most recent call last):
      File "example3.py", line 13, in _handle_task_result
        task.result()
      File "example3.py", line 8, in problem
        raise RuntimeError('Something went wrong')
    RuntimeError: Something went wrong
    ^C  # <--- program interrupted by pressing Ctrl+C
    ▸ 16:05:44.949 example3.py:35 INFO Closing the loop
    ▸ 16:05:44.949 example3.py:37 INFO Shutting down
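Calling result() inside a try block is not the only way to write such a callback. As an alternative sketch (the function name log_task_exception is hypothetical and not part of our module), the callback can check task.cancelled() first and then inspect task.exception(), which returns the exception instead of raising it. Like result(), calling exception() marks the exception as retrieved, so asyncio should not report it a second time when the task is freed.

```python
import asyncio
import logging


def log_task_exception(task: 'asyncio.Task') -> None:
    # exception() raises CancelledError for a cancelled task, so check that first.
    if task.cancelled():
        return  # Task cancellation should not be logged as an error.
    error = task.exception()
    if error is not None:
        # Passing the exception instance as exc_info attaches its traceback to the log record.
        logging.error('Exception raised by task = %r', task, exc_info = error)


async def problem() -> None:
    raise RuntimeError('Something went wrong')


async def main() -> None:
    task = asyncio.create_task(problem())
    task.add_done_callback(log_task_exception)
    # Keep the loop running long enough for the task and its callback to run.
    await asyncio.sleep(0.01)
```

One difference from the result() version: the logged traceback contains only the frames from the task itself, without the extra frame of the callback.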
How we made this error handling reusable
Since long-running tasks are a common pattern in our code we created a utility function to set up all these error handlers for us. This is our task_logger.py module. It is MIT-licenced, so feel free to use and modify for your own needs:
    from typing import Any, Coroutine, Optional, TypeVar, Tuple

    import asyncio
    import functools
    import logging


    T = TypeVar('T')


    def create_task(
        coroutine: Coroutine[Any, Any, T],
        *,
        logger: logging.Logger,
        message: str,
        message_args: Tuple[Any, ...] = (),
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> 'asyncio.Task[T]':  # This type annotation has to be quoted for Python < 3.9, see https://www.python.org/dev/peps/pep-0585/
        '''
        This helper function wraps a ``loop.create_task(coroutine())`` call and ensures there is
        an exception handler added to the resulting task. If the task raises an exception it is logged
        using the provided ``logger``, with additional context provided by ``message`` and optionally
        ``message_args``.
        '''
        if loop is None:
            loop = asyncio.get_running_loop()
        task = loop.create_task(coroutine)
        task.add_done_callback(
            functools.partial(_handle_task_result, logger = logger, message = message, message_args = message_args)
        )
        return task


    def _handle_task_result(
        task: asyncio.Task,
        *,
        logger: logging.Logger,
        message: str,
        message_args: Tuple[Any, ...] = (),
    ) -> None:
        try:
            task.result()
        except asyncio.CancelledError:
            pass  # Task cancellation should not be logged as an error.
        # Regarding the pylint ignore: we want to handle all exceptions here so that the result of the task
        # is properly logged. There is no point re-raising the exception in this callback.
        except Exception:  # pylint: disable=broad-except
            logger.exception(message, *message_args)
The full example code taking advantage of this follows. We simply use task_logger.create_task on line 22 and provide a bit of context to it.
     1  import asyncio
     2  import logging
     3
     4  import task_logger
     5
     6
     7  async def problem() -> None:
     8      await asyncio.sleep(1)
     9      logging.warning('Going to raise an exception now!')
    10      raise RuntimeError('Something went wrong')
    11
    12
    13  if __name__ == '__main__':
    14      logging.basicConfig(
    15          format = '▸ %(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s',
    16          level = logging.INFO,
    17          datefmt = '%H:%M:%S',
    18      )
    19      loop = asyncio.get_event_loop()
    20      logging.info('Creating the problem task')
    21      logger = logging.getLogger('task_logger')
    22      task = task_logger.create_task(problem(), logger = logger, message = 'Task raised an exception', loop = loop)
    23      logging.info('Created task = %r', task)
    24      logging.info('Running the loop')
    25      try:
    26          loop.run_forever()
    27      except KeyboardInterrupt:
    28          logging.info('Closing the loop')
    29      loop.close()
    30      logging.info('Shutting down')
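The same pattern also applies when the task is started from inside a running coroutine, which is the typical place for a 'background' task such as a periodic cache refresh. The sketch below is self-contained for illustration only: refresh_cache, its failure on the third refresh, and the condensed _log_result callback are all hypothetical stand-ins, and in real code the callback would come from the task_logger module.

```python
import asyncio
import functools
import logging


def _log_result(task: 'asyncio.Task', *, logger: logging.Logger, message: str) -> None:
    # Condensed stand-in for the done-callback in task_logger.
    try:
        task.result()
    except asyncio.CancelledError:
        pass  # Task cancellation should not be logged as an error.
    except Exception:  # pylint: disable=broad-except
        logger.exception(message)


async def refresh_cache(cache: dict, interval: float) -> None:
    # Hypothetical background job that fails on its third refresh.
    count = 0
    while True:
        count += 1
        if count == 3:
            raise RuntimeError('Cache backend unavailable')
        cache['version'] = count
        await asyncio.sleep(interval)


async def main() -> dict:
    cache: dict = {}
    logger = logging.getLogger('cache')
    # Inside a coroutine the running loop is available, so no loop needs to be passed around.
    task = asyncio.get_running_loop().create_task(refresh_cache(cache, 0.01))
    task.add_done_callback(functools.partial(_log_result, logger = logger, message = 'Cache refresh failed'))
    await asyncio.sleep(0.1)  # the rest of the application keeps running
    return cache
```

The failure is logged at the moment the background task dies, even though main still holds a reference to it and never awaits it.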
[1] Before 3.7, you’d use ensure_future, but there are few reasons to use that function anymore.
[2] All Python code observes Quantlane's code style, as auto-formatted by Orange.
[3] See the Task.__del__ implementation in Python 3.8.