Source code for aiomas.agent

"""
This module implements the base class for agents (:class:`Agent`) and
containers for agents (:class:`Container`).

Every agent must live in a container.  A container can contain one ore more
agents.  Containers are responsible for making connections to other containers
and agents.  They also provide a factory function for spawning new agent
instances and registering them with the container.

Thus, the :class:`Agent` base class is very light-weight.  It only has a name,
a reference to its container and an RPC router (see :mod:`aiomas.rpc`).

"""
import asyncio
import collections
import weakref
import socket
import ssl as sslmod

from . import channel, clocks, local_queue, rpc, util


__all__ = ['SSLCerts', 'Container', 'Agent']


PROTOCOLS = {
    'tcp',  # TCP sockets
    'ipc',  # Inter Process Communication with Unix domain sockets
    'local',  # LocalQueue transport
}


SSLCerts = collections.namedtuple('SSLCerts', 'cafile, certfile, keyfile')
""":func:`~collections.namedtuple` storing the names of a CA file, a
certificate file and the associated private key file.

See also :func:`aiomas.util.make_ssl_server_context()` and
:func:`aiomas.util.make_ssl_client_context()`.

"""


def _get_base_url(addr):
    # Get base URL for agents (tcp or ipc)
    if isinstance(addr, tuple):
        host, port = addr
        if host in [None, '', '::', '0.0.0.0']:
            host = socket.getfqdn()
        base_url = '%s://%s:%s/' % ('tcp', host, port)
    elif isinstance(addr, local_queue.LocalQueue):
        base_url = 'local://%s/' % addr.queue_id
    else:
        base_url = '%s://[%s]/' % ('ipc', addr)

    return base_url


def _make_ssl_contexts(ssl):
    """Derive and return a tuple *(Server SSLContext, Client SSLContext)* from
    *ssl*.

    *ssl* may either be a :class:`SSLCerts` instance or a tuple of two
    :class:`ssl.SSLContext` instances.

    In other cases, return ``(None, None)``.

    """
    if isinstance(ssl, SSLCerts):
        ssl_server_ctx = util.make_ssl_server_context(**ssl._asdict())
        ssl_client_ctx = util.make_ssl_client_context(**ssl._asdict())
    elif isinstance(ssl, tuple):
        if (len(ssl) != 2 or
                not isinstance(ssl[0], sslmod.SSLContext) or
                not isinstance(ssl[1], sslmod.SSLContext)):
            raise TypeError('"ssl" must contain two "ssl.SSLContext" '
                            'instances; one for the server and one for '
                            'the client.')
        ssl_server_ctx = ssl[0]
        ssl_client_ctx = ssl[1]
    else:
        ssl_server_ctx = None
        ssl_client_ctx = None

    return ssl_server_ctx, ssl_client_ctx


