Source code for aiomas.util

"""
This module contains some utility functions.

"""
import asyncio
import ssl

import arrow

__all__ = [
    'arrow_serializer',
    'async',
    'run',
    'make_ssl_client_context',
    'make_ssl_server_context',
]


[docs]def arrow_serializer(): """Return a serializer for *arrow* dates. The return value is an argument tuple for :meth:`aiomas.codecs.Codec.add_serializer()`. """ return arrow.Arrow, str, arrow.get
[docs]def async(coro_or_future, ignore_cancel=True, loop=None): """Run :func:`asyncio.async()` with *coro_or_future* and set a callback that instantly raises all exceptions. If *ignore_cancel* is left ``True``, no exception is raised if the task was canceled. If you also want to raise the ``CancelledError``, set the flag to ``False.``. Return an :class:`asyncio.Task` object. The difference between this function and :func:`asyncio.async()` subtle, but important if an error is raised by the task: :func:`asyncio.async()` returns a future (:class:`asyncio.Task` is a subclass of :class:`asyncio.Future`) for the task that you created. By the time that future goes out of scope, asyncio checks if someone was interested in its result or not. If the result was never retrieved, the exception is printed to *stderr*. If you call it like ``asyncio.async(mytask())`` (note that we don't keep a reference to the future here), an exception in *mytask* will pre printed immediately when the task is done. If, however, we store a reference to the future (``fut = asyncio.async(mytask())``), the exception only gets printed when ``fut`` goes out of scope. That means if, for example, an :class:`~aiomas.agent.Agent` creates a task and stores it as an instance attribute, our system may keep running for a long time after the exception has occured (or even block forever) and we won't see any stacktrace. This is because the reference to the task is still there and we could, in theory, still retrieve the exception from there. Since this can make debugging very hard, this method simply registers a callback to the future. The callback will try to get the result from the future when it is done and will thus print any exceptions immediately. """ task = asyncio.async(coro_or_future, loop=loop) def cb(f): if f.cancelled() and ignore_cancel: return f.result() # Let the future raise the exception task.add_done_callback(cb) return task
[docs]def run(until=None): """Run the event loop forever or until the task/future *until* is finished. This is an alias to asyncio's ``run_forever()`` if *until* is ``None`` and to ``run_until_complete()`` if not. """ import asyncio loop = asyncio.get_event_loop() if until is None: loop.run_forever() else: return loop.run_until_complete(until)
[docs]def make_ssl_server_context(cafile, certfile, keyfile): """Return an :class:`ssl.SSLContext` that can be used by a server socket. The server will use the certificate in *certfile* and private key in *keyfile* (both in PEM format) to authenticate itself. It requires clients to also authenticate themselves. Their certificates will be validated with the root CA certificate in *cafile*. It will use *TLS 1.2* with *ECDH+AESGCM* encryption. ECDH keys won't be reused in distinct SSL sessions. Compression is disabled. """ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ctx.set_ciphers('ECDH+AESGCM') ctx.load_cert_chain(certfile=certfile, keyfile=keyfile) ctx.verify_mode = ssl.CERT_REQUIRED ctx.load_verify_locations(cafile=cafile) ctx.options |= ssl.OP_SINGLE_ECDH_USE ctx.options |= ssl.OP_NO_COMPRESSION return ctx
[docs]def make_ssl_client_context(cafile, certfile, keyfile): """Return an :class:`ssl.SSLContext` that can be used by a client socket. It uses the root CA certificate in *cafile* to validate the server's certificate. It will also check the server's hostname. The client will use the certificate in *certfile* and private key in *keyfile* (both in PEM format) to authenticate itself. It will use *TLS 1.2* with *ECDH+AESGCM* encryption. """ ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ctx.set_ciphers('ECDH+AESGCM') ctx.verify_mode = ssl.CERT_REQUIRED ctx.load_verify_locations(cafile=cafile) ctx.check_hostname = True ctx.load_cert_chain(certfile=certfile, keyfile=keyfile) return ctx