aiomas.channel

This module implements and asyncio asyncio.Protocol protocol for a request-reply Channel.

aiomas.channel.DEFAULT_CODEC

Default codec: JSON

aiomas.channel.open_connection(addr, *, loop=None, codec=None, extra_serializers=(), timeout=0, **kwds)[source]

Return a Channel connected to addr.

This is a convenience wrapper for asyncio.BaseEventLoop.create_connection(), asyncio.BaseEventLoop.create_unix_connection(), and aiomas.local_queue.create_connection().

If addr is a tuple (host, port), a TCP connection will be created. If addr is a string, it should be a path name pointing to the unix domain socket to connect to. If addr is a aiomas.local_queue instance, a LocalQueue connection will be created.

You can optionally provide the event loop to use.

By default, the JSON codec is used. You can override this by passing any subclass of aiomas.codecs.Codec as codec.

You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for add_serializer().

With a timeout of 0 (the default), there will only be one connection attempt before an error is raised (ConnectionRefusedError for TCP sockets and LocalQueue, FileNotFoundError for Unix domain sockets). If you set timeout to a number > 0 or None, this function will try to connect repeatedly for at most that many seconds (or indefinitely) before an error is raised. Use this if you need to start the client before the server.

The remaining keyword argumens kwds are forwarded to asyncio.BaseEventLoop.create_connection() and asyncio.BaseEventLoop.create_unix_connection() respectively.

This function is a coroutine.

aiomas.channel.start_server(addr, client_connected_cb, *, loop=None, codec=None, extra_serializers=(), **kwds)[source]

Start a server listening on addr and call client_connected_cb for every client connecting to it.

This function is a convenience wrapper for asyncio.BaseEventLoop.create_server(), asyncio.BaseEventLoop.create_unix_server(), and aiomas.local_queue.create_server().

If addr is a tuple (host, port), a TCP socket will be created. If addr is a string, a unix domain socket at this path will be created. If addr is a aiomas.local_queue instance, a LocalQueue server will be created.

The single argument of the callable client_connected_cb is a new instance of Channel.

You can optionally provide the event loop to use.

By default, the JSON codec is used. You can override this by passing any subclass of aiomas.codecs.Codec as codec.

You can also pass a list of extra_serializers for the codec. The list entires need to be callables that return a tuple with the arguments for add_serializer().

The remaining keyword argumens kwds are forwarded to asyncio.BaseEventLoop.create_server() and asyncio.BaseEventLoop.create_unix_server() respectively.

This function is a coroutine.

class aiomas.channel.ChannelProtocol(codec, client_connected_cb=None, *, loop)[source]

Asyncio asyncio.Protocol which connects the low level transport with the high level Channel API.

The codec is used to (de)serialize messages. It should be a sub-class of aiomas.codecs.Codec.

Optionally you can also pass a function/coroutine client_connected_cb that will be executed when a new connection is made (see start_server()).

connection_made(transport)[source]

Create a new Channel instance for a new connection.

Also call the client_connected_cb if one was passed to this class.

connection_lost(exc)[source]

Set a ConnectionError to the Channel to indicate that the connection is closed.

data_received(data)[source]

Buffer incomming data until we have a complete message and then pass it to Channel.

Messages are fixed length. The first four bytes (in network byte order) encode the length of the following payload. The payload is a triple (msg_type, msg_id, content) encoded with the specified codec.

eof_received()[source]

Set a ConnectionResetError to the Channel.

write(len_bytes, content)[source]

Serialize content and write the result to the transport.

This method is a coroutine.

pause_writing()[source]

Set the paused flag to True.

Can only be called if we are not already paused.

resume_writing()[source]

Set the paused flat to False and trigger the waiter future.

Can only be called if we are paused.

class aiomas.channel.Request(content, message_id, protocol)[source]

Represents a request returned by Channel.recv(). You shoudn’t instantiate it yourself.

content contains the incoming message.

msg_id is the ID for that message. It is unique within a channel.

protocol is the channel’s ChannelProtocol instance that is used for writing back the reply.

To reply to that request you can yield from Request.reply() or Request.fail().

content

The content of the incoming message.

reply(result)[source]

Reply to the request with the provided result.

This method is a coroutine.

fail(exception)[source]

Indicate a failure described by the exception instance.

This will raise a RemoteException on the other side of the channel.

This method is a coroutine.

class aiomas.channel.Channel(protocol, codec, transport, loop)[source]

A Channel represents a request-reply channel between two endpoints. An instance of it is returned by open_connection() or is passed to the callback of start_server().

protocol is an instance of ChannelProtocol.

transport is an asyncio.BaseTransport.

loop is an instance of an asyncio.BaseEventLoop.

codec

The codec used to de-/encode messages send via the channel.

transport

The transport of this channel (see the Python documentation for details).

send(content)[source]

Send a request content to the other end and return a future which is triggered when a reply arrives.

One of the following exceptions may be raised:

  • ValueError if the message is too long (the length of the encoded message does not fit into a long, which is ~ 4 GiB).
  • RemoteException: The remote site raised an exception during the computation of the result.
  • ConnectionError (or its subclass ConnectionResetError): The connection was closed during the request.
  • RuntimeError:
    • If an invalid message type was received.
    • If the future returned by this method was already triggered or canceled by a third party when an answer to the request arrives (e.g., if a task containing the future is cancelled). You get more detailed exception messages if you enable asyncio’s debug mode
try:
    result = yield from channel.request('ohai')
except RemoteException as exc:
    print(exc)
recv()[source]

Wait for an incoming Request and return it.

May raise one of the following exceptions:

This method is a coroutine.

close()[source]

Coroutine that closes the channel and waits for all sub tasks to finish.

get_extra_info(name, default=None)[source]

Wrapper for asyncio.BaseTransport.get_extra_info().