parallel_messages.txt
266 lines
| 9.7 KiB
| text/plain
|
TextLexer
|
r2790 | .. _parallel_messages: | ||
Messaging for Parallel Computing | ||||
================================ | ||||
|
r3556 | This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections | ||
can be found in the :ref:`parallel connections <parallel_connections>` doc. | ||||
|
r2790 | |||
|
r3556 | ZMQ messaging is also used in the parallel computing IPython system. All messages to/from | ||
kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue | ||||
device. The controller receives all messages and replies in these channels, and saves | ||||
results for future use. | ||||
|
r2790 | |||
The Controller | ||||
-------------- | ||||
|
r3556 | The controller is the central process of the IPython parallel computing model. It has 3 | ||
Devices: | ||||
|
r2790 | |||
* Heartbeater | ||||
* Multiplexed Queue | ||||
* Task Queue | ||||
and 3 sockets: | ||||
* ``XREP`` for both engine and client registration | ||||
* ``PUB`` for notification of engine changes | ||||
* ``XREP`` for client requests | ||||
Registration (``XREP``) | ||||
*********************** | ||||
|
r3556 | The first function of the Controller is to facilitate and monitor connections of clients | ||
and engines. Both client and engine registration are handled by the same socket, so only | ||||
one ip/port pair is needed to connect any number of connections and clients. | ||||
|
r2790 | |||
|
r3556 | Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the | ||
queue, which receives execute requests, and one for the heartbeat, which is used to | ||||
monitor the survival of the Engine process. | ||||
|
r2790 | |||
Message type: ``registration_request``:: | ||||
content = { | ||||
'queue' : 'abcd-1234-...', # the queue XREQ id | ||||
'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id | ||||
} | ||||
|
r3556 | The Controller replies to an Engine's registration request with the engine's integer ID, | ||
and all the remaining connection information for connecting the heartbeat process, and | ||||
kernel queue socket(s). The message status will be an error if the Engine requests IDs that | ||||
already in use. | ||||
|
r2790 | |||
Message type: ``registration_reply``:: | ||||
content = { | ||||
'status' : 'ok', # or 'error' | ||||
# if ok: | ||||
'id' : 0, # int, the engine id | ||||
'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue | ||||
|
r3556 | 'control' : 'tcp://...', # addr for control queue | ||
|
r2790 | 'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat | ||
|
r3556 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running | ||
|
r2790 | } | ||
|
r3556 | Clients use the same socket as engines to start their connections. Connection requests | ||
from clients need no information: | ||||
|
r2790 | |||
Message type: ``connection_request``:: | ||||
content = {} | ||||
|
r3556 | The reply to a Client registration request contains the connection information for the | ||
multiplexer and load balanced queues, as well as the address for direct controller | ||||
queries. If any of these addresses is `None`, that functionality is not available. | ||||
|
r2790 | |||
Message type: ``connection_reply``:: | ||||
content = { | ||||
'status' : 'ok', # or 'error' | ||||
# if ok: | ||||
|
r3556 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue | ||
|
r2790 | 'task' : 'tcp...', # addr for task queue, or None if no task queue running | ||
|
r3556 | 'query' : 'tcp...' # addr for methods to query the controller, like queue_request, etc. | ||
'control' : 'tcp...' # addr for control methods, like abort, etc. | ||||
|
r2790 | } | ||
Heartbeat | ||||
********* | ||||
|
r3556 | The controller uses a heartbeat system to monitor engines, and track when they become | ||
unresponsive. As described in :ref:`messages <messages>`, and shown in :ref:`connections | ||||
<parallel_connections>`. | ||||
|
r2790 | |||
Notification (``PUB``) | ||||
********************** | ||||
|
r3556 | The controller published all engine registration/unregistration events on a PUB socket. | ||
This allows clients to have up-to-date engine ID sets without polling. Registration | ||||
notifications contain both the integer engine ID and the queue ID, which is necessary for | ||||
sending messages via the Multiplexer Queue. | ||||
|
r2790 | |||
Message type: ``registration_notification``:: | ||||
content = { | ||||
'id' : 0, # engine ID that has been registered | ||||
'queue' : 'engine_id' # the IDENT for the engine's queue | ||||
} | ||||
Message type : ``unregistration_notification``:: | ||||
content = { | ||||
'id' : 0 # engine ID that has been unregistered | ||||
} | ||||
Client Queries (``XREP``) | ||||
************************* | ||||
|
r3556 | The controller monitors and logs all queue traffic, so that clients can retrieve past | ||
results or monitor pending tasks. Currently, this information resides in memory on the | ||||
Controller, but will ultimately be offloaded to a database over an additional ZMQ | ||||
connection. The interface should remain the same or at least similar. | ||||
|
r2790 | |||
|
r3556 | :func:`queue_request` requests can specify multiple engines to query via the `targets` | ||
element. A verbose flag can be passed, to determine whether the result should be the list | ||||
of `msg_ids` in the queue or simply the length of each list. | ||||
|
r2790 | |||
Message type: ``queue_request``:: | ||||
content = { | ||||
'verbose' : True, # whether return should be lists themselves or just lens | ||||
'targets' : [0,3,1] # list of ints | ||||
} | ||||
|
r3556 | The content of a reply to a :func:queue_request request is a dict, keyed by the engine | ||
IDs. Note that they will be the string representation of the integer keys, since JSON | ||||
cannot handle number keys. | ||||
|
r2790 | |||
Message type: ``queue_reply``:: | ||||
content = { | ||||
'0' : {'completed' : 1, 'queue' : 7}, | ||||
'1' : {'completed' : 10, 'queue' : 1} | ||||
} | ||||
|
r3556 | Clients can request individual results directly from the controller. This is primarily for | ||
use gathering results of executions not submitted by the particular client, as the client | ||||
will have all its own results already. Requests are made by msg_id, and can contain one or | ||||
more msg_id. | ||||
|
r2790 | |||
Message type: ``result_request``:: | ||||
content = { | ||||
|
r3556 | 'msg_ids' : ['uuid','...'] # list of strs | ||
|
r2790 | } | ||
|
r3556 | The :func:`result_request` reply contains the content objects of the actual execution | ||
reply messages | ||||
|
r2790 | |||
Message type: ``result_reply``:: | ||||
content = { | ||||
'status' : 'ok', # else error | ||||
# if ok: | ||||
msg_id : msg, # the content dict is keyed by msg_ids, | ||||
# values are the result messages | ||||
'pending' : ['msg_id','...'], # msg_ids still pending | ||||
|
r3556 | 'completed' : ['msg_id','...'], # list of completed msg_ids | ||
|
r2790 | } | ||
|
r3556 | For memory management purposes, Clients can also instruct the controller to forget the | ||
results of messages. This can be done by message ID or engine ID. Individual messages are | ||||
dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This will likely no longer | ||||
be necessary once we move to a DB-based message logging backend. | ||||
|
r2790 | |||
|
r3556 | If the msg_ids element is the string ``'all'`` instead of a list, then all completed | ||
results are forgotten. | ||||
|
r2790 | |||
Message type: ``purge_request``:: | ||||
content = { | ||||
'msg_ids' : ['id1', 'id2',...], # list of msg_ids or 'all' | ||||
'engine_ids' : [0,2,4] # list of engine IDs | ||||
} | ||||
|
r3556 | The reply to a purge request is simply the status 'ok' if the request succeeded, or an | ||
explanation of why it failed, such as requesting the purge of a nonexistent or pending | ||||
message. | ||||
|
r2790 | |||
Message type: ``purge_reply``:: | ||||
content = { | ||||
'status' : 'ok', # or 'error' | ||||
} | ||||
:func:`apply` and :func:`apply_bound` | ||||
************************************* | ||||
|
r3556 | The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to | ||
use the model:: | ||||
|
r2790 | |||
|
r3566 | ns.apply(f, *args, **kwargs) | ||
|
r2790 | |||
|
r3556 | which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)`` | ||
on a remote engine, returning the result (or, for non-blocking, information facilitating | ||||
later retrieval of the result). This model, unlike the execute message which just uses a | ||||
code string, must be able to send arbitrary (pickleable) Python objects. And ideally, copy | ||||
as little data as we can. The `buffers` property of a Message was introduced for this | ||||
purpose. | ||||
|
r2790 | |||
|
r3556 | Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a | ||
|
r3566 | function signature and builds a sendable buffer format for minimal data copying (exactly | ||
zero copies of numpy array data or buffers or large strings). | ||||
|
r2790 | |||
Message type: ``apply_request``:: | ||||
content = { | ||||
|
r3556 | 'bound' : True, # whether to execute in the engine's namespace or unbound | ||
'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict() | ||||
'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict() | ||||
|
r2790 | } | ||
buffers = ['...'] # at least 3 in length | ||||
# as built by build_apply_message(f,args,kwargs) | ||||
|
r3566 | after/follow represent task dependencies | ||
|
r2790 | Message type: ``apply_reply``:: | ||
content = { | ||||
'status' : 'ok' # 'ok' or 'error' | ||||
# other error info here, as in other messages | ||||
} | ||||
buffers = ['...'] # either 1 or 2 in length | ||||
# a serialization of the return value of f(*args,**kwargs) | ||||
# only populated if status is 'ok' | ||||
Implementation | ||||
-------------- | ||||
|
r3556 | There are a few differences in implementation between the `StreamSession` object used in | ||
the parallel computing fork and the `Session` object, the main one being that messages are | ||||
sent in parts, rather than as a single serialized object. `StreamSession` objects also | ||||
take pack/unpack functions, which are to be used when serializing/deserializing objects. | ||||
These can be any functions that translate to/from formats that ZMQ sockets can send | ||||
(buffers,bytes, etc.). | ||||
|
r2790 | |||
Split Sends | ||||
*********** | ||||
|
r3556 | Previously, messages were bundled as a single json object and one call to | ||
:func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to | ||||
see the content of the messages, which can be large, messages are now serialized and sent in | ||||
pieces. All messages are sent in at least 3 parts: the header, the parent header, and the | ||||
content. This allows the controller to unpack and inspect the (always small) header, | ||||
without spending time unpacking the content unless the message is bound for the | ||||
controller. Buffers are added on to the end of the message, and can be any objects that | ||||
present the buffer interface. | ||||
|
r2790 | |||