aiomas.channel

This module implements and asyncio asyncio.Protocol protocol for a request-reply Channel.

aiomas.channel.DEFAULT_CODEC

Default codec: JSON

aiomas.channel.open_connection(addr, *, loop=None, codec=None, extra_serializers=(), **kwds)[source]

Return a Channel connected to addr.

This is a convenience wrapper for asyncio.BaseEventLoop.create_connection() and asyncio.BaseEventLoop.create_unix_connection().

If addr is a tuple (host, port), a TCP connection will be created. If addr is a string, it should be a path name pointing to the unix domain socket to connect to.

You can optionally provide the event loop to use.

By default, the JSON codec is used. You can override this by passing any subclass of aiomas.codecs.Codec as codec.

You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for add_serializer().

The remaining keyword argumens kwds are forwarded to asyncio.BaseEventLoop.create_connection() and asyncio.BaseEventLoop.create_unix_connection() respectively.

aiomas.channel.start_server(addr, client_connected_cb, *, loop=None, codec=None, extra_serializers=(), **kwds)[source]

Start a server listening on addr and call client_connected_cb for every client connecting to it.

This function is a convenience wrapper for asyncio.BaseEventLoop.create_server() and asyncio.BaseEventLoop.create_unix_server().

If addr is a tuple (host, port), a TCP socket will be created. If addr is a string, a unix domain socket at this path will be created.

The single argument of the callable client_connected_cb is a new instance of Channel.

You can optionally provide the event loop to use.

By default, the JSON codec is used. You can override this by passing any subclass of aiomas.codecs.Codec as codec.

You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for add_serializer().

The remaining keyword argumens kwds are forwarded to asyncio.BaseEventLoop.create_server() and asyncio.BaseEventLoop.create_unix_server() respectively.

class aiomas.channel.ChannelProtocol(codec, client_connected_cb=None, *, loop)[source]

Asyncio asyncio.Protocol which connects the low level transport with the high level Channel API.

The codec is used to (de)serialize messages. It should be a sub-class of aiomas.codecs.Codec.

Optionally you can also pass a function/coroutine client_connected_cb that will be executed when a new connection is made (see start_server()).

connection_made(transport)[source]

Create a new Channel instance for a new connection.

Also call the client_connected_cb if one was passed to this class.

connection_lost(exc)[source]

Set a ConnectionError to the Channel to indicate that the connection is closed.

data_received(data)[source]

Buffer incomming data until we have a complete message and then pass it to Channel.

Messages are fixed length. The first four bytes (in network byte order) encode the length of the following payload. The payload is a triple (msg_type, msg_id, content) encoded with the specified codec.

eof_received()[source]

Set a ConnectionResetError to the Channel.

write(content)[source]

Serialize content and write the result to the transport.

This method is a coroutine.

pause_writing()[source]

Set the paused flag to True.

Can only be called if we are not already paused.

resume_writing()[source]

Set the paused flat to False and trigger the waiter future.

Can only be called if we are paused.

class aiomas.channel.Request(content, message_id, protocol)[source]

Represents a request returned by Channel.recv(). You shoudn’t instantiate it yourself.

content contains the incoming message.

msg_id is the ID for that message. It is unique within a channel.

protocol is the channel’s ChannelProtocol instance that is used for writing back the reply.

To reply to that request you can yield from Request.reply() or Request.fail().

content

The content of the incoming message.

reply(result)[source]

Reply to the request with the provided result.

fail(exception)[source]

Indicate a failure described by the exception instance.

This will raise a RemoteException on the other side of the channel.

class aiomas.channel.Channel(protocol, codec, transport, loop)[source]

A Channel represents a request-reply channel between two endpoints. An instance of it is returned by open_connection() or is passed to the callback of start_server().

protocol is an instance of ChannelProtocol.

transport is an asyncio.BaseTransport.

loop is an instance of an asyncio.BaseEventLoop.

codec

The codec used to de-/encode messages send via the channel.

transport

The transport of this channel (see the Python documentation for details).

send(content)[source]

Send a request content to the other end and return a future which is triggered when a reply arrives.

One of the following exceptions may be raised:

  • RemoteException: The remote site raised an exception during the computation of the result.
  • ConnectionError (or its subclass ConnectionResetError): The connection was closed during the request.
  • RuntimeError:
    • If an invalid message type was received.
    • If the future returned by this method was already triggered or canceled by a third party when an answer to the request arrives (e.g., if a task containing the future is cancelled). You get more detailed exception messages if you enable asyncio’s debug mode
try:
    result = yield from channel.request('ohai')
except RemoteException as exc:
    print(exc)
recv()[source]

Wait for an incoming Request and return it.

May raise one of the following exceptions:

close()[source]

Close the channel’s transport.

get_extra_info(name, default=None)[source]

Wrapper for asyncio.BaseTransport.get_extra_info().