[docs]class Container: """Container for agents. You should not instantiate containers directly but use the :meth:`create()` method/coroutine instead. This makes sure that the container's server socket is fully operational when it is created. The container allows its agents to create connections to other agents (via :meth:`connect()`). In order to destroy a container and close all of its sockets, call :meth:`shutdown()`. """ router = rpc.Service(['agents'])
[docs] @classmethod def create(cls, addr, *, clock=None, codec=None, extra_serializers=None, ssl=None, as_coro=False, loop=None): """Instantiate a container and create a server socket for it. This function is a classmethod and `coroutine <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_. :param addr: is the address that the server socket is bound to. It may be a ``(host, port)`` tuple for a TCP socket, a path for a Unix domain socket, or a *LocalQueue* instance as returned by the :func:`aiomas.local_queue.get_queue()` function. **TCP sockets** If ``host`` is ``'0.0.0.0'`` or ``'::'``, the server is bound to all available IPv4 *or* IPv6 interfaces respectively. If ``host`` is ``None`` or ``''``, the server is bound to all available IPv4 *and* IPv6 interfaces. In these cases, the machine's FQDN (see :func:`socket.getfqdn()`) should be resolvable and point to that machine as it will be used for the agent's addresses. If ``host`` is a simple (IPv4 or IPv6) IP address, it will be used for the agent's addresses as is. **LocalQueue** In contrast to TCP, multiple LocalQueue connections between containers (within the same thread and OS process) send and receive message in a deterministic order, which is useful for testing and debugging. LocalQueue instances should be retrieved via the :func:`aiomas.local_queue.get_queue()` function (which also available as ``aiomas.get_queue()``). This function always returns the same instance for a given queue ID. :param clock: can be an instance of :class:`~aiomas.clocks.BaseClock`. It allows you to decouple the container's (and thus, its agent's) time from the system clock. This makes it easier to integrate your system with other simulators that may provide a clock for you or to let your MAS run as fast as possible. By default, the real-time :class:`~aiomas.clocks.AsyncioClock` will be used. :param codec: can be a :class:`~aiomas.codecs.Codec` subclass (not an instance!). :class:`~aiomas.codecs.JSON` is used by default. :param extra_serializers: is an optional list of extra serializers for the codec. The list entries need to be callables that return a tuple with the arguments for :meth:`~aiomas.codecs.Codec.add_serializer()`. :param ssl: allows you to enable TLS for all incoming and outgoing TCP connections. It may either be an :class:`SSLCerts` instance or a tuple containing two :class:`~ssl.SSLContext` instances, where the first one will be used for the server socket, the second one for client sockets. :param as_coro: must be set to ``True`` if the event loop is already running when you call this method. This function then returns a coroutine that you need to ``await`` in order to get the container. By default it will block until the server has been started and return the container. :param loop: The asyncio event loop to use. If ``None``, the default one will be used. :return: a fully initialized :class:`Container` instance if *async* is ``False`` or else a coroutine returning the instance when it is done. Invocation examples:: # Synchronous: container = Container.create(...) # Asynchronous: container = await Container.create(..., as_coro=True) """ base_url = _get_base_url(addr) ssl_server_ctx, ssl_client_ctx = _make_ssl_contexts(ssl) # Get default codec and clock if none were provided if loop is None: loop = asyncio.get_event_loop() if codec is None: codec = channel.DEFAULT_CODEC if clock is None: clock = clocks.AsyncioClock(loop=loop) # Prepend the Arrow date serializer to the list of serializers if extra_serializers is None: extra_serializers = [] extra_serializers = [util.arrow_serializer] + extra_serializers # Additional keyword arguments for connecting to other containers connect_kwargs = { 'codec': codec, 'extra_serializers': extra_serializers, 'ssl': ssl_client_ctx, } # Actually instantiate the container and start the server socket async def _start(): container = cls(base_url, loop, clock, connect_kwargs) tcp_server = await rpc.start_server( addr, container, client_connected_cb=container._add_to_con_cache, codec=codec, extra_serializers=extra_serializers, ssl=ssl_server_ctx, loop=loop, ) container.set_server(tcp_server) return container if as_coro: return _start() else: return util.run(_start(), loop=loop)
def __init__(self, base_url, loop, clock, connect_kwargs): self._tcp_server = None self._rpc_cons = set() self._loop = loop self._base_url = base_url self._clock = clock self._connect_kwargs = connect_kwargs # The agents managed by this container. # The agents' routers are subrouters of the container's root router. self.agents = rpc.ServiceDict() # Caches self._connections_out_futs = {} # Futures for outgoing connections self._connections_out = {} # RPC connections to containers by address self._remote_agent_futs = {} # Futures for remote agent validation self._remote_agents = {} # Validated remote agents by connection def __str__(self): return '%s(%r)' % (self.__class__.__name__, self._base_url) @property def loop(self): """The event loop the container is using.""" return self._loop @property def clock(self): """The clock of the container. Instance of :class:`aiomas.clocks.BaseClock`.""" return self._clock def set_server(self, server): if self._tcp_server is not None: raise RuntimeError('Server already set.') self._tcp_server = server
[docs] async def connect(self, url, timeout=0): """Connect to the argent available at *url* and return a proxy to it. *url* is a string ``<protocol>://<addr>//<agent-id>`` (e.g., ``'tcp://localhost:5555/0'``). With a *timeout* of 0 (the default), there will only be one connection attempt before an error is raised (:exc:`ConnectionRefusedError` for TCP sockets and LocalQueue, :exc:`FileNotFoundError` for Unix domain sockets). If you set *timeout* to a number > 0 or ``None``, this function will try to connect repeatedly for at most that many seconds (or indefinitely) before an error is raised. Use this if the remote agent's container may not yet exist. This function is a `coroutine <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_. """ addr, aid = self._parse_url(url) rpc_con = await self._open_connection(addr, timeout) remote_agent = await self._validate_aid(aid, rpc_con, addr, url) return remote_agent
[docs] def shutdown(self, as_coro=False): """Close the container's server socket and the RPC services for all outgoing TCP connections. If *async* is left to ``False``, this method calls :meth:`asyncio.AbstractEventLoop.run_until_complete()` in order to wait until all sockets are closed. Set *async* to ``True`` if the event loop is already running (e.g., because you are in a coroutine). The return value then is a `coroutine <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_ that you need to ``await`` in order to actually shut the container down:: await container.shutdown(as_coro=True) """ async def _shutdown(): # Close all outgoing connections futs = [] for con in self._connections_out.values(): futs.append(con.close()) await asyncio.gather(*futs, loop=self._loop) if self._tcp_server: # Request closing the server socket and cancel the services self._tcp_server.close() for con in self._rpc_cons: con.service.cancel() # Wait for server and services to actually terminate await asyncio.gather( self._tcp_server.wait_closed(), *[c.service for c in self._rpc_cons], return_exceptions=True, loop=self._loop, ) self._tcp_server = None self._rpc_cons = None if as_coro: return _shutdown() else: util.run(_shutdown(), loop=self._loop)
[docs] @router.expose def validate_aid(self, aid): """Return the class name for the agent represented by *aid* if it exists or ``None``.""" agents = self.agents.dict if aid in agents: return agents[aid].__class__.__name__
def _add_to_con_cache(self, rpc_con): """Client-connected-callback for the server socket that adds the RPC connection *rpc_con* to the set of connections.""" self._rpc_cons.add(rpc_con) @staticmethod def _parse_url(url): """Parse the agent *url* and return a ``((host, port), agent)`` tuple. Raise a :exc:`ValueError` if the URL cannot be parsed. """ try: proto, addr_aid = url.split('://', 1) assert proto in PROTOCOLS, '{} not in {}'.format(proto, PROTOCOLS) if proto == 'tcp': addr, aid = addr_aid.split('/', 1) host, port = addr.rsplit(':', 1) if host[0] == '[' and host[-1] == ']': # IPv6 addresses may be surrounded by [] host = host[1:-1] addr = (host, int(port)) elif proto == 'ipc': assert addr_aid[0] == '[' addr, aid = addr_aid[1:].split(']/', 1) elif proto == 'local': queue_id, aid = addr_aid.split('/', 1) addr = local_queue.get_queue(queue_id) aid = aid assert aid, 'No agent ID specified.' except (AssertionError, IndexError, ValueError) as e: raise ValueError('Cannot parse agent URL "{}": {}'.format(url, e)) return addr, aid async def _open_connection(self, addr, timeout): if addr in self._connections_out: # Return cached connection rpc_con = self._connections_out[addr] elif addr in self._connections_out_futs: # Wait for ongoing connection attempt rpc_con = await self._connections_out_futs[addr] else: # Open new connection fut = self._loop.create_future() self._connections_out_futs[addr] = fut rpc_con = await rpc.open_connection( addr, rpc_service=self, timeout=timeout, loop=self._loop, **self._connect_kwargs, ) # Put connection into the cache self._rpc_cons.add(rpc_con) self._connections_out[addr] = rpc_con # Trigger future and remove it from the cache fut.set_result(rpc_con) self._connections_out_futs.pop(addr) # Initialize caches for remote agents self._remote_agents[rpc_con] = weakref.WeakValueDictionary() self._remote_agent_futs[rpc_con] = {} return rpc_con async def _validate_aid(self, aid, rpc_con, addr, url): remote_agents = self._remote_agents[rpc_con] remote_agent_futs = self._remote_agent_futs[rpc_con] if aid in remote_agents: remote_agent = remote_agents[aid] elif aid in remote_agent_futs: remote_agent = await remote_agent_futs[aid] else: fut = self._loop.create_future() remote_agent_futs[aid] = fut remote_type = await rpc_con.remote.validate_aid(aid) if remote_type is None: raise ConnectionError('Agent "{}" does not exist in ' 'Container({!r})'.format(aid, addr)) remote_agent = getattr(rpc_con.remote.agents, aid) remote_agent._str = '{}Proxy({!r})'.format(remote_type, url) remote_agents[aid] = remote_agent fut.set_result(remote_agent) remote_agent_futs.pop(aid) return remote_agent def _register(self, agent): """Register *agent* with the container.""" aid = str(len(self.agents.dict)) url = self._base_url + aid self.agents.dict[aid] = agent self.agents.router.set_sub_router(agent.router, aid) return url
[docs]class Agent: """Base class for all agents.""" router = rpc.Service() """Descriptor that creates an RPC :class:`~aiomas.rpc.Router` for every agent instance. You can override this in a sub-class if you need to. (Usually, you don't.) """ def __init__(self, container): if not isinstance(container, Container): raise TypeError('"container" must be a "Container" instance but ' 'is {}'.format(container)) addr = container._register(self) self.__container = container self.__addr = addr self.__name = '{}({!r})'.format(self.__class__.__name__, addr) def __str__(self): return self.__name @property def container(self): """The :class:`Container` that the agent lives in.""" return self.__container @property def addr(self): """The agent's address.""" return self.__addr