"""
This module implements remote procedure calls (RPC) on top of request-reply
channels (see :mod:`aiomas.channel`).
RPC connections are represented by instances of :class:`RpcClient` (one for
each side of a :class:`aiomas.channel.Channel`). They provide access to the
functions served by the remote side of the channel via :class:`Proxy`
instances. Optionally, they can provide their own RPC service (via
:func:`rpc_service`) so that the remote side can make calls as well.
An RPC service is defined by a :class:`Router`. A router resolves paths
requested by the remote side. It can also handle sub-routers (which allows you
to build hierarchies for nested calls) and is able to perform a reverse-lookup
of a router (mapping a fuction to its path).
Routers provide the functions and methods of dictionaries or class instances.
Dict routers can be created by passing a dictionary to :class:`Router`. For
classes, you create a :class:`Service` instance as `router` class attribute.
This creates a *Descriptor* which then creates a new router instance for each
class instance.
Functions that should be callable from the remote side must be decorated with
:func:`expose()`; :func:`Router.expose()` and :func:`Service.expose()` are
aliases for it.
"""
from asyncio import coroutine
import asyncio
import logging
import weakref
from . import channel
__all__ = [
'open_connection', 'start_server', 'rpc_service', 'expose',
'_handle_request',
'RoutingDict', 'Router', 'Service', 'RpcClient', 'Proxy',
]
logger = logging.getLogger(__name__)
@coroutine
[docs]def open_connection(addr, *, router=None, **kwds):
"""Return an :class:`RpcClient` connected to *addr*.
This is a convenience wrapper for :meth:`aiomas.channel.open_connection()`.
All keyword arguments *(kwds)* are forwared to it.
You can optionally pass a *router* to allow the remote site to call back
to us.
"""
if router is not None and type(router) is not Router:
raise ValueError('"%s" is not a valid router.' % router)
c = yield from channel.open_connection(addr, **kwds)
return RpcClient(c, router)
@coroutine
[docs]def start_server(addr, router, client_connected_cb=None, **kwds):
"""Start a server socket on *host:port* and create an RPC service with
the provided *handler* for each new client.
This is a convenience wrapper for :meth:`aiomas.channel.start_server()`.
All keyword arguments *(kwds)* are forwared to it.
*router* must be a :class:`Router` instance for the :func:`rpc_service()`
that is started for each new connection.
*client_connected_cb* is an optional callback that will be called with
with the :class:`RpcClient` instance for each new connection.
Raise a :exc:`ValueError` if *handler* is not decorated properly.
"""
if type(router) is not Router:
raise ValueError('"%s" is not a valid router.' % router)
def fac(channel):
"""Create an RPC client for each new connection and call the
*client_connected_cb* if there is one."""
rpc_cli = RpcClient(channel, router)
if client_connected_cb:
client_connected_cb(rpc_cli)
return rpc_cli
server = yield from channel.start_server(addr, fac, **kwds)
return server
@coroutine
[docs]def rpc_service(router, channel):
"""Serve the functions provided by the :class:`Router` *router* via the
:class:`~aiomas.channel.Channel` *channel*.
Forward errors raised by the handler to the caller.
Stop running when the connection closes.
"""
loop = channel._loop
try:
while True:
# Wait for a request
request = yield from channel.recv()
# Dispatch handling of the request to a sub task to make dead-locks
# less likely when multiple agents share the same connection.
asyncio.async(_handle_request(request, loop, router), loop=loop)
except (ConnectionError, asyncio.CancelledError):
pass
finally:
try:
channel.close()
except RuntimeError:
# May happen when the loop is already closed
pass
@coroutine
def _handle_request(request, loop, router):
"""Handle the *request*.
Resolve the path, execute the corresponding message and set the result
or exception.
"""
path, args, kwargs = request.content
# logger.debug('Request: [%s, %s, %s]' % (path, args, kwargs))
# Resolve requested path
try:
func = router.resolve(path)
except LookupError as exc:
yield from request.fail(exc)
return
# Call requested function
try:
res = func(*args, **kwargs)
if asyncio.iscoroutine(res):
res = yield from asyncio.async(res, loop=loop)
except Exception as e:
yield from request.fail(e) # pragma: no branch
else:
yield from request.reply(res)
[docs]def expose(func):
"""Decorator that enables RPC access to the decorated function.
*func* will not be wrapped but only gain an ``__rpc__`` attribute.
"""
if not hasattr(func, '__call__'):
raise ValueError('"%s" is not callable.' % func)
func.__rpc__ = True
return func
[docs]class RoutingDict:
"""Wrapper for dicts so that they can be used as RPC routers."""
__rpc__ = True
def __init__(self, dict=None):
self.dict = {} if dict is None else dict
"""The wrapped dict."""
self.router = Router(self)
"""The dict's router instance."""
self.__getrpc__ = self.dict.__getitem__
for key, val in self.dict.items():
# Iterate over all entries and look for objects with routers.
# Set *router* as parent to these sub-routers.
if hasattr(val, 'router'):
Router.set_sub_router(self.router, val.router, key)
[docs]class Router:
"""The Router resolves paths to functions provided by their object *obj*
(or its children). It can also perform a reverse lookup to get the path
of the router (and the router's *obj*).
The *obj* can be a class, an instance or a dict.
"""
def __init__(self, obj):
# Mark *obj* as node in the RPC hierarchy and and create an alias
# for accessing its child elements.
obj.__rpc__ = True
obj.__getrpc__ = obj.__getattribute__
self.obj = obj #: The object to which this router belongs to.
self.name = '' #: The name of the router (empty for root routers).
self.parent = None #: The parent router or ``None`` for root routers.
self._cache = {} # Maps already resolved paths to functions
@property
def path(self):
"""The path to this router (without trailing slash)."""
router = self
parts = []
while router.parent is not None:
# We go from child to root here
parts.append(router.name)
router = router.parent
return '/'.join(reversed(parts)) # Reverse to get root first
[docs] def resolve(self, path):
"""Resolve *path* and return the corresponding function.
*path* is a string with path components separated by */* (without
trailing slash).
Raise a :exc:`LookupError` if no handler function can be found for
*path* or if the function is not exposed (see :func:`expose()`).
"""
try:
obj = self._cache[path]
except KeyError:
parent = None
obj = self.obj
parts = path.split('/')
for i, name in enumerate(parts):
try:
parent, obj = obj, obj.__getrpc__(name)
except (AttributeError, KeyError):
raise LookupError('Name "%s" not found in "%s"' %
(name, '/'.join(parts[:i]))) from None
if not hasattr(obj, '__rpc__'):
cls = parent.__class__
raise LookupError('"%s.%s.%s" is not exposed' %
(cls.__module__, cls.__qualname__, name))
self._cache[path] = obj
return obj
expose = staticmethod(expose)
"""Alias for :func:`expose()`."""
[docs] def add(self, name):
"""Add the sub-router *name* (stored at ``self.obj.<name>``) to this
router.
Convenience wrapper for :meth:`set_sub_router`.
"""
router = getattr(self.obj, name).router
self.set_sub_router(router, name)
[docs] def set_sub_router(self, router, name):
"""Set *self* as parent for the *router* named *name*."""
if type(router) is not Router:
raise ValueError('"%s" is not a valid router.' % router)
if router.parent is not None:
raise ValueError('"%s" is already a sub router of "%s"' %
(router, router.parent))
router.name = name
router.parent = self
[docs]class Service:
"""A Data Descriptor that creates a new :class:`Router` instance for each
class instance to which it is set.
The attribute name for the Service should always be *router*::
class Spam:
router = aiomas.rpc.Service()
You can optionally pass a list with the attribute names of classes with
sub-routers. This required to build hierarchies of routers, e.g.::
class Eggs:
router = aiomas.rpc.Service()
class Spam:
router = aiomas.rpc.Service(['eggs'])
def __init__(self):
self.eggs = Eggs() # Instance with a sub-router
"""
def __init__(self, sub_routers=()):
self._sub_router_names = sub_routers
def __set__(self, instance, value):
"""Raise :exc:`AttributeError` to forbid overwriting this attribute."""
raise AttributeError('Read-only attribute.')
def __get__(self, instance, cls):
"""If accessed from the class, return this Service instance. If
accessed from an *instance*, return the :class:`Router` instance for
*instance*.
"""
if instance is None:
return self
if 'router' not in instance.__dict__:
# Create new Router for "instance" and add all sub-router:
router = instance.__dict__.setdefault('router', Router(instance))
for name in self._sub_router_names:
router.add(name)
return instance.__dict__['router']
expose = staticmethod(expose)
"""Alias for :func:`expose()`."""
[docs]class RpcClient:
"""The RpcClient provides proxy objects for remote calls via its
:attr:`remote` attribute.
*channel* is a :class:`~aiomas.channel.Channel` instance for communicating
with the remote side.
If *router* is not ``None``, it will also start its own RPC service so the
other side can make calls to us as well.
"""
def __init__(self, channel, router=None):
self.__channel = channel
self.__channel.codec.add_serializer(object, self._serialize_obj,
self._deserialize_obj)
self.__service = None
if router is not None:
self.__service = asyncio.async(rpc_service(router, channel),
loop=channel._loop)
@property
def channel(self):
"""The communication :class:`~aiomas.channel.Channel` of this instance.
"""
return self.__channel
@property
def service(self):
"""The RPC service process for this connection."""
return self.__service
@property
def remote(self):
"""A :class:`Proxy` for remote methods."""
return Proxy(self.__channel, '')
[docs] def close(self):
"""Close the connection."""
return self.__channel.close()
def _serialize_obj(self, obj):
try:
return obj.router.path
except AttributeError:
raise TypeError('"%r" object of type "%s" cannot be serialized.' %
(obj, type(obj))) from None
def _deserialize_obj(self, path):
return Proxy(self.__channel, path)
[docs]class Proxy:
"""Proxy object for remote objects and functions."""
def __init__(self, channel, path):
self._channel = channel
self._path = path
self._cache = weakref.WeakValueDictionary()
self._str = None
def __repr__(self):
return '<%s.%s at 0x%x>' % (self.__module__, self, id(self))
def __str__(self):
return self._str if self._str else '%s(%r, %r)' % (
self.__class__.__name__,
self._channel.get_extra_info('peername')[:2],
self._path)
[docs] def __getattr__(self, name):
"""Return a new proxy for *name*."""
if name in self._cache:
proxy = self._cache[name]
else:
path = name if not self._path else self._path + '/' + name
proxy = self.__class__(self._channel, path)
self._cache[name] = proxy
return proxy
[docs] def __call__(self, *args, **kwargs):
"""Call the remote method represented by this proxy and return its
result.
The result is a future, so you need to ``yield from`` it in order to
get the actual return value (or exception).
"""
if not self._path:
raise AttributeError('No RPC function name specified.')
return self._channel.send((self._path, args, kwargs))
def __eq__(self, other):
return self._channel is other._channel and self._path == other._path
def __hash__(self):
return hash(self._channel) + hash(self._path)