aiomas.local_queue
The local queue transport roughly mimics a normal TCP transport, but it sends
and receives messages via two asyncio.Queue instances.
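The two-queue arrangement can be pictured with plain asyncio, independent of aiomas. The following is a minimal sketch and not aiomas's implementation; the names `echo_upper`, `round_trip`, `client_to_server`, and `server_to_client` are made up for illustration. Each side sends on one queue and receives on the other.

```python
import asyncio


async def echo_upper(recvq, sendq):
    """Hypothetical server side: read one message, reply in upper case."""
    message = await recvq.get()
    await sendq.put(message.upper())


async def round_trip(message):
    # One queue per direction: the client's send queue is the server's
    # receive queue, and vice versa.
    client_to_server = asyncio.Queue()
    server_to_client = asyncio.Queue()

    server = asyncio.create_task(echo_upper(client_to_server, server_to_client))
    await client_to_server.put(message)
    reply = await server_to_client.get()
    await server
    return reply


reply = asyncio.run(round_trip("ping"))
```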
Its purpose is to aid the development and debugging of complex networking
algorithms and distributed or multi-agent systems. In contrast to normal
network transports, messages sent via the LocalQueueTransport will
always arrive in a deterministic order [1].
This transport does not work across multiple processes and is not thread safe, so it should only be used within a single thread and process.
The easiest way to use it is to create a LocalQueue instance via the
get_queue() function and pass it to
aiomas.channel.start_server()/aiomas.channel.open_connection()
or aiomas.agent.Container.create() as the addr argument.
[1] Actually, messages sent via a single TCP connection also arrive in a deterministic order (this is a property of the TCP/IP protocol), so the LocalQueue transport gives you no benefit in this case. However, if you have multiple connections to the same server and send messages through them in parallel, the order in which messages from the different connections arrive is no longer deterministic. In this case, the LocalQueue transport can help you.
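The multi-connection case from the footnote can be illustrated with plain asyncio. This is a sketch under the assumption of a single event loop in a single thread, and it does not use aiomas; `client` and `run_once` are hypothetical names. Because the event loop runs ready callbacks in the order they were scheduled, several in-process "connections" feeding one queue interleave identically on every run.

```python
import asyncio


async def client(name, inbox, count):
    """Hypothetical client: send `count` numbered messages to the shared inbox."""
    for i in range(count):
        await inbox.put((name, i))
        await asyncio.sleep(0)  # give the other clients a turn


async def run_once():
    inbox = asyncio.Queue()
    # Three clients send "in parallel" within one event loop.
    await asyncio.gather(*(client(f"c{k}", inbox, 3) for k in range(3)))
    received = []
    while not inbox.empty():
        received.append(inbox.get_nowait())
    return received


first_run = asyncio.run(run_once())
second_run = asyncio.run(run_once())
```

Comparing `first_run` and `second_run` shows the same arrival order both times, which is the reproducibility that makes debugging easier.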
-
aiomas.local_queue.get_queue(queue_id)
Return a LocalQueue instance for the given queue_id.
If no instance is cached yet, create a new one.
Queue IDs must be strings and must not contain the / character. Raise a ValueError if these rules are violated.
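The caching and validation rules described for get_queue() can be sketched as follows. This is an illustrative stand-in rather than the library's code: `QueueHandle`, `_cache`, and `get_queue_sketch` are hypothetical names, and the error messages are invented.

```python
class QueueHandle:
    """Hypothetical stand-in for LocalQueue; it only stores the ID."""

    def __init__(self, queue_id):
        self.queue_id = queue_id


_cache = {}  # maps queue IDs to their (single) handle


def get_queue_sketch(queue_id):
    """Return the cached handle for queue_id, creating it on first use."""
    if not isinstance(queue_id, str):
        raise ValueError(f"queue ID must be a string, got {queue_id!r}")
    if "/" in queue_id:
        raise ValueError(f"queue ID must not contain '/': {queue_id!r}")
    if queue_id not in _cache:
        _cache[queue_id] = QueueHandle(queue_id)
    return _cache[queue_id]


sim_queue = get_queue_sketch("sim")
same_queue = get_queue_sketch("sim")  # second call hits the cache
```

Returning the same object for the same ID is what lets a server and its clients find each other by name alone.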
-
aiomas.local_queue.create_connection(protocol_factory, lq, *, loop=None, **kwds)
Connect to a LocalQueue lq and return a (transport, protocol) pair.
The protocol_factory must be a callable returning a protocol instance.
Before a connection to lq can be made, a server must be started for this instance (see create_server()).
-
aiomas.local_queue.create_server(protocol_factory, lq, **kwds)
Create a LocalQueue server bound to lq.
The protocol_factory must be a callable returning a protocol instance.
Return a LocalQueueServer instance. That instance is also set as server for lq.
-
class aiomas.local_queue.LocalQueue(queue_id)
An instance of this class serves as transport description when creating a server or connection.
The functions create_server() and create_connection() both require an instance of this class. Alternatively, instances of this class can be passed as addr argument to aiomas.channel.start_server() and aiomas.channel.open_connection().
A server needs to be started before any connections can be made.
-
queue_id
The queue’s ID.
-
server
The LocalQueueServer instance that was bound to this instance or None if no server has yet been started.
-
set_server(server)
Set a LocalQueueServer instance.
Raise a RuntimeError if a server has already been bound to this instance.
This method is called by create_server().
-
unset_server()
Unset the server from this instance.
This method is called when the server is closed (see LocalQueueServer.close()).
-
new_connection(sendq, recvq, loop=None)
Create a connection endpoint on the server side.
This method is called by create_connection().
sendq and recvq are the queues used for sending and receiving messages to and from the client.
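The binding rules described for set_server() and unset_server() amount to a single-slot lifecycle. The sketch below mirrors only what the text states and is not the library's code; `BindingSketch` is a hypothetical class, and how the real unset_server() behaves when no server is bound is not covered here.

```python
class BindingSketch:
    """Hypothetical stand-in for the server slot of a LocalQueue."""

    def __init__(self):
        self.server = None  # None means "no server started yet"

    def set_server(self, server):
        # Only one server may be bound at a time.
        if self.server is not None:
            raise RuntimeError("a server is already bound to this queue")
        self.server = server

    def unset_server(self):
        self.server = None


lq = BindingSketch()
first_server = object()
lq.set_server(first_server)

try:
    lq.set_server(object())  # second bind while the first is active
    rebound_while_active = True
except RuntimeError:
    rebound_while_active = False

lq.unset_server()        # what closing the server would trigger
lq.set_server(object())  # binding is allowed again after unsetting
```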
-
-
class aiomas.local_queue.LocalQueueServer(protocol_factory, lq)
Implements asyncio.events.AbstractServer. An instance of this class is returned by create_server().
lq is the LocalQueue instance that this server was bound to.
protocol_factory is a callable that is called for each new client connection in order to create a new protocol instance.
-
lq
The LocalQueue the server is bound to.
-
new_connection(sendq, recvq, loop)
Create a new protocol and transport instance.
Call the protocol factory, create a new LocalQueueTransport with sendq and recvq and wire them together.
Called by create_connection() via LocalQueue.new_connection().
-
close()
Close the server and unset this instance from the associated LocalQueue instance.
-
-
class aiomas.local_queue.LocalQueueTransport(lq, sendq, recvq, protocol, loop)
Implements asyncio.transports.Transport.
A LocalQueueTransport has two asynchronous queues (instances of asyncio.Queue): one for sending messages to the other side and one for receiving messages from it.
-
close()
Close the transport.
Buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol’s connection_lost() method will (eventually) be called with None as its argument.
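The close() contract described above (flush first, then notify the protocol) can be sketched with plain asyncio. This is an assumption-laden illustration and not how LocalQueueTransport is implemented: `SketchTransport`, `RecordingProtocol`, and `demo` are hypothetical, and "flushed" is modelled here as the peer having taken every item off the send queue.

```python
import asyncio


class RecordingProtocol(asyncio.Protocol):
    """Records the argument passed to connection_lost()."""

    def __init__(self):
        self.lost_with = "not called"

    def connection_lost(self, exc):
        self.lost_with = exc


class SketchTransport:
    """Hypothetical send-only transport backed by one asyncio.Queue."""

    def __init__(self, sendq, protocol):
        self._sendq = sendq
        self._protocol = protocol
        self._closing = False
        self.close_task = None

    def write(self, data):
        if self._closing:
            raise RuntimeError("transport is closing")
        self._sendq.put_nowait(data)

    def close(self):
        if self._closing:
            return
        self._closing = True
        # Finish asynchronously so that buffered data can still drain.
        self.close_task = asyncio.get_running_loop().create_task(self._finish())

    async def _finish(self):
        await self._sendq.join()  # wait until the peer has consumed everything
        self._protocol.connection_lost(None)


async def demo():
    sendq = asyncio.Queue()
    protocol = RecordingProtocol()
    transport = SketchTransport(sendq, protocol)
    transport.write(b"a")
    transport.write(b"b")
    transport.close()

    delivered = []
    while not sendq.empty():  # the peer drains the buffered messages
        delivered.append(sendq.get_nowait())
        sendq.task_done()
    await transport.close_task
    return delivered, protocol.lost_with


delivered, lost_with = asyncio.run(demo())
```

In this sketch both messages written before close() still reach the peer, and connection_lost() receives None only after the queue is empty.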
-