"""
This module implements remote procedure calls (RPC) on top of request-reply
channels (see :mod:`aiomas.channel`).
RPC connections are represented by instances of :class:`RpcClient` (one for
each side of a :class:`aiomas.channel.Channel`). They provide access to the
functions served by the peer via :class:`Proxy` instances. Optionally, they
can provide their own RPC service so that the peer can make calls as well.
An RPC service is an object with a ``router`` attribute which is an instance of
:class:`Router`. A router resolves paths requested by the peer. It can also
handle sub-routers (which allows you to build hierarchies for nested calls) and
is able to perform a reverse-lookup of a router (mapping a function to its
path).
Routers can be attached to both, classes and dictionaries with functions.
Dictionaries need to be wrapped with a :class:`ServiceDict`. Classes need to
have a :class:`Service` class attribute named ``router``. :class:`Service` is
a descriptor which creates a :class:`Router` for every instance of that class.
Functions that should be callable from the remote side must be decorated with
:func:`expose()`; :func:`Router.expose()` and :func:`Service.expose()` are
aliases for it.
"""
import asyncio
import logging
import weakref
from . import channel, exceptions
# Names exported by ``from <this module> import *``.
# NOTE(review): the underscore-prefixed helper '_handle_request' is exported
# here as well -- confirm that this is intended.
__all__ = [
'open_connection', 'start_server', 'rpc_service_process', 'expose',
'_handle_request',
'Service', 'ServiceDict', 'Router', 'RpcClient', 'Proxy',
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def open_connection(addr, *, rpc_service=None, **kwds):
    """Connect to *addr* and return an :class:`RpcClient` for the connection.

    This is a thin convenience wrapper around
    :meth:`aiomas.channel.open_connection()`; every keyword argument in
    *kwds* is forwarded to it unchanged.

    *rpc_service* is optional.  If given, it is handed to the
    :class:`RpcClient` so that the peer can call back to us.

    Raise a :exc:`ValueError` if *rpc_service* is given but is not a valid
    RPC service.

    This function is a `coroutine
    <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_.

    """
    # Validate before connecting so that no connection is opened for an
    # unusable service object.
    if rpc_service is not None:
        if not _is_rpc_service(rpc_service):
            raise ValueError(
                '"{}" is not a valid RPC service'.format(rpc_service))

    connection = await channel.open_connection(addr, **kwds)
    client = RpcClient(connection, rpc_service)
    return client
async def start_server(addr, rpc_service, client_connected_cb=None, **kwds):
    """Start a server on *addr* that serves *rpc_service* to every client.

    This is a convenience wrapper for :meth:`aiomas.channel.start_server()`.
    All keyword arguments *(kwds)* are forwarded to it, and its result is
    returned.

    *rpc_service* must be an RPC service (an object with a ``router``
    attribute that is an instance of :class:`Router`).

    *client_connected_cb* is an optional callback that is called with the
    :class:`RpcClient` instance created for each new connection.

    Raise a :exc:`ValueError` if *rpc_service* is not a valid RPC service.

    This function is a `coroutine
    <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_.

    """
    if not _is_rpc_service(rpc_service):
        raise ValueError('"{}" is not a valid RPC service.'
                         .format(rpc_service))

    def client_factory(channel):
        """Wrap the new *channel* in an :class:`RpcClient`, notify
        *client_connected_cb* (if one was given) and return the client."""
        client = RpcClient(channel, rpc_service)
        if not client_connected_cb:
            return client
        client_connected_cb(client)
        return client

    return await channel.start_server(addr, client_factory, **kwds)
# Strong references to the request handler tasks that are still running.
# The event loop itself only keeps weak references to tasks, so a
# fire-and-forget task could otherwise be garbage collected before it is done.
_request_tasks = set()


async def rpc_service_process(rpc_client, router, channel):
    """RPC service process for the connection of *rpc_client*.

    Serve the functions provided by the :class:`Router` *router* via the
    :class:`~aiomas.channel.Channel` *channel*: wait for requests in a loop
    and handle each of them in its own task.

    If receiving fails with a :exc:`ConnectionError`, pass the exception to
    ``rpc_client._connection_reset()``.  Stop silently when the process is
    cancelled.  In every case, close *channel* before returning.

    This function is a `coroutine
    <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_.

    """
    loop = channel._loop
    try:
        while True:
            # Wait for a request
            request = await channel.recv()
            # Dispatch handling of the request to a sub task to make
            # dead-locks less likely when multiple agents share the same
            # connection.
            task = loop.create_task(_handle_request(request, loop, router))
            # Keep the task alive until it has finished (see above).
            _request_tasks.add(task)
            task.add_done_callback(_request_tasks.discard)
    except asyncio.CancelledError:
        # Cancellation is the regular way to stop the service.
        pass
    except ConnectionError as ce:
        rpc_client._connection_reset(ce)
    finally:
        try:
            await channel.close()
        except RuntimeError:
            # May happen when the loop is already closed
            pass
async def _handle_request(request, loop, router):
    """Handle the *request*.

    ``request.content`` is unpacked into ``(path, args, kwargs)``.  *path* is
    resolved via ``router.resolve()`` and the resulting function is called
    with *args* and *kwargs*.  If the call returns a coroutine, it is run as
    a task on *loop* and awaited.

    The result is sent back with ``request.reply()``.  A :exc:`LookupError`
    from resolving the path or any :exc:`Exception` raised by the function is
    sent back with ``request.fail()`` instead.

    A :exc:`ConnectionResetError` raised while sending the answer is ignored
    for both kinds of answers, because the peer is gone and there is nobody
    left to notify.

    """
    path, args, kwargs = request.content

    try:
        # Resolve requested path
        func = router.resolve(path)
    except LookupError as exc:
        reply, res = request.fail, exc
    else:
        # Call requested function
        try:
            res = func(*args, **kwargs)
            if asyncio.iscoroutine(res):
                res = await loop.create_task(res)
            reply = request.reply
        except Exception as e:
            reply, res = request.fail, e

    # Lookup failures go through the same guarded send as all other answers,
    # so a vanished peer cannot leave an unhandled exception in this task.
    try:
        await reply(res)
    except ConnectionResetError:
        pass
def _is_rpc_service(obj):
    """Return ``True`` if *obj* has a ``router`` attribute that is an
    instance of :class:`Router`, else ``False``."""
    return hasattr(obj, 'router') and isinstance(obj.router, Router)
def expose(func):
    """Decorator that marks *func* as callable via RPC.

    *func* is returned unwrapped; it only gains an ``__rpc__`` attribute set
    to ``True``.

    Raise a :exc:`ValueError` if *func* has no ``__call__`` attribute.

    """
    if hasattr(func, '__call__'):
        func.__rpc__ = True
        return func
    raise ValueError('"{}" is not callable.'.format(func))
class ServiceDict:
    """Wrapper that lets a dict of functions (and services) act as an RPC
    service with its own :class:`Router`."""

    __rpc__ = True

    def __init__(self, dict=None):
        #: The wrapped dict.
        self.dict = dict if dict is not None else {}
        #: The dict's router instance.
        self.router = Router(self)
        # Child elements of a dict are looked up by key, so this replaces the
        # attribute based ``__getrpc__`` that was set while creating the
        # router.
        self.__getrpc__ = self.dict.__getitem__

        # Every entry that has a router becomes a sub-router of our router,
        # named after its key.
        for key, entry in self.dict.items():
            if not hasattr(entry, 'router'):
                continue
            self.router.set_sub_router(entry.router, key)
class Service:
    """Data descriptor that provides a separate :class:`Router` for every
    instance of the class it is attached to.

    The attribute name for the Service should always be *router*::

        class Spam:
            router = aiomas.rpc.Service()

    You can optionally pass a list with the names of the attributes that hold
    objects with sub-routers.  This is required to build hierarchies of
    routers, e.g.::

        class Eggs:
            router = aiomas.rpc.Service()

        class Spam:
            router = aiomas.rpc.Service(['eggs'])

            def __init__(self):
                self.eggs = Eggs()  # Instance with a sub-router

    """

    def __init__(self, sub_routers=()):
        self._sub_router_names = sub_routers

    def __set__(self, instance, value):
        """Always raise :exc:`AttributeError`; the attribute is read-only."""
        raise AttributeError('Read-only attribute.')

    def __get__(self, instance, cls):
        """Return this Service when accessed from the class, or the
        :class:`Router` belonging to *instance* when accessed from an
        instance.

        The router is created on first access, stored in the ``__dict__`` of
        *instance* and then populated with all configured sub-routers.

        """
        if instance is None:
            return self

        state = instance.__dict__
        try:
            return state['router']
        except KeyError:
            pass

        # First access: create the router for "instance" and add all
        # sub-routers.
        router = state.setdefault('router', Router(instance))
        for name in self._sub_router_names:
            router.add(name)
        return state['router']

    #: Alias for :func:`expose()`.
    expose = staticmethod(expose)
class Router:
    """The Router resolves paths to functions provided by its object *obj*
    (or its children).  It can also perform a reverse lookup to get the path
    of the router (and the router's *obj*).

    Creating a router marks *obj* as a node in the RPC hierarchy by setting
    ``obj.__rpc__`` and ``obj.__getrpc__`` on it, so *obj* must allow setting
    attributes.

    """

    def __init__(self, obj):
        # Mark *obj* as node in the RPC hierarchy and create an alias for
        # accessing its child elements.
        obj.__rpc__ = True
        obj.__getrpc__ = obj.__getattribute__

        self.obj = obj  #: The object to which this router belongs.
        self.name = ''  #: The name of the router (empty for root routers).
        self.parent = None  #: The parent router or ``None`` for root routers.
        self._cache = {}  # Maps already resolved paths to functions

    @property
    def path(self):
        """The path to this router (without trailing slash)."""
        router = self
        parts = []
        while router.parent is not None:
            # We go from child to root here
            parts.append(router.name)
            router = router.parent
        return '/'.join(reversed(parts))  # Reverse to get root first

    def resolve(self, path):
        """Resolve *path* and return the corresponding function.

        *path* is a string with path components separated by */* (without
        trailing slash).  Successfully resolved paths are cached.

        Raise a :exc:`LookupError` if *path* is not a string, if no handler
        function can be found for *path* or if the function is not exposed
        (see :func:`expose()`).

        """
        # Paths come from the peer.  Reject anything that is not a string
        # with the documented exception instead of letting a TypeError or
        # AttributeError escape from the cache lookup or from ``split()``.
        if not isinstance(path, str):
            raise LookupError('Path must be a string, not "{}"'.format(
                type(path).__name__))

        try:
            obj = self._cache[path]
        except KeyError:
            parent = None
            obj = self.obj
            parts = path.split('/')
            for i, name in enumerate(parts):
                try:
                    parent, obj = obj, obj.__getrpc__(name)
                except (AttributeError, KeyError):
                    raise LookupError('Name "{}" not found in "{}"'.format(
                        name, '/'.join(parts[:i]))) from None

                # Every element on the way must be part of the RPC hierarchy.
                if not hasattr(obj, '__rpc__'):
                    cls = parent.__class__
                    raise LookupError('"{}.{}.{}" is not exposed'.format(
                        cls.__module__, cls.__qualname__, name))

            self._cache[path] = obj

        return obj

    #: Alias for :func:`expose()`.
    expose = staticmethod(expose)

    def add(self, name):
        """Add the sub-router *name* (stored at ``self.obj.<name>``) to this
        router.

        Convenience wrapper for :meth:`set_sub_router`.

        """
        router = getattr(self.obj, name).router
        self.set_sub_router(router, name)

    def set_sub_router(self, router, name):
        """Set *self* as parent for the *router* named *name*.

        Raise a :exc:`ValueError` if *router* is not a :class:`Router`
        instance or if it already has a parent.

        """
        # isinstance() instead of an exact type comparison, so that
        # instances of Router subclasses are accepted as well.
        if not isinstance(router, Router):
            raise ValueError('"{}" is not a valid router.'.format(router))
        if router.parent is not None:
            raise ValueError('"{}" is already a sub service of "{}"'
                             .format(router.obj, router.parent.obj))
        router.name = name
        router.parent = self
class RpcClient:
    """One side of an RPC connection.

    Remote functions are reachable through the :class:`Proxy` returned by
    the :attr:`remote` attribute.

    *channel* is a :class:`~aiomas.channel.Channel` instance for communicating
    with the remote side.

    If *rpc_service* is not ``None``, a service process is started as well so
    that the peer can call the functions we provide.

    """

    def __init__(self, channel, rpc_service=None):
        self.__channel = channel
        # Fallback (de)serializer for objects without a dedicated serializer.
        channel.codec.add_serializer(object, self._serialize_obj,
                                     self._deserialize_obj)
        self.__connection_reset_callback = None

        if rpc_service is None:
            self.__root_router = None
            self.__service = None
        else:
            root_router = rpc_service.router
            self.__root_router = root_router
            self.__service = channel._loop.create_task(
                rpc_service_process(self, root_router, channel))

    @property
    def channel(self):
        """The communication :class:`~aiomas.channel.Channel` of this
        instance."""
        return self.__channel

    @property
    def service(self):
        """The task running the RPC service process for this connection
        (``None`` if no RPC service was passed)."""
        return self.__service

    @property
    def remote(self):
        """A new root :class:`Proxy` for remote methods."""
        return Proxy(self.__channel, '')

    def on_connection_reset(self, callback):
        """Set the *callback* that gets called when the :attr:`service`
        process is aborted by a :exc:`ConnectionError`.

        *callback* is a callable with a single argument, the exception that
        was raised within the :attr:`service` process.

        Calling this method again replaces the current callback.  Passing
        ``None`` removes it.

        Raise a :exc:`RuntimeError` if this instance has no service task.

        Raise a :exc:`ValueError` if *callback* is neither callable nor
        ``None``.

        """
        if self.__service is None:
            raise RuntimeError('This {} instance has no RPC service running.'
                               .format(self.__class__.__name__))

        acceptable = callback is None or hasattr(callback, '__call__')
        if not acceptable:
            raise ValueError('{!r} must be "None" or callable but is neither'
                             .format(callback))

        self.__connection_reset_callback = callback

    async def close(self):
        """`Coroutine
        <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_ that
        closes the channel, cancels the service task if it is still running
        and waits for it to finish."""
        await self.__channel.close()

        service = self.__service
        if service is None:
            return
        if not service.done():
            service.cancel()
        await service

    def _connection_reset(self, exc):
        """Pass *exc* to the *connection reset callback* if one is set (see
        :meth:`on_connection_reset()`).

        Called by the service process when a :exc:`ConnectionError` is raised
        within it, so that the user can decide whether the reset was
        intentional or an error.

        """
        callback = self.__connection_reset_callback
        if callback is not None:
            callback(exc)

    def _serialize_obj(self, obj):
        """Fallback serializer for objects without a dedicated serializer.

        Only valid RPC services are serialized, and only if this instance
        has a root router and the router of *obj* is that root router or one
        of its descendants.  The serialized form is the path of the router
        of *obj*.

        In all other cases, raise a :exc:`SerializationError`.

        """
        # "obj" must be a valid RPC service ...
        if not _is_rpc_service(obj):
            raise exceptions.SerializationError(
                'No serializer found for type "{}"'.format(type(obj)))

        # ... and we need a service of our own ...
        if self.__root_router is None:
            raise exceptions.SerializationError(
                'No RPC service running for this side of the connection. '
                'Cannot send serice proxy.')

        # ... to which "obj.router" must belong (directly or as descendant).
        top = obj.router
        while top.parent is not None:
            top = top.parent
        if top is not self.__root_router:
            raise exceptions.SerializationError(
                'RPC service "{!r}" is not exposed for this connection'
                .format(obj))

        return obj.router.path

    def _deserialize_obj(self, path):
        """Return a :class:`Proxy` for the remote *path*."""
        return Proxy(self.__channel, path)
class Proxy:
    """Proxy object for remote objects and functions.

    *channel* is the channel used to send calls to the peer; *path* is the
    remote path that this proxy represents (empty for the root proxy).

    Attribute access returns a proxy for the corresponding sub-path, and
    calling a proxy sends the call to the peer.

    """

    # Attributes set by __init__().  If one of them reaches __getattr__(),
    # the instance has not been initialised (yet).
    _own_attributes = ('_channel', '_path', '_cache', '_str')

    def __init__(self, channel, path):
        self._channel = channel
        self._path = path
        # Child proxies are only cached for as long as somebody uses them.
        self._cache = weakref.WeakValueDictionary()
        self._str = None

    def __repr__(self):
        return '<{}.{} at 0x{:x}>'.format(self.__module__, self, id(self))

    def __str__(self):
        if self._str:
            return self._str
        peername = self._channel.get_extra_info('peername')
        # No peername may be available; str()/repr() must not fail then.
        if peername is not None:
            peername = peername[:2]
        return '{}({!r}, {!r})'.format(
            self.__class__.__name__, peername, self._path)

    def __getattr__(self, name):
        """Return a (cached) proxy for *name*.

        Raise an :exc:`AttributeError` if *name* is one of the proxy's own
        attributes, which means that the instance is not initialised.

        """
        # Without this guard, looking up any attribute on an uninitialised
        # instance (e.g. while it is being copied or unpickled) recurses
        # endlessly via ``self._cache``.
        if name in self._own_attributes:
            raise AttributeError(name)

        # A single get() avoids the gap between a membership test and the
        # item access, in which a weakly referenced entry could vanish.
        proxy = self._cache.get(name)
        if proxy is None:
            path = name if not self._path else '{}/{}'.format(self._path, name)
            proxy = self.__class__(self._channel, path)
            self._cache[name] = proxy
        return proxy

    def __call__(self, *args, **kwargs):
        """Call the remote method represented by this proxy and return the
        result of ``channel.send()``.

        The result is a future, so you need to ``await`` it in order to
        get the actual return value (or exception).

        Raise an :exc:`AttributeError` if this proxy has an empty path.

        """
        if not self._path:
            raise AttributeError('No RPC function name specified.')
        return self._channel.send((self._path, args, kwargs))

    def __eq__(self, other):
        # Let Python handle comparisons with foreign types instead of failing
        # with an AttributeError on "other._channel".
        if not isinstance(other, Proxy):
            return NotImplemented
        return self._channel is other._channel and self._path == other._path

    def __hash__(self):
        return hash(self._channel) + hash(self._path)