aiomas.local_queue

The local queue transport roughly mimics a normal TCP transport, but it sends and receives messages via two asyncio.Queue instances.

Its purpose is to aid the development and debugging of complex networking algorithms and distributed or multi-agent systems. In contrast to normal network transports, messages sent via the LocalQueueTransport always arrive in a deterministic order [1].

This transport does not work across multiple processes and is not thread safe, so it should only be used within a single thread and process.

The easiest way to use it is to create a LocalQueue instance via the get_queue() function and pass it as the addr argument to aiomas.channel.start_server() / aiomas.channel.open_connection() or aiomas.agent.Container.create().

[1] Messages sent via a single TCP connection also arrive in a deterministic order (this is a property of TCP), so the LocalQueue transport won’t give you any benefit in that case.

However, if you have multiple connections to the same server and send messages through them in parallel, the order in which messages from the different connections arrive is no longer deterministic. In this case, the LocalQueue transport can help you.
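To illustrate the multi-connection case, here is a minimal sketch that uses only asyncio and not aiomas itself. Two hypothetical clients write to one shared asyncio.Queue inside a single event loop. Since no network or thread scheduling is involved, repeated runs yield the same arrival order. The names client, run_once and arrival_orders are made up for this illustration.

```python
import asyncio


async def client(name, server_inbox, count):
    """Hypothetical client: send `count` numbered messages to the server."""
    for i in range(count):
        await server_inbox.put((name, i))
        # Yield to the event loop so the other client gets a turn.
        await asyncio.sleep(0)


async def run_once():
    """Run two clients "in parallel" and return messages in arrival order."""
    inbox = asyncio.Queue()
    await asyncio.gather(client("a", inbox, 3), client("b", inbox, 3))
    received = []
    while not inbox.empty():
        received.append(inbox.get_nowait())
    return received


def arrival_orders(runs=5):
    """Repeat the experiment in fresh event loops and collect each order."""
    return [asyncio.run(run_once()) for _ in range(runs)]
```

Comparing the lists returned by arrival_orders() shows that every run produces the same interleaving, which is the property that makes queue-based transports convenient for debugging.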

aiomas.local_queue.get_queue(queue_id)

Return a LocalQueue instance for the given queue_id.

If no instance is cached yet, create a new one.

Queue IDs must be strings and must not contain the / character. Raise a ValueError if these rules are violated.

aiomas.local_queue.clear_queue_cache()

Clear the global queue cache.
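The caching and validation rules of get_queue() and clear_queue_cache() can be sketched as follows. This is a hypothetical reimplementation for illustration, not the aiomas source: the names _QUEUES, QueueSketch, get_queue_sketch and clear_queue_cache_sketch are assumptions.

```python
_QUEUES = {}  # hypothetical module-level cache: queue_id -> instance


class QueueSketch:
    """Hypothetical stand-in for LocalQueue (illustration only)."""

    def __init__(self, queue_id):
        self.queue_id = queue_id


def get_queue_sketch(queue_id):
    """Return the cached instance for `queue_id`, creating it if needed."""
    # Queue IDs must be strings and must not contain "/".
    if not isinstance(queue_id, str) or "/" in queue_id:
        raise ValueError(f"Invalid queue ID: {queue_id!r}")
    if queue_id not in _QUEUES:
        _QUEUES[queue_id] = QueueSketch(queue_id)
    return _QUEUES[queue_id]


def clear_queue_cache_sketch():
    """Forget all cached instances."""
    _QUEUES.clear()
```

In this sketch, two calls with the same ID return the same object until the cache is cleared, which is what lets a server and its clients find each other by ID alone.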

aiomas.local_queue.create_connection(protocol_factory, lq, *, loop=None, **kwds)

Connect to a LocalQueue lq and return a (transport, protocol) pair.

The protocol_factory must be a callable returning a protocol instance.

Before a connection to lq can be made, a server must be started for this instance (see create_server()).

aiomas.local_queue.create_server(protocol_factory, lq, **kwds)

Create a LocalQueue server bound to lq.

The protocol_factory must be a callable returning a protocol instance.

Return a LocalQueueServer instance. That instance is also set as server for lq.

class aiomas.local_queue.LocalQueue(queue_id)

An instance of this class serves as transport description when creating a server or connection.

The functions create_server() and create_connection() both require an instance of this class. Alternatively, instances of this class can be passed as the addr argument to aiomas.channel.start_server() and aiomas.channel.open_connection().

A server needs to be started before any connections can be made.

queue_id

The queue’s ID.

server

The LocalQueueServer instance that was bound to this instance or None if no server has yet been started.

set_server(server)

Set a LocalQueueServer instance.

Raise a RuntimeError if a server has already been bound to this instance.

This method is called by create_server().

unset_server()

Unset the server from this instance.

This method is called when the server is closed (see LocalQueueServer.close()).
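The bind/unbind lifecycle described for set_server() and unset_server() might look roughly like the sketch below. It is a simplified model under stated assumptions, not the aiomas implementation: BindableQueueSketch, BoundServerSketch and create_server_sketch are hypothetical names, and the protocol factory is left out.

```python
class BindableQueueSketch:
    """Hypothetical stand-in for LocalQueue (illustration only)."""

    def __init__(self, queue_id):
        self.queue_id = queue_id
        self.server = None  # None until a server has been bound

    def set_server(self, server):
        # Only one server may be bound at a time.
        if self.server is not None:
            raise RuntimeError(f"Queue {self.queue_id!r} already has a server")
        self.server = server

    def unset_server(self):
        self.server = None


class BoundServerSketch:
    """Hypothetical stand-in for LocalQueueServer (illustration only)."""

    def __init__(self, lq):
        self.lq = lq

    def close(self):
        # Closing the server releases the queue for a new server.
        self.lq.unset_server()


def create_server_sketch(lq):
    """Create a server and bind it to `lq`."""
    server = BoundServerSketch(lq)
    lq.set_server(server)
    return server
```

A second create_server_sketch() call on the same queue raises RuntimeError until the first server has been closed.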

new_connection(sendq, recvq, loop=None)

Create a connection endpoint on the server side.

This method is called by create_connection().

sendq and recvq are the queues used for sending and receiving messages to and from the client.

class aiomas.local_queue.LocalQueueServer(protocol_factory, lq)

Implements asyncio.events.AbstractServer. An instance of this class is returned by create_server().

lq is the LocalQueue instance that this server was bound to.

protocol_factory is a callable that is called for each new client connection in order to create a new protocol instance.

lq

The LocalQueue the server is bound to.

new_connection(sendq, recvq, loop)

Create a new protocol and transport instance.

Call the protocol factory, create a new LocalQueueTransport with sendq and recvq and wire them together.

Called by create_connection() via LocalQueue.new_connection().
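The wiring step can be sketched as follows. This assumes that "wiring together" means handing the transport to the protocol via connection_made(), as is conventional for asyncio protocols; the actual aiomas code may differ. WiringServerSketch, TransportSketch and RecordingProtocol are hypothetical names.

```python
import asyncio


class TransportSketch:
    """Hypothetical transport holding the two queues and its protocol."""

    def __init__(self, sendq, recvq, protocol):
        self.sendq = sendq
        self.recvq = recvq
        self.protocol = protocol


class RecordingProtocol(asyncio.Protocol):
    """Protocol that just remembers the transport it was given."""

    def __init__(self):
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport


class WiringServerSketch:
    """Hypothetical server that creates one protocol per connection."""

    def __init__(self, protocol_factory):
        self.protocol_factory = protocol_factory

    def new_connection(self, sendq, recvq):
        protocol = self.protocol_factory()
        transport = TransportSketch(sendq, recvq, protocol)
        # Assumption: the protocol learns about its transport here.
        protocol.connection_made(transport)
        return transport, protocol


async def demo():
    """Create one server-side endpoint inside a running event loop."""
    server = WiringServerSketch(RecordingProtocol)
    return server.new_connection(asyncio.Queue(), asyncio.Queue())
```

Each call to new_connection() invokes the factory again, so every client connection gets its own protocol instance.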

close()

Close the server and unset this instance from the associated LocalQueue instance.

wait_closed()

Immediately return (there’s nothing to wait for).

class aiomas.local_queue.LocalQueueTransport(lq, sendq, recvq, protocol, loop)

Implements asyncio.transports.Transport.

A LocalQueueTransport has two asynchronous queues (instances of asyncio.Queue) – one for sending messages to the other side and one for receiving messages from it.
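The two-queue arrangement can be pictured with a small asyncio-only sketch: each side's send queue is the other side's receive queue. Endpoint, make_pair and echo_roundtrip are hypothetical names for this illustration and are not part of aiomas.

```python
import asyncio


class Endpoint:
    """One side of a duplex channel built from two queues."""

    def __init__(self, sendq, recvq):
        self.sendq = sendq
        self.recvq = recvq

    def write(self, data):
        # Non-blocking: the data is queued for the peer.
        self.sendq.put_nowait(data)

    async def read(self):
        # Wait until the peer has written something.
        return await self.recvq.get()


def make_pair():
    """Return (client, server) endpoints with crossed-over queues."""
    client_to_server = asyncio.Queue()
    server_to_client = asyncio.Queue()
    client = Endpoint(client_to_server, server_to_client)
    server = Endpoint(server_to_client, client_to_server)
    return client, server


async def echo_roundtrip(payload):
    """Send `payload` to the server and return its upper-cased reply."""
    client, server = make_pair()
    client.write(payload)
    request = await server.read()
    server.write(request.upper())
    return await client.read()


if __name__ == "__main__":
    print(asyncio.run(echo_roundtrip(b"ping")))
```

Crossing the queues over keeps both directions independent, so a slow reader on one side never blocks writes in the other direction.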

close()

Close the transport.

Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol’s connection_lost() method will (eventually) be called with None as its argument.

write(data)

Write some data bytes to the transport.

This does not block; it buffers the data and arranges for it to be sent out asynchronously.

can_write_eof()

Return False. This transport does not support write_eof().

abort()

Close the transport immediately.

Buffered data will be lost. No more data will be received. The protocol’s connection_lost() method will (eventually) be called with None as its argument.
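The difference between close() and abort() can be illustrated with a simplified model. This is a sketch under assumptions, not the aiomas implementation: it keeps written data in an explicit buffer and uses loop.call_soon() for the asynchronous steps. BufferedTransportSketch, LossRecorder and demo are hypothetical names.

```python
import asyncio


class LossRecorder:
    """Protocol stub that records the argument of connection_lost()."""

    def __init__(self):
        self.lost_with = "not called"

    def connection_lost(self, exc):
        self.lost_with = exc


class BufferedTransportSketch:
    def __init__(self, protocol, outq):
        self._protocol = protocol
        self._outq = outq    # queue the peer reads from
        self._buffer = []    # data written but not yet delivered

    def write(self, data):
        self._buffer.append(data)

    def _flush(self):
        for data in self._buffer:
            self._outq.put_nowait(data)
        self._buffer.clear()

    def close(self):
        # Graceful: deliver buffered data first, then notify the protocol.
        loop = asyncio.get_running_loop()
        loop.call_soon(self._flush)
        loop.call_soon(self._protocol.connection_lost, None)

    def abort(self):
        # Immediate: drop buffered data, then notify the protocol.
        self._buffer.clear()
        loop = asyncio.get_running_loop()
        loop.call_soon(self._protocol.connection_lost, None)


async def demo(graceful):
    """Write two messages, shut down, and report what the peer received."""
    outq = asyncio.Queue()
    protocol = LossRecorder()
    transport = BufferedTransportSketch(protocol, outq)
    transport.write(b"a")
    transport.write(b"b")
    if graceful:
        transport.close()
    else:
        transport.abort()
    await asyncio.sleep(0)  # let the scheduled callbacks run
    delivered = []
    while not outq.empty():
        delivered.append(outq.get_nowait())
    return delivered, protocol.lost_with
```

In this model, demo(True) delivers both messages before the protocol is notified, while demo(False) delivers none; in both cases connection_lost() receives None.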