# Source code for aiomas.agent

"""
This module implements the base class for agents (:class:`Agent`) and
containers for agents (:class:`Container`).

Every agent must live in a container.  A container can contain one or more
agents.  Containers are responsible for making connections to other containers
and agents.  They also provide a factory function for spawning new agent
instances and registering them with the container.

Thus, the :class:`Agent` base class is very light-weight.  It only has a name,
a reference to its container and an RPC router (see :mod:`aiomas.rpc`).

"""
import asyncio
import collections
import weakref
import socket
import ssl as sslmod

from . import channel, clocks, rpc, util

# Public names exported by this module (used by ``from ... import *``).
__all__ = ['SSLCerts', 'Container', 'Agent']


# Transport schemes that may appear in an agent URL:
#
# - ``tcp``: TCP sockets
# - ``ipc``: Inter Process Communication with Unix domain sockets
PROTOCOLS = {'tcp', 'ipc'}


SSLCerts = collections.namedtuple(
    'SSLCerts',
    ['cafile', 'certfile', 'keyfile'],
)
""":func:`~collections.namedtuple` bundling three file names: the CA file,
the certificate file and the private key file belonging to the certificate.

See also :func:`aiomas.util.make_ssl_server_context()` and
:func:`aiomas.util.make_ssl_client_context()`.

"""


