"""
This module implements the base class for agents (:class:`Agent`) and
containers for agents (:class:`Container`).
Every agent must live in a container. A container can contain one ore more
agents. Containers are responsible for making connections to other containers
and agents. They also provide a factory function for spawning new agent
instances and registering them with the container.
Thus, the :class:`Agent` base class is very light-weight. It only has a name,
a reference to its container and an RPC router (see :mod:`aiomas.rpc`).
"""
import asyncio
import collections
import weakref
import socket
import ssl as sslmod
from . import channel, clocks, rpc, util
__all__ = ['SSLCerts', 'Container', 'Agent']
PROTOCOLS = {
'tcp', # TCP sockets
'ipc', # Inter Process Communication with Unix domain sockets
}
SSLCerts = collections.namedtuple('SSLCerts', 'cafile, certfile, keyfile')
""":func:`~collections.namedtuple` storing the names of a CA file, a
certificate file and the associated private key file.
See also :func:`aiomas.util.make_ssl_server_context()` and
:func:`aiomas.util.make_ssl_client_context()`.
"""
[docs]class Container:
"""Container for agents.
It can instantiate new agents via :meth:`spawn()` and can create
connections to other agents (via :meth:`connect()`).
In order to destroy a container and close all of its sockets, call
:meth:`shutdown()`.
You should not instantiate codecs directly but use the :meth:`create()`
coroutine instead.
"""
router = rpc.Service(['agents'])
@classmethod
[docs] def create(cls, addr, *, clock=None, codec=None, extra_serializers=None,
ssl=None, async=False):
"""Instantiate a container and create a server socket for it.
This function is a classmethod and coroutine.
:param addr:
is the address that the server socket is bound to. It may be
a ``(host, port)`` tuple or a path for a Unix domain socket.
If ``host`` is ``'0.0.0.0'`` / ``'::'``, the server is bound to all
available IPv4 or IPv6 interfaces respectively. If ``host`` is
``None`` or ``''``, the server is bound to all available IPv4 and
IPv6 interfaces. In these cases, the machine's FQDN (see
:func:`socket.getfqdn()`) should be resolvable and point to that
machine as it will be used for the agent's addresses.
If ``host`` is a simple (IPv4 or IPv6) IP address, it will be used
for the agent's addresses as is.
:param clock:
can be an instance of :class:`~aiomas.clocks.BaseClock`.
It allows you to decouple the container's (and thus, its agent's)
time from the system clock. This makes it easier to integrate your
system with other simulators that may provide a clock for you or to
let your MAS run as fast as possible.
By default, the real-time :class:`~aiomas.clocks.AsyncioClock` will
be used.
:param codec:
can be a :class:`~aiomas.codecs.Codec` subclass (not an instance!).
:class:`~aiomas.codecs.JSON` is used by default.
:param extra_serializers:
is an optional list of extra serializers for the codec. The list
entries need to be callables that return a tuple with the arguments
for :meth:`~aiomas.codecs.Codec.add_serializer()`.
:param ssl:
allows you to enable TLS for all incoming and outgoing TCP
connections. It may either be an :class:`SSLCerts` instance or
a tuple containing two :class:`~ssl.SSLContext` instances, where
the first one will be used for the server socket, the second one
for client sockets.
:param async:
must be set to ``True`` if the event loop is already running when
you call this method. This function then returns a coroutine that
you need to ``yield from`` in order to get the container. By
default it will block until the server has been started and return
the container.
:return:
a fully initialized :class:`Container` instance if *async* is
``False`` or else a coroutine returning the instance when it is
done.
Invocation examples::
# Synchronous:
container = Container.create(...)
# Asynchronous:
container = yield from Container.create(..., async=True)
"""
# Get base URL for agents (tcp or ipc)
host = None
port = None
if type(addr) is tuple:
host, port = addr
if host in [None, '', '::', '0.0.0.0']:
host = socket.getfqdn()
base_url = '%s://%s:%s/' % ('tcp', host, port)
else:
base_url = '%s://[%s]/' % ('ipc', addr)
# Get default codec and clock if none were provided
if codec is None:
codec = channel.DEFAULT_CODEC
if clock is None:
clock = clocks.AsyncioClock()
# Prepend the Arrow date serializer to the list of serializers
if extra_serializers is None:
extra_serializers = []
extra_serializers = [util.arrow_serializer] + extra_serializers
# Check/Create SSL contexts for TLS
if type(ssl) is SSLCerts:
ssl_server_ctx = util.make_ssl_server_context(**ssl._asdict())
ssl_client_ctx = util.make_ssl_server_context(**ssl._asdict())
elif type(ssl) is tuple:
if (len(ssl) != 2 or
type(ssl[0]) is not sslmod.SSLContext or
type(ssl[1]) is not sslmod.SSLContext):
raise TypeError('"ssl" must contain two "ssl.SSLContext" '
'instances; one for the server and one for '
'the client.')
ssl_server_ctx = ssl[0]
ssl_client_ctx = ssl[1]
else:
ssl_server_ctx = None
ssl_client_ctx = None
# Additional keyword arguments for connecting to other containers
connect_kwargs = {
'codec': codec,
'extra_serializers': extra_serializers,
'ssl': ssl_client_ctx,
}
# Actually instantiate the container and start the server socket
@asyncio.coroutine
def _start():
container = cls(base_url, clock, connect_kwargs)
tcp_server = yield from rpc.start_server(
addr,
container.router,
client_connected_cb=container._add_to_con_cache,
codec=codec,
extra_serializers=extra_serializers,
ssl=ssl_server_ctx)
container.set_server(tcp_server)
return container
if async:
return _start()
else:
return util.run(_start())
def __init__(self, base_url, clock, connect_kwargs):
self._tcp_server = None
self._rpc_cons = set()
self._base_url = base_url
self._clock = clock
self._connect_kwargs = connect_kwargs
# The agents managed by this container.
# The agents' routers are subrouters of the container's root router.
self.agents = rpc.RoutingDict()
# Caches
self._connections_out_futs = {} # Futures for outgoing connections
self._connections_out = {} # RPC connections to containers by address
self._remote_agent_futs = {} # Futures for remote agent validation
self._remote_agents = {} # Validated remote agents by connection
def __str__(self):
return '%s(%r)' % (self.__class__.__name__, self._base_url)
@property
def clock(self):
"""The clock of the container. Instance of
:class:`aiomas.clocks.BaseClock`."""
return self._clock
def set_server(self, server):
if self._tcp_server is not None:
raise RuntimeError('Server already set.')
self._tcp_server = server
@asyncio.coroutine
[docs] def connect(self, url):
"""Connect to the argent available at *url* and return a proxy to it.
*url* is a string ``<protocol>://<addr>//<agent-id>`` (e.g.,
``'tcp://localhost:5555/0'``).
"""
addr, aid = self._parse_url(url)
rpc_con = yield from self._open_connection(addr)
remote_agent = yield from self._validate_aid(aid, rpc_con, addr, url)
return remote_agent
[docs] def shutdown(self, async=False):
"""Close the container's server socket and the RPC services for all
outgoing TCP connections.
If *async* is left to ``False``, this method calls
:meth:`asyncio.BaseEventLoop.run_until_complete()` in order to wait
until all sockets are closed.
If the event loop is already running when you call this method, set
*async* to ``True``. The return value then is a coroutine that you need
to ``yield from`` in order to actually shut the container down::
yield from container.shutdown(async=True)
"""
@asyncio.coroutine
def _shutdown():
# Close all outgoing connections
for con in self._connections_out.values():
con.close()
if self._tcp_server:
# Request closing the server socket and cancel the services
self._tcp_server.close()
for con in self._rpc_cons:
con.service.cancel()
# Wait for server and services to actually terminate
yield from asyncio.gather(self._tcp_server.wait_closed(),
*[c.service for c in self._rpc_cons])
self._tcp_server = None
self._rpc_cons = None
if async:
return _shutdown()
else:
util.run(_shutdown())
@router.expose
[docs] def validate_aid(self, aid):
"""Return the class name for the agent represented by *aid* if it
exists or ``None``."""
agents = self.agents.dict
if aid in agents:
return agents[aid].__class__.__name__
def _add_to_con_cache(self, rpc):
"""Client-connected-callback for the server socket that adds the RPC
connection *rpc* to the set of connections."""
self._rpc_cons.add(rpc)
def _parse_url(self, url):
"""Parse the agent *url* and return a ``((host, port), agent)`` tuple.
Raise a :exc:`ValueError` if the URL cannot be parsed.
"""
try:
proto, addr_aid = url.split('://', 1)
assert proto in PROTOCOLS, '%s not in %s' % (proto, PROTOCOLS)
if proto == 'tcp':
addr, aid = addr_aid.split('/', 1)
host, port = addr.rsplit(':', 1)
if host[0] == '[' and host[-1] == ']':
# IPv6 addresses may be surrounded by []
host = host[1:-1]
addr = (host, int(port))
elif proto == 'ipc':
assert addr_aid[0] == '['
addr, aid = addr_aid[1:].split(']/', 1)
assert aid, 'No agent ID specified.'
except (AssertionError, IndexError, ValueError) as e:
raise ValueError('Cannot parse agent URL "%s": %s' % (url, e))
return addr, aid
@asyncio.coroutine
def _open_connection(self, addr):
if addr in self._connections_out:
# Return cached connection
rpc_con = self._connections_out[addr]
elif addr in self._connections_out_futs:
# Wait for ongoing connection attempt
rpc_con = yield from self._connections_out_futs[addr]
else:
# Open new connection
fut = asyncio.Future()
self._connections_out_futs[addr] = fut
rpc_con = yield from rpc.open_connection(
addr, router=self.router, **self._connect_kwargs)
# Put connection into the cache
self._rpc_cons.add(rpc_con)
self._connections_out[addr] = rpc_con
# Trigger future and remove it from the cache
fut.set_result(rpc_con)
self._connections_out_futs.pop(addr)
# Initialize caches for remote agents
self._remote_agents[rpc_con] = weakref.WeakValueDictionary()
self._remote_agent_futs[rpc_con] = {}
return rpc_con
@asyncio.coroutine
def _validate_aid(self, aid, rpc_con, addr, url):
remote_agents = self._remote_agents[rpc_con]
remote_agent_futs = self._remote_agent_futs[rpc_con]
if aid in remote_agents:
remote_agent = remote_agents[aid]
elif aid in remote_agent_futs:
remote_agent = yield from remote_agent_futs[aid]
else:
fut = asyncio.Future()
remote_agent_futs[aid] = fut
remote_type = yield from rpc_con.remote.validate_aid(aid)
if remote_type is None:
raise ConnectionError('Agent "%s" does not exist in '
'Container(%r)' % (aid, addr))
remote_agent = getattr(rpc_con.remote.agents, aid)
remote_agent._str = '%sProxy(%r)' % (remote_type, url)
remote_agents[aid] = remote_agent
fut.set_result(remote_agent)
remote_agent_futs.pop(aid)
return remote_agent
def _register(self, agent):
"""Register *agent* with the container."""
aid = str(len(self.agents.dict))
url = self._base_url + aid
self.agents.dict[aid] = agent
self.agents.router.set_sub_router(agent.router, aid)
return url
[docs]class Agent:
"""Base class for all agents."""
router = rpc.Service()
"""Descriptor that creates an RPC :class:`~aiomas.rpc.Router` for every
agent instance.
You can override this in a sub-class if you need to. (Usually, you don't.)
"""
def __init__(self, container):
if type(container) != Container:
raise TypeError('"container" must be a "Container" instance but '
'is %s' % container)
addr = container._register(self)
self.__container = container
self.__addr = addr
self.__name = '%s(%r)' % (self.__class__.__name__, addr)
def __str__(self):
return self.__name
@property
def container(self):
"""The :class:`Container` that the agent lives in."""
return self.__container
@property
def addr(self):
"""The agent's address."""
return self.__addr