# Source code for aiomas.util

"""
This module contains some utility functions.

"""
import asyncio

# Names exported by a star-import (``from <this module> import *``).
__all__ = ['async', 'run']


def create_task(coro_or_future, ignore_cancel=True, loop=None):
    """Run :func:`asyncio.ensure_future()` with *coro_or_future* and set a
    callback that instantly raises all exceptions.

    If *ignore_cancel* is left ``True``, no exception is raised if the task
    was canceled.  If you also want to raise the ``CancelledError``, set the
    flag to ``False``.

    *loop* is passed through unchanged to :func:`asyncio.ensure_future()`.

    Return an :class:`asyncio.Task` object (or *coro_or_future* itself if it
    already is a future).

    The difference between this function and :func:`asyncio.ensure_future()`
    is subtle, but important if an error is raised by the task:

    :func:`asyncio.ensure_future()` returns a future (:class:`asyncio.Task`
    is a subclass of :class:`asyncio.Future`) for the task that you created.
    By the time that future goes out of scope, asyncio checks if someone was
    interested in its result or not.  If the result was never retrieved, the
    exception is printed to *stderr*.

    If you call it like ``asyncio.ensure_future(mytask())`` (note that we
    don't keep a reference to the future here), an exception in *mytask* will
    be printed immediately when the task is done.  If, however, we store a
    reference to the future (``fut = asyncio.ensure_future(mytask())``), the
    exception only gets printed when ``fut`` goes out of scope.  That means
    if, for example, an :class:`~aiomas.agent.Agent` creates a task and stores
    it as an instance attribute, our system may keep running for a long time
    after the exception has occurred (or even block forever) and we won't see
    any stacktrace.  This is because the reference to the task is still there
    and we could, in theory, still retrieve the exception from there.

    Since this can make debugging very hard, this function simply registers a
    callback to the future.  The callback will try to get the result from the
    future when it is done and will thus print any exceptions immediately.

    """
    # ``asyncio.async`` was only ever an alias of ``ensure_future`` and can no
    # longer be spelled since ``async`` became a reserved keyword.
    task = asyncio.ensure_future(coro_or_future, loop=loop)

    def cb(f):
        if f.cancelled() and ignore_cancel:
            return
        # Raising inside a done callback hands the exception to the event
        # loop's exception handler right away.
        f.result()  # Let the future raise the exception

    task.add_done_callback(cb)
    return task


# ``async`` is a reserved keyword since Python 3.7, so ``def async(...)`` is a
# syntax error there.  Keep the historical name available for ``__all__`` and
# for callers that look it up dynamically (``getattr(util, 'async')``).
globals()['async'] = create_task
def run(until=None):
    """Run the event loop forever or until the task/future *until* is
    finished.

    This is an alias to asyncio's ``run_forever()`` if *until* is ``None`` and
    to ``run_until_complete()`` if not.

    Return ``None`` when *until* is ``None`` (after the loop has been
    stopped), otherwise return whatever ``run_until_complete(until)``
    returns.

    """
    # The module-level ``import asyncio`` is sufficient; no local re-import.
    loop = asyncio.get_event_loop()
    if until is None:
        loop.run_forever()
        return None
    return loop.run_until_complete(until)