.. _parallel_messages:

Messaging for Parallel Computing
================================

This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections
can be found in the :ref:`parallel connections <parallel_connections>` doc.

ZMQ messaging is also used in the parallel computing IPython system. All messages to/from
kernels remain the same as the single kernel model, and are forwarded through a ZMQ Queue
device. The controller receives all messages and replies in these channels, and saves
results for future use.

The Controller
--------------

The controller is the central process of the IPython parallel computing model. It has 3
Devices:

* Heartbeater
* Multiplexed Queue
* Task Queue

and 3 sockets:

* ``XREP`` for both engine and client registration
* ``PUB`` for notification of engine changes
* ``XREP`` for client requests

Registration (``XREP``)
***********************

The first function of the Controller is to facilitate and monitor connections of clients
and engines. Both client and engine registration are handled by the same socket, so only
one IP/port pair is needed to connect any number of engines and clients.

Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets: one for the
queue, which receives execute requests, and one for the heartbeat, which is used to
monitor the survival of the Engine process.

Message type: ``registration_request``::

    content = {
        'queue' : 'abcd-1234-...', # the queue XREQ id
        'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
    }

The Controller replies to an Engine's registration request with the engine's integer ID,
and all the remaining connection information for connecting the heartbeat process and
kernel queue socket(s). The message status will be an error if the Engine requests IDs that
are already in use.

Message type: ``registration_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'id' : 0, # int, the engine id
        'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
        'control' : 'tcp://...', # addr for control queue
        'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
        'task' : 'tcp://...', # addr for task queue, or None if no task queue running
        # if error:
        'reason' : 'queue_id already registered'
    }

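
As a rough illustration of the exchange above, the following sketch shows how a controller
might answer a ``registration_request``. The ``EngineRegistry`` class, its method names, the
addresses, and the ``'heartbeat_id already registered'`` reason are hypothetical and are not
taken from the IPython source; only the message fields come from the description above.

```python
class EngineRegistry:
    """Hypothetical in-memory registry; not the real controller code."""

    def __init__(self, addrs):
        # Connection info returned on success (queue, control, heartbeat, task).
        self.addrs = addrs
        self.engines = {}   # integer engine id -> (queue id, heartbeat id)
        self.next_id = 0

    def register(self, content):
        """Build the content of a registration_reply for one request."""
        queue, heart = content['queue'], content['heartbeat']
        for known_queue, known_heart in self.engines.values():
            if queue == known_queue:
                return {'status': 'error', 'reason': 'queue_id already registered'}
            if heart == known_heart:
                # Assumed wording; the source only shows the queue_id case.
                return {'status': 'error', 'reason': 'heartbeat_id already registered'}
        eid = self.next_id
        self.next_id += 1
        self.engines[eid] = (queue, heart)
        reply = {'status': 'ok', 'id': eid}
        reply.update(self.addrs)
        return reply


registry = EngineRegistry({
    'queue': 'tcp://127.0.0.1:12345',
    'control': 'tcp://127.0.0.1:12346',
    'heartbeat': ('tcp://127.0.0.1:12347', 'tcp://127.0.0.1:12348'),
    'task': None,  # no task queue running
})
request = {'queue': 'abcd-1234', 'heartbeat': '1234-abcd'}
first = registry.register(request)
second = registry.register(request)  # same IDs again, so this is rejected
```

Here ``first`` carries the new engine ID together with the connection addresses, while
``second`` is an error reply because the queue ID is already in use.
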
Clients use the same socket as engines to start their connections. Connection requests
from clients need no information:

Message type: ``connection_request``::

    content = {}

The reply to a Client connection request contains the connection information for the
multiplexer and load balanced queues, as well as the address for direct controller
queries. If any of these addresses is `None`, that functionality is not available.

Message type: ``connection_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
        'task' : 'tcp...', # addr for task queue, or None if no task queue running
        'query' : 'tcp...', # addr for methods to query the controller, like queue_request, etc.
        'control' : 'tcp...' # addr for control methods, like abort, etc.
    }

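
The rule that a `None` address means "not available" can be checked on the client side
with a few lines. This is only a sketch: ``available_services`` is a hypothetical helper,
the addresses are made up, and the use of a ``'reason'`` key on error is an assumption
borrowed from the other reply messages.

```python
def available_services(content):
    """Return the names of the services whose address is not None."""
    if content.get('status') != 'ok':
        # Assumption: error replies carry a 'reason', as other replies do.
        raise RuntimeError(content.get('reason', 'connection failed'))
    names = ('queue', 'task', 'query', 'control')
    return [name for name in names if content.get(name) is not None]


reply = {
    'status': 'ok',
    'queue': 'tcp://127.0.0.1:12345',
    'task': None,  # no task queue running
    'query': 'tcp://127.0.0.1:12346',
    'control': 'tcp://127.0.0.1:12347',
}
services = available_services(reply)
```
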
Heartbeat
*********

The controller uses a heartbeat system to monitor engines, and track when they become
unresponsive. The heartbeat is described in :ref:`messaging <messaging>`, and shown in
:ref:`connections <parallel_connections>`.

Notification (``PUB``)
**********************

The controller publishes all engine registration/unregistration events on a PUB socket.
This allows clients to have up-to-date engine ID sets without polling. Registration
notifications contain both the integer engine ID and the queue ID, which is necessary for
sending messages via the Multiplexer Queue.

Message type: ``registration_notification``::

    content = {
        'id' : 0, # engine ID that has been registered
        'queue' : 'engine_id' # the IDENT for the engine's queue
    }

Message type: ``unregistration_notification``::

    content = {
        'id' : 0 # engine ID that has been unregistered
    }

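
A client can apply these two notifications to a local table to keep its engine ID set
current without polling. The sketch below assumes a hypothetical ``EngineTracker`` class
and made-up queue IDs; how the messages arrive from the PUB socket is left out.

```python
class EngineTracker:
    """Hypothetical client-side view of the registered engines."""

    def __init__(self):
        self.queue_ids = {}  # integer engine id -> queue IDENT

    def handle(self, msg_type, content):
        """Apply one notification received from the controller."""
        if msg_type == 'registration_notification':
            self.queue_ids[content['id']] = content['queue']
        elif msg_type == 'unregistration_notification':
            # Ignore engines we never heard about.
            self.queue_ids.pop(content['id'], None)

    @property
    def ids(self):
        """Sorted list of currently registered engine IDs."""
        return sorted(self.queue_ids)


tracker = EngineTracker()
tracker.handle('registration_notification', {'id': 0, 'queue': 'abcd-1234'})
tracker.handle('registration_notification', {'id': 1, 'queue': 'efgh-5678'})
tracker.handle('unregistration_notification', {'id': 0})
```
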
Client Queries (``XREP``)
*************************

The controller monitors and logs all queue traffic, so that clients can retrieve past
results or monitor pending tasks. Currently, this information resides in memory on the
Controller, but will ultimately be offloaded to a database over an additional ZMQ
connection. The interface should remain the same or at least similar.

:func:`queue_request` requests can specify multiple engines to query via the `targets`
element. A verbose flag can be passed, to determine whether the result should be the list
of `msg_ids` in the queue or simply the length of each list.

Message type: ``queue_request``::

    content = {
        'verbose' : True, # whether return should be lists themselves or just lens
        'targets' : [0,3,1] # list of ints
    }

The content of a reply to a :func:`queue_request` request is a dict, keyed by the engine
IDs. Note that they will be the string representation of the integer keys, since JSON
cannot handle number keys.

Message type: ``queue_reply``::

    content = {
        '0' : {'completed' : 1, 'queue' : 7},
        '1' : {'completed' : 10, 'queue' : 1}
    }

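
The string keys are a consequence of the JSON round trip, which the standard library
reproduces. The sketch below uses made-up counts and shows one way a client could convert
the keys back to integers; the variable names are illustrative only.

```python
import json

# What the controller might hold internally: integer engine IDs as keys.
status = {
    0: {'completed': 1, 'queue': 7},
    1: {'completed': 10, 'queue': 1},
}

wire = json.dumps(status)      # integer keys are written as strings
received = json.loads(wire)    # keys come back as '0' and '1'

# Convert the keys back to integer engine IDs on the client side.
by_engine = {int(eid): info for eid, info in received.items()}
```
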
Clients can request individual results directly from the controller. This is primarily
useful for gathering results of executions not submitted by the particular client, as the
client will have all its own results already. Requests are made by msg_id, and can contain
one or more msg_ids.

Message type: ``result_request``::

    content = {
        'msg_ids' : ['uuid','...'] # list of strs
    }

The :func:`result_request` reply contains the content objects of the actual execution
reply messages.

Message type: ``result_reply``::

    content = {
        'status' : 'ok', # else error
        # if ok:
        msg_id : msg, # the content dict is keyed by msg_ids,
                      # values are the result messages
        'pending' : ['msg_id','...'], # msg_ids still pending
        'completed' : ['msg_id','...'], # list of completed msg_ids
        # if error:
        'reason' : "explanation"
    }

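
Because the results share one dict with the ``'status'``, ``'pending'`` and
``'completed'`` entries, a client has to separate them. A minimal sketch, assuming a
hypothetical ``split_result_reply`` helper and placeholder message IDs:

```python
def split_result_reply(content):
    """Separate finished results from the list of still-pending msg_ids."""
    if content['status'] != 'ok':
        raise RuntimeError(content['reason'])
    # Result messages are stored under their own msg_id in the same dict.
    results = {
        msg_id: content[msg_id]
        for msg_id in content['completed']
        if msg_id in content
    }
    return results, list(content['pending'])


reply = {
    'status': 'ok',
    'uuid-1': {'status': 'ok'},   # content of the execution reply
    'pending': ['uuid-2'],
    'completed': ['uuid-1'],
}
results, pending = split_result_reply(reply)
```
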
For memory management purposes, Clients can also instruct the controller to forget the
results of messages. This can be done by message ID or engine ID. Individual messages are
dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This
will likely no longer be necessary once we move to a DB-based message logging backend.

If the msg_ids element is the string ``'all'`` instead of a list, then all completed
results are forgotten.

Message type: ``purge_request``::

    content = {
        'msg_ids' : ['id1', 'id2',...], # list of msg_ids or 'all'
        'engine_ids' : [0,2,4] # list of engine IDs
    }

The reply to a purge request is simply the status 'ok' if the request succeeded, or an
explanation of why it failed, such as requesting the purge of a nonexistent or pending
message.

Message type: ``purge_reply``::

    content = {
        'status' : 'ok', # or 'error'

        # if error:
        'reason' : "KeyError: no such msg_id 'whoda'"
    }

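
The purge rules above can be sketched as a single function over an in-memory store. This
is an assumption-laden illustration, not the controller's code: ``handle_purge``, its
three bookkeeping arguments, and the wording of the "pending" error are all hypothetical.

```python
def handle_purge(results, pending, by_engine, content):
    """Return the content of a purge_reply.

    results   -- dict mapping msg_id to a stored result
    pending   -- set of msg_ids that have not completed yet
    by_engine -- dict mapping engine id to msg_ids completed there
    """
    msg_ids = content.get('msg_ids', [])
    if msg_ids == 'all':
        results.clear()  # forget every completed result
        return {'status': 'ok'}

    doomed = set(msg_ids)
    for eid in content.get('engine_ids', []):
        doomed.update(by_engine.get(eid, []))

    # Validate everything first, so a failed request purges nothing.
    for msg_id in sorted(doomed):
        if msg_id in pending:
            return {'status': 'error', 'reason': 'msg pending: %r' % msg_id}
        if msg_id not in results:
            return {'status': 'error',
                    'reason': 'KeyError: no such msg_id %r' % msg_id}
    for msg_id in doomed:
        del results[msg_id]
    return {'status': 'ok'}


results = {'a': 1, 'b': 2, 'c': 3}
pending = {'d'}
by_engine = {0: ['a'], 2: ['b']}

ok = handle_purge(results, pending, by_engine,
                  {'msg_ids': ['c'], 'engine_ids': [0]})
bad = handle_purge(results, pending, by_engine,
                   {'msg_ids': ['whoda'], 'engine_ids': []})
```
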
:func:`apply` and :func:`apply_bound`
*************************************

The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to
use the model::

    client.apply(f, *args, **kwargs)

which takes `f`, a function in the user's namespace, and executes ``f(*args, **kwargs)``
on a remote engine, returning the result (or, for non-blocking, information facilitating
later retrieval of the result). This model, unlike the execute message which just uses a
code string, must be able to send arbitrary (pickleable) Python objects, ideally while
copying as little data as possible. The `buffers` property of a Message was introduced for
this purpose.

Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
function signature and builds the correct buffer format for minimal data copying (exactly
zero copies of numpy array data).

Message type: ``apply_request``::

    content = {
        'bound' : True, # whether to execute in the engine's namespace or unbound
        'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
        'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()

    }
    buffers = ['...'] # at least 3 in length
                      # as built by build_apply_message(f,args,kwargs)

Message type: ``apply_reply``::

    content = {
        'status' : 'ok' # 'ok' or 'error'
        # other error info here, as in other messages
    }
    buffers = ['...'] # either 1 or 2 in length
                      # a serialization of the return value of f(*args,**kwargs)
                      # only populated if status is 'ok'

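
To make the role of the buffers concrete, here is a deliberately simplified sketch of an
apply round trip. It is not :func:`build_apply_message`: ``pack_apply`` and ``run_apply``
are hypothetical names, each of `f`, `args` and `kwargs` is simply pickled into its own
buffer (which does copy data, unlike the real zero-copy path for numpy arrays), and the
``'reason'`` key on error is an assumption.

```python
import operator
import pickle


def pack_apply(f, args, kwargs):
    """Serialize f, args and kwargs into three separate buffers."""
    return [pickle.dumps(f), pickle.dumps(args), pickle.dumps(kwargs)]


def run_apply(buffers):
    """Engine side: rebuild the call, run it, and build the reply."""
    f, args, kwargs = (pickle.loads(b) for b in buffers[:3])
    try:
        result = f(*args, **kwargs)
    except Exception as exc:
        # The buffers stay empty when the status is 'error'.
        return {'status': 'error', 'reason': repr(exc)}, []
    return {'status': 'ok'}, [pickle.dumps(result)]


# operator.add pickles by reference, so it survives the round trip.
request_buffers = pack_apply(operator.add, (2, 3), {})
content, reply_buffers = run_apply(request_buffers)
value = pickle.loads(reply_buffers[0])
```
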
Implementation
--------------

There are a few differences in implementation between the `StreamSession` object used in
the parallel computing fork and the `Session` object, the main one being that messages are
sent in parts, rather than as a single serialized object. `StreamSession` objects also
take pack/unpack functions, which are to be used when serializing/deserializing objects.
These can be any functions that translate to/from formats that ZMQ sockets can send
(buffers, bytes, etc.).

Split Sends
***********

Previously, messages were bundled as a single JSON object and sent with one call to
:func:`socket.send_json`. Since the controller inspects all messages, and doesn't need to
see the content of the messages, which can be large, messages are now serialized and sent in
pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
content. This allows the controller to unpack and inspect the (always small) header,
without spending time unpacking the content unless the message is bound for the
controller. Buffers are added on to the end of the message, and can be any objects that
present the buffer interface.
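
The split layout can be sketched with plain JSON as the pack/unpack pair. Everything here
is illustrative: ``pack``, ``unpack``, ``serialize`` and ``peek_header`` are hypothetical
names, the header fields are placeholders, and the routing identities that precede a real
multipart message are omitted.

```python
import json


def pack(obj):
    """Example pack function: JSON encoded as bytes."""
    return json.dumps(obj).encode('utf8')


def unpack(data):
    """Inverse of pack."""
    return json.loads(data.decode('utf8'))


def serialize(msg, buffers=()):
    """Return the message as a list of parts, with buffers at the end."""
    parts = [
        pack(msg['header']),
        pack(msg['parent_header']),
        pack(msg['content']),
    ]
    parts.extend(buffers)
    return parts


def peek_header(parts):
    """Unpack only the first part, leaving the content untouched."""
    return unpack(parts[0])


msg = {
    'header': {'msg_id': 'uuid-1', 'msg_type': 'apply_request'},
    'parent_header': {},
    'content': {'bound': True},
}
parts = serialize(msg, buffers=[b'raw-bytes'])
header = peek_header(parts)
```

With pyzmq, a list such as ``parts`` could be handed to ``socket.send_multipart``; the
receiver gets the same list back and can decide from ``header`` alone whether the content
needs to be unpacked at all.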