class Container:
    """Container for agents.

    A container holds the server socket through which its agents can be
    reached and opens (and caches) the outgoing connections needed to talk to
    agents in other containers (see :meth:`connect()`).  Agents attach
    themselves to a container by passing it to their constructor (see
    :class:`Agent`).

    In order to destroy a container and close all of its sockets, call
    :meth:`shutdown()`.

    You should not instantiate containers directly but use the
    :meth:`create()` factory instead.

    """
    router = rpc.Service(['agents'])

    @classmethod
    def create(cls, addr, *, clock=None, codec=None, extra_serializers=None,
               ssl=None, as_coro=False, **kwargs):
        """Instantiate a container and create a server socket for it.

        This function is a classmethod and (optionally) a coroutine.

        :param addr: is the address that the server socket is bound to.  It
            may be a ``(host, port)`` tuple or a path for a Unix domain
            socket.

            If ``host`` is ``'0.0.0.0'`` / ``'::'``, the server is bound to
            all available IPv4 or IPv6 interfaces respectively.  If ``host``
            is ``None`` or ``''``, the server is bound to all available IPv4
            and IPv6 interfaces.  In these cases, the machine's FQDN (see
            :func:`socket.getfqdn()`) should be resolvable and point to that
            machine as it will be used for the agent's addresses.

            If ``host`` is a simple (IPv4 or IPv6) IP address, it will be
            used for the agent's addresses as is.

        :param clock: can be an instance of
            :class:`~aiomas.clocks.BaseClock`.

            It allows you to decouple the container's (and thus, its
            agent's) time from the system clock.  This makes it easier to
            integrate your system with other simulators that may provide
            a clock for you or to let your MAS run as fast as possible.

            By default, the real-time :class:`~aiomas.clocks.AsyncioClock`
            will be used.

        :param codec: can be a :class:`~aiomas.codecs.Codec` subclass (not an
            instance!).  ``channel.DEFAULT_CODEC`` is used by default.

        :param extra_serializers: is an optional list of extra serializers
            for the codec.  The list entries need to be callables that return
            a tuple with the arguments for
            :meth:`~aiomas.codecs.Codec.add_serializer()`.  The Arrow date
            serializer is always prepended to this list.

        :param ssl: allows you to enable TLS for all incoming and outgoing
            TCP connections.  It may either be an :class:`SSLCerts` instance
            or a tuple containing two :class:`~ssl.SSLContext` instances,
            where the first one will be used for the server socket, the
            second one for client sockets.

        :param as_coro: must be set to ``True`` if the event loop is already
            running when you call this method.  This function then returns
            a coroutine that you need to ``yield from`` in order to get the
            container.  By default it will block until the server has been
            started and return the container.

            This flag used to be called ``async``.  That name is a reserved
            word since Python 3.7; for backwards compatibility it is still
            accepted as a keyword argument (``async=True`` on old
            interpreters, ``**{'async': True}`` otherwise).

        :return: a fully initialized :class:`Container` instance if *as_coro*
            is ``False`` or else a coroutine returning the instance when it
            is done.

        :raises TypeError: if *ssl* is a tuple that does not consist of
            exactly two :class:`~ssl.SSLContext` instances or if an unknown
            keyword argument is passed.

        Invocation examples::

            # Synchronous:
            container = Container.create(...)

            # Asynchronous:
            container = yield from Container.create(..., as_coro=True)

        """
        as_coro = cls._resolve_as_coro(as_coro, kwargs, 'create')

        # Get base URL for agents (tcp or ipc)
        if isinstance(addr, tuple):
            host, port = addr
            if host in (None, '', '::', '0.0.0.0'):
                # Wildcard addresses cannot be used to reach the agents, so
                # advertise the machine's FQDN instead.
                host = socket.getfqdn()
            base_url = '%s://%s:%s/' % ('tcp', host, port)
        else:
            base_url = '%s://[%s]/' % ('ipc', addr)

        # Get default codec and clock if none were provided
        if codec is None:
            codec = channel.DEFAULT_CODEC
        if clock is None:
            clock = clocks.AsyncioClock()

        # Prepend the Arrow date serializer to the list of serializers.
        # "list()" lets callers pass any iterable (e.g., a tuple) and makes
        # sure that the caller's object is never modified.
        if extra_serializers is None:
            extra_serializers = []
        extra_serializers = [util.arrow_serializer] + list(extra_serializers)

        # Check/Create SSL contexts for TLS.  "SSLCerts" is a tuple subclass
        # and thus has to be checked first.
        if isinstance(ssl, SSLCerts):
            cert_files = ssl._asdict()
            ssl_server_ctx = util.make_ssl_server_context(**cert_files)
            # Outgoing connections need a *client* context.
            ssl_client_ctx = util.make_ssl_client_context(**cert_files)
        elif isinstance(ssl, tuple):
            if (len(ssl) != 2 or
                    not isinstance(ssl[0], sslmod.SSLContext) or
                    not isinstance(ssl[1], sslmod.SSLContext)):
                raise TypeError('"ssl" must contain two "ssl.SSLContext" '
                                'instances; one for the server and one for '
                                'the client.')
            ssl_server_ctx, ssl_client_ctx = ssl
        else:
            ssl_server_ctx = None
            ssl_client_ctx = None

        # Additional keyword arguments for connecting to other containers
        connect_kwargs = {
            'codec': codec,
            'extra_serializers': extra_serializers,
            'ssl': ssl_client_ctx,
        }

        # Actually instantiate the container and start the server socket
        @asyncio.coroutine
        def _start():
            container = cls(base_url, clock, connect_kwargs)
            tcp_server = yield from rpc.start_server(
                addr, container.router,
                client_connected_cb=container._add_to_con_cache,
                codec=codec,
                extra_serializers=extra_serializers,
                ssl=ssl_server_ctx)
            container.set_server(tcp_server)
            return container

        if as_coro:
            return _start()
        return util.run(_start())

    def __init__(self, base_url, clock, connect_kwargs):
        """Initialize the container's state.

        Use :meth:`create()` instead of calling this directly; it also starts
        the server socket.

        :param base_url: URL prefix to which an agent's ID is appended in
            order to build the agent's address.
        :param clock: the clock returned by :attr:`clock`.
        :param connect_kwargs: keyword arguments passed to
            ``rpc.open_connection()`` for every outgoing connection.

        """
        self._tcp_server = None
        self._rpc_cons = set()  # All RPC connections (incoming and outgoing)
        self._base_url = base_url
        self._clock = clock
        self._connect_kwargs = connect_kwargs

        # The agents managed by this container.
        # The agents' routers are subrouters of the container's root router.
        self.agents = rpc.RoutingDict()

        # Caches
        self._connections_out_futs = {}  # Futures for outgoing connections
        self._connections_out = {}  # RPC connections to containers by address
        self._remote_agent_futs = {}  # Futures for remote agent validation
        self._remote_agents = {}  # Validated remote agents by connection

    def __str__(self):
        return '%s(%r)' % (self.__class__.__name__, self._base_url)

    @property
    def clock(self):
        """The clock of the container.  Instance of
        :class:`aiomas.clocks.BaseClock`."""
        return self._clock

    def set_server(self, server):
        """Remember *server* as the container's server socket.

        :raises RuntimeError: if a server has already been set.

        """
        if self._tcp_server is not None:
            raise RuntimeError('Server already set.')
        self._tcp_server = server

    @asyncio.coroutine
    def connect(self, url):
        """Connect to the agent available at *url* and return a proxy to it.

        *url* is a string ``<protocol>://<addr>/<agent-id>`` (e.g.,
        ``'tcp://localhost:5555/0'``).  For the ``ipc`` protocol, the socket
        path has to be enclosed in brackets: ``ipc://[<path>]/<agent-id>``.

        Connections and validated agent proxies are cached, so connecting to
        the same agent again reuses them.

        :raises ValueError: if *url* cannot be parsed.
        :raises ConnectionError: if the remote container does not know the
            requested agent.

        """
        addr, aid = self._parse_url(url)
        rpc_con = yield from self._open_connection(addr)
        remote_agent = yield from self._validate_aid(aid, rpc_con, addr, url)
        return remote_agent

    def shutdown(self, as_coro=False, **kwargs):
        """Close the container's server socket and the RPC services for all
        outgoing TCP connections.

        If *as_coro* is left to ``False``, this method blocks (via
        ``util.run()``) until all sockets are closed.

        If the event loop is already running when you call this method, set
        *as_coro* to ``True``.  The return value then is a coroutine that you
        need to ``yield from`` in order to actually shut the container down::

            yield from container.shutdown(as_coro=True)

        *as_coro* used to be called ``async``.  That name is a reserved word
        since Python 3.7; for backwards compatibility it is still accepted as
        a keyword argument.

        :raises TypeError: if an unknown keyword argument is passed.

        """
        as_coro = self._resolve_as_coro(as_coro, kwargs, 'shutdown')

        @asyncio.coroutine
        def _shutdown():
            # Close all outgoing connections
            for con in self._connections_out.values():
                con.close()

            if self._tcp_server:
                # Request closing the server socket and cancel the services
                self._tcp_server.close()
                for con in self._rpc_cons:
                    con.service.cancel()

                # Wait for server and services to actually terminate
                yield from asyncio.gather(
                    self._tcp_server.wait_closed(),
                    *[c.service for c in self._rpc_cons])

                self._tcp_server = None
                self._rpc_cons = None

        if as_coro:
            return _shutdown()
        else:
            util.run(_shutdown())

    @router.expose
    def validate_aid(self, aid):
        """Return the class name for the agent represented by *aid* if it
        exists or ``None``."""
        agents = self.agents.dict
        if aid in agents:
            return agents[aid].__class__.__name__
        return None

    def _add_to_con_cache(self, rpc):
        """Client-connected-callback for the server socket that adds the RPC
        connection *rpc* to the set of connections."""
        self._rpc_cons.add(rpc)

    def _parse_url(self, url):
        """Parse the agent *url* and return an ``(addr, agent_id)`` tuple.

        *addr* is a ``(host, port)`` tuple for ``tcp`` URLs and the socket
        path for ``ipc`` URLs.

        Raise a :exc:`ValueError` if the URL cannot be parsed.

        """
        # Explicit checks are used instead of "assert" statements because
        # assertions are stripped when Python runs with "-O".
        try:
            proto, addr_aid = url.split('://', 1)
            if proto not in PROTOCOLS:
                raise ValueError('%s not in %s' % (proto, PROTOCOLS))

            if proto == 'tcp':
                addr, aid = addr_aid.split('/', 1)
                host, port = addr.rsplit(':', 1)
                # An empty host raises an IndexError here and is rejected.
                if host[0] == '[' and host[-1] == ']':
                    # IPv6 addresses may be surrounded by []
                    host = host[1:-1]
                addr = (host, int(port))
            elif proto == 'ipc':
                # An empty remainder raises an IndexError here.
                if addr_aid[0] != '[':
                    raise ValueError('IPC socket path must be enclosed in '
                                     '"[" and "]".')
                addr, aid = addr_aid[1:].split(']/', 1)
            else:
                raise ValueError('Unsupported protocol "%s".' % proto)

            if not aid:
                raise ValueError('No agent ID specified.')
        except (IndexError, ValueError) as e:
            raise ValueError('Cannot parse agent URL "%s": %s' % (url, e))

        return addr, aid

    @asyncio.coroutine
    def _open_connection(self, addr):
        """Return an RPC connection to the container at *addr*.

        An existing connection is reused.  If another task is already
        connecting to *addr*, wait for its result instead of opening a second
        connection.  If the connection attempt fails, the error is passed on
        to all waiting tasks and nothing is cached, so a later call can try
        again.

        """
        if addr in self._connections_out:
            # Return cached connection
            return self._connections_out[addr]

        if addr in self._connections_out_futs:
            # Wait for ongoing connection attempt
            rpc_con = yield from self._connections_out_futs[addr]
            return rpc_con

        # Open new connection
        fut = asyncio.Future()
        self._connections_out_futs[addr] = fut
        try:
            rpc_con = yield from rpc.open_connection(
                addr, router=self.router, **self._connect_kwargs)

            # Put connection into the cache
            self._rpc_cons.add(rpc_con)
            self._connections_out[addr] = rpc_con

            # Initialize caches for remote agents
            self._remote_agents[rpc_con] = weakref.WeakValueDictionary()
            self._remote_agent_futs[rpc_con] = {}
        except Exception as exc:
            self._fail_future(fut, exc)
            raise
        else:
            # Wake up everybody waiting for this connection
            fut.set_result(rpc_con)
        finally:
            # Never leave a pending future behind: waiters would block
            # forever and no new connection attempt would be made.
            self._connections_out_futs.pop(addr, None)
            if not fut.done():
                fut.cancel()

        return rpc_con

    @asyncio.coroutine
    def _validate_aid(self, aid, rpc_con, addr, url):
        """Return a proxy to the agent *aid* reachable via *rpc_con*.

        Ask the remote container whether the agent exists and cache the
        resulting proxy.  Concurrent requests for the same agent share one
        validation call.  *addr* and *url* are only used for the error
        message and the proxy's string representation.

        Raise a :exc:`ConnectionError` if the agent does not exist.  Failures
        are passed on to all waiting tasks and are not cached.

        """
        remote_agents = self._remote_agents[rpc_con]
        remote_agent_futs = self._remote_agent_futs[rpc_con]

        if aid in remote_agents:
            return remote_agents[aid]

        if aid in remote_agent_futs:
            # Wait for the ongoing validation
            remote_agent = yield from remote_agent_futs[aid]
            return remote_agent

        fut = asyncio.Future()
        remote_agent_futs[aid] = fut
        try:
            remote_type = yield from rpc_con.remote.validate_aid(aid)
            if remote_type is None:
                raise ConnectionError('Agent "%s" does not exist in '
                                      'Container(%r)' % (aid, addr))
            remote_agent = getattr(rpc_con.remote.agents, aid)
            remote_agent._str = '%sProxy(%r)' % (remote_type, url)
            remote_agents[aid] = remote_agent
        except Exception as exc:
            self._fail_future(fut, exc)
            raise
        else:
            fut.set_result(remote_agent)
        finally:
            # Never leave a pending future behind: the next "connect()" to
            # this agent would wait for it forever.
            remote_agent_futs.pop(aid, None)
            if not fut.done():
                fut.cancel()

        return remote_agent

    def _register(self, agent):
        """Register *agent* with the container and return the agent's URL.

        The agent's ID is the number of agents registered before it.

        """
        aid = str(len(self.agents.dict))
        url = self._base_url + aid
        self.agents.dict[aid] = agent
        self.agents.router.set_sub_router(agent.router, aid)
        return url

    @staticmethod
    def _fail_future(fut, exc):
        """Resolve *fut* with the exception *exc*.

        The exception is marked as retrieved right away.  The caller re-raises
        it anyway, and asyncio would otherwise log an "exception was never
        retrieved" error if nobody happens to wait for *fut*.

        """
        fut.set_exception(exc)
        fut.exception()

    @staticmethod
    def _resolve_as_coro(as_coro, kwargs, func_name):
        """Return the effective *as_coro* flag for the method *func_name*.

        *kwargs* contains the surplus keyword arguments of the call.  The
        only accepted key is ``'async'``, the legacy name of *as_coro*; if
        present, its value is used.  Any other key raises a :exc:`TypeError`
        like an ordinary unexpected keyword argument would.

        """
        if 'async' in kwargs:
            as_coro = kwargs.pop('async')
        if kwargs:
            raise TypeError('%s() got unexpected keyword argument(s): %s'
                            % (func_name, ', '.join(sorted(kwargs))))
        return as_coro
class Agent:
    """Base class for all agents.

    An agent registers itself with the *container* passed to its constructor
    and receives its address from it.

    """
    router = rpc.Service()
    """Descriptor that creates an RPC :class:`~aiomas.rpc.Router` for every
    agent instance.

    You can override this in a sub-class if you need to. (Usually, you don't.)

    """

    def __init__(self, container):
        """Register the new agent with *container*.

        :param container: the :class:`Container` (or an instance of a
            subclass) that the agent lives in.
        :raises TypeError: if *container* is not a :class:`Container`.

        """
        if not isinstance(container, Container):
            # Wrap the argument in a 1-tuple so that the message is also
            # formatted correctly if *container* itself is a tuple.
            raise TypeError('"container" must be a "Container" instance but '
                            'is %s' % (container,))

        addr = container._register(self)
        self.__container = container
        self.__addr = addr
        self.__name = '%s(%r)' % (self.__class__.__name__, addr)

    def __str__(self):
        return self.__name

    @property
    def container(self):
        """The :class:`Container` that the agent lives in."""
        return self.__container

    @property
    def addr(self):
        """The agent's address."""
        return self.__addr