The channel layer

The channel layer is aiomas’ lowest layer of abstraction. It lets you send and receive complete messages. In contrast to asyncio’s built-in stream protocol, which just sends byte strings, messages are JSON-encoded [*] data, which is a lot more convenient.

[*]Actually, whether JSON is used for encoding depends on the codec that the channel uses. JSON is the default, but you can also use MsgPack or something else. At the bottom of this document, there’s a section explaining aiomas’ message format in detail.

Here is a minimal example that shows how the Channel can be used:

>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555))
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client))
>>> aiomas.run(client())
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())

A communication channel has two sides: the client side is created and returned by open_connection(). For each client connection, the server creates a Channel instance and starts a new background task running the client-connected callback (client_connected_cb), to which it passes that channel instance.

Both the client and the server side can send and receive messages. In the example above, the client sends a request first and the server side waits for incoming requests. A request has a content attribute which holds the actual message. To send a reply, you can use either Request.reply() or Request.fail(). Channel.send() and Request.reply() take any data that the channel’s codec can serialize (e.g., strings, numbers, lists, dicts, ...). Request.fail() takes an exception instance, which is raised at the requesting side as a RemoteException, as the following example demonstrates:

>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555))
...     try:
...         rep = await channel.send('ohai')
...         print(rep)
...     except aiomas.RemoteException as e:
...         print('Got an error:', str(e))
...     finally:
...         await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.fail(ValueError(42))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('127.0.0.1', 5555), handle_client))
>>> aiomas.run(client())
ohai
Got an error: Origin: ('127.0.0.1', 5555)
ValueError: 42

>>> server.close()
>>> aiomas.run(server.wait_closed())

These are the basics of the channel layer. The following sections answer some more detailed questions.

How can I use another codec?

To use a codec other than the default JSON one, just pass the corresponding codec class (e.g., MsgPack) to open_connection() and start_server():

>>> import aiomas
>>>
>>> CODEC = aiomas.codecs.MsgPack
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555),
...                                                    codec=CODEC)
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 codec=CODEC))
>>> aiomas.run(client())
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())

Note that the codecs aiomas.codecs.MsgPack and aiomas.codecs.MsgPackBlosc are not available by default but have to be explicitly enabled.

How can I serialize custom data types?

Both open_connection() and start_server() take a list of extra_serializers. Each serializer is a function returning a three-tuple (type, serialize, deserialize). You can find more details in the codecs guide. Here is just a simple example:

>>> import aiomas
>>>
>>>
>>> class MyType:
...     """Our serializable type."""
...     def __init__(self, value):
...         self.value = value
...
...     def __repr__(self):
...         return '%s(%r)' % (self.__class__.__name__, self.value)
>>>
>>>
>>> def serialize_mytype(obj):
...     """Return a JSON serializable version of "MyType" instances."""
...     return obj.value
>>>
>>>
>>> def deserialize_mytype(value):
...     """Make a "MyType" instance from *value*."""
...     return MyType(value)
>>>
>>>
>>> def mytype_serializer():
...     return (MyType, serialize_mytype, deserialize_mytype)
>>>
>>>
>>> EXTRA_SERIALIZERS = [mytype_serializer]
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(
...         ('localhost', 5555), extra_serializers=EXTRA_SERIALIZERS)
...     rep = await channel.send(['ohai', MyType(42)])
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply(MyType('cya'))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 extra_serializers=EXTRA_SERIALIZERS))
>>> aiomas.run(client())
['ohai', MyType(42)]
MyType('cya')
>>> server.close()
>>> aiomas.run(server.wait_closed())
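
To illustrate how a (type, serialize, deserialize) triple can plug into a JSON codec, here is a minimal sketch that uses only the standard library. It is not aiomas’ implementation: the Point class, the TinyCodec class, and the '__type__' tag are assumptions made for this illustration only.

```python
import json


class Point:
    """Hypothetical custom type used only for this illustration."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        return isinstance(other, Point) and (self.x, self.y) == (other.x, other.y)


def point_serializer():
    """Return the (type, serialize, deserialize) triple for Point."""
    return (Point, lambda p: [p.x, p.y], lambda data: Point(*data))


class TinyCodec:
    """Illustrative JSON codec that understands extra serializers."""

    def __init__(self, extra_serializers=()):
        self._by_type = {}
        self._by_name = {}
        for get_serializer in extra_serializers:
            type_, serialize, deserialize = get_serializer()
            self._by_type[type_] = (type_.__name__, serialize)
            self._by_name[type_.__name__] = deserialize

    def _default(self, obj):
        # Called by json for objects it cannot encode natively.
        try:
            name, serialize = self._by_type[type(obj)]
        except KeyError:
            raise TypeError('No serializer for %r' % type(obj)) from None
        return {'__type__': [name, serialize(obj)]}

    def _object_hook(self, obj):
        # Called by json for every decoded dict; restore tagged objects.
        if '__type__' in obj:
            name, data = obj['__type__']
            return self._by_name[name](data)
        return obj

    def encode(self, data):
        return json.dumps(data, default=self._default).encode('utf-8')

    def decode(self, raw):
        return json.loads(raw.decode('utf-8'), object_hook=self._object_hook)


codec = TinyCodec(extra_serializers=[point_serializer])
raw = codec.encode(['ohai', Point(1, 2)])
restored = codec.decode(raw)
```

Both peers need the same list of serializers, because the receiving side must know how to turn the tagged data back into an object. This is why the example above passes EXTRA_SERIALIZERS to both open_connection() and start_server().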

A shorter version for common cases is using the aiomas.codecs.serializable() decorator:

>>> import aiomas
>>>
>>>
>>> @aiomas.codecs.serializable
... class MyType:
...     """Our serializable type."""
...     def __init__(self, value):
...         self.value = value
>>>
>>>
>>> EXTRA_SERIALIZERS = [MyType.__serializer__]
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(
...         ('localhost', 5555), extra_serializers=EXTRA_SERIALIZERS)
...     rep = await channel.send(['ohai', MyType(42)])
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply(MyType('cya'))
...     await channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client,
...                                                 extra_serializers=EXTRA_SERIALIZERS))
>>> aiomas.run(client())
['ohai', MyType(value=42)]
MyType(value='cya')
>>> server.close()
>>> aiomas.run(server.wait_closed())

How can I bind a server socket to a random port?

You cannot ask your OS for an available port but have to try a randomly chosen port until you succeed:

>>> import errno
>>> import random
>>>
>>> max_tries = 100
>>> port_range = (49152, 65536)
>>>
>>> async def random_server(host, port_range, max_tries):
...     for i in range(max_tries):
...         try:
...             port = random.randrange(*port_range)
...             server = await aiomas.channel.start_server(
...                (host, port), handle_client)
...         except OSError as oe:
...             if oe.errno != errno.EADDRINUSE:
...                 # Re-raise unless the error is EADDRINUSE ("address already in use")
...                 raise
...         else:
...             return server, port
...     raise RuntimeError('Could not bind server to a random port.')
>>>
>>> server, port = aiomas.run(random_server('localhost', port_range, max_tries))
>>> server.close()
>>> aiomas.run(server.wait_closed())

Connection timeouts / Starting clients before the server

Sometimes, you need to start a client before the server is started. For this case, open_connection() lets you specify a timeout. It keeps retrying the connection until timeout seconds have passed. By default, timeout is 0, which means there is only one attempt.

>>> import asyncio
>>> import aiomas
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     # Try to connect for 1s:
...     channel = await aiomas.channel.open_connection(('localhost', 5555),
...                                                    timeout=1)
...     rep = await channel.send('ohai')
...     print(rep)
...     await channel.close()
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     await channel.close()
>>>
>>>
>>> # Start the client in background, ...
>>> t_client = asyncio.ensure_future(client())
>>> # wait 0.5 seconds, ...
>>> aiomas.run(asyncio.sleep(0.5))
>>> # and finally start the server:
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client))
>>> aiomas.run(t_client)
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())
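
The retry behaviour described above can be sketched with the standard library alone. This is only an approximation under stated assumptions, not aiomas’ actual code: the helper retry_until_timeout(), its interval parameter, and the choice to retry on ConnectionRefusedError are all made up for this illustration, and flaky_connect() is a stand-in for a real connection attempt.

```python
import asyncio
import time


async def retry_until_timeout(connect, timeout=0, interval=0.1):
    """Call the coroutine function *connect* until it succeeds.

    Retries on ConnectionRefusedError until *timeout* seconds have passed.
    With ``timeout=0`` there is exactly one attempt.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return await connect()
        except ConnectionRefusedError:
            # Give up if another wait would take us past the deadline.
            if time.monotonic() + interval > deadline:
                raise
            await asyncio.sleep(interval)


# Hypothetical stand-in for a real connection attempt: it fails twice
# (as if the server were not up yet) and then succeeds.
attempts = []


async def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionRefusedError('server not started yet')
    return 'connected'


result = asyncio.run(
    retry_until_timeout(flaky_connect, timeout=1, interval=0.01))
```

With timeout=0 the first failure is re-raised immediately, which matches the "only one attempt" default described above.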

What exactly do messages look like?

This section explains how aiomas messages look and how they are constructed. You can easily implement this protocol in other languages, too, and write programs that can communicate with aiomas.

Network messages consist of a four-byte header and a payload of arbitrary length. The header is an unsigned integer (uint32) in network byte order (big-endian) and stores the number of bytes in the payload. The payload itself is an encoded [†] list containing the message type, a message ID and the actual content:

Figure: Messages consist of a header and a payload. The payload is a JSON list containing a message type, an ID and the actual content.

[†]Depending on the codec you use, the payload may be a UTF-8 encoded JSON string (json.dumps().encode('utf-8'), the default), a MsgPack list (msgpack.packb()), or whatever else the codec produces.
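
As a rough illustration of this framing, the following sketch builds and parses one message with the standard library, assuming the JSON codec. The function names encode_message() and decode_message() are hypothetical and not part of aiomas; the exact JSON formatting that aiomas emits (e.g., whitespace) may differ.

```python
import json
import struct

HEADER = struct.Struct('!L')  # uint32 in network byte order (big-endian)


def encode_message(msg_type, msg_id, content):
    """Return header + payload for one message (JSON codec assumed)."""
    payload = json.dumps([msg_type, msg_id, content]).encode('utf-8')
    return HEADER.pack(len(payload)) + payload


def decode_message(data):
    """Split *data* into (message, remaining bytes).

    Return (None, data) if *data* does not yet hold a complete message.
    """
    if len(data) < HEADER.size:
        return None, data
    (size,) = HEADER.unpack_from(data)
    end = HEADER.size + size
    if len(data) < end:
        return None, data
    message = json.loads(data[HEADER.size:end].decode('utf-8'))
    return message, data[end:]


# Message type 0, message ID 1, content 'ohai'.
wire = encode_message(0, 1, 'ohai')
message, rest = decode_message(wire)
```

Because the header states the payload length, a receiver reading from a byte stream knows when a message is complete and where the next one begins.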

Messages sent between two peers must follow the request-reply pattern. That means every request that one peer makes must be answered by the other peer. Requests use the message type 0; replies use 1 for success or 2 to indicate a failure. The message ID is an integer that is unique for every request that a network socket makes. Replies (no matter if successful or failed) need to use the message ID of the corresponding request.
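
The bookkeeping implied by this pattern might look like the sketch below. All names (PendingRequests, make_reply(), RemoteError) and the counter-based ID scheme are assumptions for illustration; only the type codes 0, 1 and 2 and the rule that a reply reuses the request’s ID come from the description above.

```python
REQUEST, RESULT, EXCEPTION = 0, 1, 2  # message types described above


class RemoteError(Exception):
    """Hypothetical stand-in for the error raised on a failure reply."""


class PendingRequests:
    """Track the open requests of one peer by message ID."""

    def __init__(self):
        self._next_id = 0
        self._pending = {}

    def new_request(self, content):
        """Return a request message with a fresh message ID."""
        msg_id = self._next_id
        self._next_id += 1
        self._pending[msg_id] = content
        return [REQUEST, msg_id, content]

    def handle_reply(self, message):
        """Resolve the request that *message* answers and return its result."""
        msg_type, msg_id, content = message
        if msg_type not in (RESULT, EXCEPTION):
            raise ValueError('Not a reply type: %r' % msg_type)
        # Raises KeyError if no request with this ID is open.
        self._pending.pop(msg_id)
        if msg_type == EXCEPTION:
            raise RemoteError(content)
        return content


def make_reply(request, content, failed=False):
    """Build the reply for *request*, reusing its message ID."""
    _, msg_id, _ = request
    return [EXCEPTION if failed else RESULT, msg_id, content]


peer = PendingRequests()
first = peer.new_request('ohai')
second = peer.new_request('how are you?')
# In this sketch, the message ID links each reply to its request,
# so the replies can be handled in a different order than the requests.
answer_second = peer.handle_reply(make_reply(second, 'fine'))
answer_first = peer.handle_reply(make_reply(first, 'cya'))
```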

On the channel layer, the content of a request can be anything. On the RPC level, it is a three-tuple (function_path, args, kwargs), e.g.:

[function, [arg0, arg1, ...], {kwarg0: val0, kwarg1: val1}]

Here, function is always a string containing the name of an exposed function; if you use nested services, sub-services and function names are separated by slashes (/) as in URLs. The types of the arguments and keyword arguments may vary depending on the function.
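
For illustration, such a request content could be built and taken apart as follows. The helpers make_rpc_content() and split_function_path() and the path 'calculator/advanced/power' are hypothetical; they only mirror the structure described above and are not aiomas functions.

```python
def make_rpc_content(function_path, *args, **kwargs):
    """Return the request content for an RPC call as a JSON-friendly list."""
    return [function_path, list(args), kwargs]


def split_function_path(function_path):
    """Split 'service/sub/func' into (['service', 'sub'], 'func')."""
    *services, function = function_path.split('/')
    return services, function


content = make_rpc_content('calculator/advanced/power', 2, exponent=10)
services, function = split_function_path(content[0])
```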

The content types of replies are the same for both the channel layer and the RPC layer. Normal (successful) replies can be anything. The content of a failure reply is a string with the error message and/or a stack trace.

Note

If the JSON codec is used, aiomas messages are compatible with simpy.io (and therefore also with the co-simulation framework mosaik).