Source code for aiomas.subproc

This module helps you to :func:`start()` an agent container in a new
subprocess.  The new container will have a :class:`Manager` agent that allows
the master process to spawn other agents in the new container.

The following example demonstrate how you can build a nice CLI with the `click
<>`_ around this module. The script will start you
a container with an :class:`~aiomas.clocks.ExternalClock` and the
:class:`~aiomas.codecs.MsgPackBlosc` codec:

.. code-block:: python3

   import logging

   import aiomas
   import arrow
   import click

   def validate_addr(ctx, param, value):
           host, port = value.rsplit(':', 1)
           return (host, int(port))
       except ValueError as e:
           raise click.BadParameter(e)

   def validate_start_date(ctx, param, value):
           arrow.get(value)  # Check if the date can be parsed
       except arrow.parser.ParserError as e:
           raise click.BadParameter(e)
       return value

   @click.option('--start-date', required=True,
                 help='Start date for the agents (ISO-8601 compliant, e.g.: '
   @click.option('--log-level', '-l', default='info', show_default=True,
                 type=click.Choice(['debug', 'info', 'warning', 'error',
                 help='Log level for the MAS')
   @click.argument('addr', metavar='HOST:PORT', callback=validate_addr)
   def main(addr, start_date, log_level):
       logging.basicConfig(level=getattr(logging, log_level.upper()))
       clock = aiomas.ExternalClock(start_date, init_time=-1)
       codec = aiomas.codecs.MsgPackBlosc
       task = aiomas.subproc.start(addr, clock=clock, codec=codec)

   if __name__ == '__main__':

Example usage: :command:`python
--start-date=2010-03-27T00:00:00+01:00 localhost:5556`.

.. note::

   You should use ``sys.executable`` instead of just ``'python'`` when you
   start a new subprocess from within a Python script to make sure you use the
   correct (same) interpreter.

import asyncio
import logging

from . import agent, rpc, util

logger = logging.getLogger(__name__)

[docs]async def start(addr, **container_kwargs): """`Coroutine <>`_ that starts a container with a :class:`Manager` agent. The agent will connect to *addr* ``('host', port)`` and wait for commands to spawn new agents within its container. The *container_kwargs* will be passed to :meth:`aiomas.agent.Container.create()` factory function. This coroutine finishes after :meth:`Manager.stop()` was called or when a :exc:`KeyboardInterrupt` is raised. """ container_kwargs.update(as_coro=True) container = await agent.Container.create(addr, **container_kwargs) try: manager = Manager(container) await manager.stop_received except KeyboardInterrupt:'Execution interrupted by user') finally: await container.shutdown(as_coro=True)
[docs]class Manager(agent.Agent): """An agent that can start other agents within its container. If the container uses an :class:`~aiomas.clocks.ExternalClock`, it can also set the time for the container's clock. """ def __init__(self, container): super().__init__(container) self.stop_received = container.loop.create_future()
[docs] @rpc.expose async def spawn(self, qualname, *args, **kwargs): """Create a new instance of an agent and return a proxy to it and its address. *qualname* is a string defining a class (or factory method/coroutine) for instantiating the agent (see :func:`aiomas.util.obj_from_str()` for details). *args* and *kwargs* get passed to this callable as positional and keyword arguemnts respectively. This is an exposed `coroutine <>`_. """ callable = util.obj_from_str(qualname) if asyncio.iscoroutinefunction(callable): agent = await callable(self.container, *args, **kwargs) else: agent = callable(self.container, *args, **kwargs) logger.debug('Spawned %s(%s)', agent.__class__.__name__, agent) return agent, agent.addr
[docs] @rpc.expose def set_time(self, time): """Set the agent's container's time to *time*. This only works if the container uses an :class:`~aiomas.clocks.ExternalClock`. This is an exposed function. """ self.container.clock.set_time(time)
[docs] @rpc.expose def stop(self): """Triggers the *stop_received* future of this agent causing its container process to shutodwn and terminate. This is an exposed function. """ self.stop_received.set_result(True)