.. _parallel_messages:

Messaging for Parallel Computing
================================

This is an extension of the :ref:`messaging <messaging>` doc. Diagrams of the connections
can be found in the :ref:`parallel connections <parallel_connections>` doc.

ZMQ messaging is also used in the parallel computing IPython system. All messages to and from
kernels remain the same as in the single-kernel model, and are forwarded through a ZMQ Queue
device. The controller receives all messages and replies in these channels, and saves
results for future use.

The Controller
--------------

The controller is the central process of the IPython parallel computing model. It has three
devices:

* Heartbeater
* Multiplexed Queue
* Task Queue

and three sockets:

* ``XREP`` for both engine and client registration
* ``PUB`` for notification of engine changes
* ``XREP`` for client requests

Registration (``XREP``)
***********************

The first function of the Controller is to facilitate and monitor connections of clients
and engines. Both client and engine registration are handled by the same socket, so only
one ip/port pair is needed to connect any number of engines and clients.

Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets: one for the
queue, which receives execute requests, and one for the heartbeat, which is used to
monitor the survival of the Engine process.

Message type: ``registration_request``::

    content = {
        'queue' : 'abcd-1234-...', # the queue XREQ id
        'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
    }

The Controller replies to an Engine's registration request with the engine's integer ID,
and all the remaining connection information for connecting the heartbeat process and
kernel queue socket(s). The message status will be an error if the Engine requests IDs that
are already in use.

Message type: ``registration_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'id' : 0, # int, the engine id
        'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
        'control' : 'tcp://...', # addr for control queue
        'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
        'task' : 'tcp://...', # addr for task queue, or None if no task queue running
    }
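
The registration exchange can be sketched as a small in-memory table. This is a minimal sketch, assuming the controller keeps registered engines in a dict; the class ``RegistrationTable``, its method name, and the ``'reason'`` key in the error reply are hypothetical, not the controller's actual code, and no sockets are involved.

```python
import itertools


class RegistrationTable:
    """Hypothetical in-memory engine registry (illustration only)."""

    def __init__(self, addrs):
        # connection info returned to every engine: queue, control, heartbeat, task
        self.addrs = dict(addrs)
        # engine id -> (queue ident, heartbeat ident)
        self.engines = {}
        # source of fresh integer engine IDs
        self._next_id = itertools.count()

    def handle_registration_request(self, content):
        queue, heart = content['queue'], content['heartbeat']
        in_use = {ident for pair in self.engines.values() for ident in pair}
        if queue in in_use or heart in in_use:
            # hypothetical error payload; the doc only fixes status='error'
            return {'status': 'error',
                    'reason': 'queue or heartbeat id already in use'}
        eid = next(self._next_id)
        self.engines[eid] = (queue, heart)
        reply = {'status': 'ok', 'id': eid}
        reply.update(self.addrs)
        return reply
```

A rejected request does not consume an ID, so the next successful registration still receives the next integer in sequence.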

Clients use the same socket as engines to start their connections. Connection requests
from clients need no information:

Message type: ``connection_request``::

    content = {}

The reply to a client connection request contains the connection information for the
multiplexer and load balanced queues, as well as the address for direct controller
queries. If any of these addresses is ``None``, that functionality is not available.

Message type: ``connection_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
        'task' : 'tcp://...', # addr for task queue, or None if no task queue running
        'query' : 'tcp://...', # addr for methods to query the controller, like queue_request, etc.
        'control' : 'tcp://...', # addr for control methods, like abort, etc.
    }
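
On the client side, the rule that a ``None`` address means the service is unavailable might be applied as in the following sketch. The helper ``available_services`` is hypothetical, and the reply is assumed to be already deserialized into a dict.

```python
SERVICES = ('queue', 'task', 'query', 'control')


def available_services(reply):
    """Return the sorted names of services that have a non-None address."""
    if reply.get('status') != 'ok':
        raise RuntimeError('connection failed: %r' % (reply,))
    return sorted(name for name in SERVICES if reply.get(name) is not None)
```

For a reply whose ``'task'`` entry is ``None`` (no task queue running), this returns ``['control', 'query', 'queue']``.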

Heartbeat
*********

The controller uses a heartbeat system to monitor engines and track when they become
unresponsive, as described in :ref:`messages <messages>` and shown in :ref:`connections
<parallel_connections>`.
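
The idea of tracking unresponsive engines can be illustrated with a generic timeout check. This is a hypothetical sketch, not the controller's actual heartbeat algorithm (the referenced docs describe that); ``HeartMonitor``, the ``timeout`` parameter, and the injectable ``clock`` are assumptions made for illustration.

```python
import time


class HeartMonitor:
    """Hypothetical sketch: flag engines whose heartbeat replies have stopped."""

    def __init__(self, timeout=3.0, clock=time.monotonic):
        self.timeout = timeout  # seconds of silence before an engine is flagged
        self.clock = clock      # injectable so the logic can be tested
        self.last_seen = {}     # heartbeat ident -> time of most recent reply

    def record_reply(self, heart_id):
        """Call whenever an engine answers a heartbeat ping."""
        self.last_seen[heart_id] = self.clock()

    def unresponsive(self):
        """Return the sorted idents that have been silent longer than timeout."""
        now = self.clock()
        return sorted(h for h, seen in self.last_seen.items()
                      if now - seen > self.timeout)
```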

Notification (``PUB``)
**********************

The controller publishes all engine registration/unregistration events on a ``PUB`` socket.
This allows clients to have up-to-date engine ID sets without polling. Registration
notifications contain both the integer engine ID and the queue ID, which is necessary for
sending messages via the Multiplexer Queue.

Message type: ``registration_notification``::

    content = {
        'id' : 0, # engine ID that has been registered
        'queue' : 'engine_id' # the IDENT for the engine's queue
    }

Message type: ``unregistration_notification``::

    content = {
        'id' : 0 # engine ID that has been unregistered
    }
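
A client can keep its engine table current by applying each notification as it arrives. A minimal sketch, assuming the ``PUB`` message has already been received and split into its message type and content dict; ``apply_notification`` is a hypothetical helper.

```python
def apply_notification(engines, msg_type, content):
    """Update a {engine id: queue ident} map from one controller notification."""
    if msg_type == 'registration_notification':
        # the queue ident is what the client needs to address the MUX queue
        engines[content['id']] = content['queue']
    elif msg_type == 'unregistration_notification':
        engines.pop(content['id'], None)
    return engines
```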

Client Queries (``XREP``)
*************************

The controller monitors and logs all queue traffic, so that clients can retrieve past
results or monitor pending tasks. Currently, this information resides in memory on the
Controller, but will ultimately be offloaded to a database over an additional ZMQ
connection. The interface should remain the same or at least similar.

:func:`queue_request` requests can specify multiple engines to query via the ``targets``
element. A ``verbose`` flag can be passed to determine whether the result should be the
lists of ``msg_ids`` in each queue or simply the length of each list.

Message type: ``queue_request``::

    content = {
        'verbose' : True, # whether return should be lists themselves or just lens
        'targets' : [0,3,1] # list of ints
    }

The content of a reply to a :func:`queue_request` request is a dict, keyed by the engine
IDs. Note that the keys will be the string representations of the integer IDs, since JSON
object keys must be strings.

Message type: ``queue_reply``::

    content = {
        '0' : {'completed' : 1, 'queue' : 7},
        '1' : {'completed' : 10, 'queue' : 1}
    }
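
The sketch below shows how a hypothetical handler might build ``queue_reply`` content from in-memory queues, and how a JSON round trip turns the integer engine IDs into strings. ``build_queue_reply`` and the ``queues``/``completed`` dicts are assumptions for illustration; only the message layout comes from this document.

```python
import json


def build_queue_reply(queues, completed, targets, verbose=False):
    """Hypothetical queue_request handler.

    queues and completed map integer engine IDs to lists of msg_ids.
    """
    content = {}
    for eid in targets:
        waiting = queues.get(eid, [])
        done = completed.get(eid, [])
        if verbose:
            # return the msg_id lists themselves
            content[eid] = {'completed': list(done), 'queue': list(waiting)}
        else:
            # return only the lengths of the lists
            content[eid] = {'completed': len(done), 'queue': len(waiting)}
    return content


queues = {0: ['m%d' % i for i in range(7)], 1: ['m7']}
completed = {0: ['m8'], 1: ['m%d' % i for i in range(9, 19)]}
reply = build_queue_reply(queues, completed, targets=[0, 1])
# JSON object keys must be strings, so the integer IDs come back as '0', '1'
wire = json.loads(json.dumps(reply))
# a receiver can restore integer keys explicitly
restored = {int(k): v for k, v in wire.items()}
```

With these inputs ``wire`` equals the ``queue_reply`` example above, and ``restored`` equals ``reply``.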

Clients can request individual results directly from the controller. This is primarily for
gathering results of executions not submitted by the requesting client, as the client
will already have all of its own results. Requests are made by msg_id, and can contain one or
more msg_ids.

Message type: ``result_request``::

    content = {
        'msg_ids' : ['uuid','...'] # list of strs
    }

The :func:`result_request` reply contains the content objects of the actual execution
reply messages.

Message type: ``result_reply``::

    content = {
        'status' : 'ok', # else error
        # if ok:
        msg_id : msg, # the content dict is keyed by msg_ids,
                      # values are the result messages
        'pending' : ['msg_id','...'], # msg_ids still pending
        'completed' : ['msg_id','...'], # list of completed msg_ids
    }
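
A hypothetical handler for ``result_request`` might assemble the reply like this, assuming completed results are stored in a dict keyed by msg_id and unfinished msg_ids are tracked in a set. ``build_result_reply`` and the ``'reason'`` key of the error reply are illustrative names, not part of the documented protocol.

```python
def build_result_reply(msg_ids, results, pending):
    """Hypothetical result_request handler.

    results: msg_id -> stored result content; pending: set of unfinished msg_ids.
    """
    reply = {'status': 'ok', 'pending': [], 'completed': []}
    for msg_id in msg_ids:
        if msg_id in pending:
            reply['pending'].append(msg_id)
        elif msg_id in results:
            reply['completed'].append(msg_id)
            reply[msg_id] = results[msg_id]  # result keyed directly by its msg_id
        else:
            # assumed error payload for an unknown msg_id
            return {'status': 'error', 'reason': 'unknown msg_id %r' % (msg_id,)}
    return reply
```

Keying results directly by msg_id, as the reply layout does, relies on msg_ids (UUIDs) never colliding with the fixed keys ``'status'``, ``'pending'`` and ``'completed'``.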

For memory management purposes, Clients can also instruct the controller to forget the
results of messages. This can be done by message ID or engine ID. Individual messages are
dropped by msg_id, and all messages completed on an engine are dropped by engine ID. This
will likely no longer be necessary once we move to a DB-based message logging backend.

If the ``msg_ids`` element is the string ``'all'`` instead of a list, then all completed
results are forgotten.

Message type: ``purge_request``::

    content = {
        'msg_ids' : ['id1', 'id2',...], # list of msg_ids or 'all'
        'engine_ids' : [0,2,4] # list of engine IDs
    }

The reply to a purge request is simply the status ``'ok'`` if the request succeeded, or an
explanation of why it failed, such as requesting the purge of a nonexistent or pending
message.

Message type: ``purge_reply``::

    content = {
        'status' : 'ok', # or 'error'
    }
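
The purge rules described here (by msg_id, by engine ID, or ``'all'``, with an error for nonexistent or pending messages) can be sketched as follows. This assumes results are kept in a dict keyed by msg_id, unfinished msg_ids in a set, and a hypothetical ``by_engine`` dict mapping each engine ID to the set of msg_ids completed there; ``handle_purge_request`` and the ``'reason'`` key are illustrative only.

```python
def handle_purge_request(content, results, by_engine, pending):
    """Hypothetical purge_request handler operating on in-memory stores."""
    msg_ids = content.get('msg_ids', [])
    engine_ids = content.get('engine_ids', [])
    if msg_ids == 'all':
        # forget every completed result; pending messages are untouched
        results.clear()
        for done in by_engine.values():
            done.clear()
        return {'status': 'ok'}
    # validate first, so a bad request leaves the stores unchanged
    for msg_id in msg_ids:
        if msg_id in pending:
            return {'status': 'error', 'reason': '%s is still pending' % msg_id}
        if msg_id not in results:
            return {'status': 'error', 'reason': 'unknown msg_id %s' % msg_id}
    for eid in engine_ids:
        if eid not in by_engine:
            return {'status': 'error', 'reason': 'unknown engine %s' % eid}
    # drop individual messages by msg_id
    for msg_id in msg_ids:
        del results[msg_id]
        for done in by_engine.values():
            done.discard(msg_id)
    # drop everything completed on the listed engines
    for eid in engine_ids:
        for msg_id in by_engine[eid]:
            results.pop(msg_id, None)
        by_engine[eid].clear()
    return {'status': 'ok'}
```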

:func:`apply` and :func:`apply_bound`
*************************************

The `Namespace <http://gist.github.com/483294>`_ model suggests that execution be able to
use the model::

    ns.apply(f, *args, **kwargs)

which takes ``f``, a function in the user's namespace, and executes ``f(*args, **kwargs)``
on a remote engine, returning the result (or, for non-blocking calls, information facilitating
later retrieval of the result). This model, unlike the execute message, which just uses a
code string, must be able to send arbitrary (pickleable) Python objects, and ideally copy
as little data as possible. The ``buffers`` property of a Message was introduced for this
purpose.

Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession` wraps a
function signature and builds a sendable buffer format for minimal data copying (exactly
zero copies of numpy array data or buffers or large strings).

Message type: ``apply_request``::

    content = {
        'bound' : True, # whether to execute in the engine's namespace or unbound
        'after' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
        'follow' : [msg_ids,...], # list of msg_ids or output of Dependency.as_dict()
    }
    buffers = ['...'] # at least 3 in length
                      # as built by build_apply_message(f,args,kwargs)

``after`` and ``follow`` represent task dependencies.

Message type: ``apply_reply``::

    content = {
        'status' : 'ok' # 'ok' or 'error'
        # other error info here, as in other messages
    }
    buffers = ['...'] # either 1 or 2 in length
                      # a serialization of the return value of f(*args,**kwargs)
                      # only populated if status is 'ok'
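
To illustrate the shape of an ``apply_request`` and ``apply_reply``, here is a sketch that uses plain :mod:`pickle`. It is not :func:`build_apply_message`: ``pack_apply`` and ``run_apply`` are hypothetical helpers, they copy data rather than achieving zero-copy, and the ``ename``/``evalue`` error keys are an assumption modeled on the error info of other messages.

```python
import pickle


def pack_apply(f, args, kwargs):
    """Hypothetical stand-in for build_apply_message: one buffer per part."""
    # pickle.dumps creates new bytes, so unlike the real utility this copies data
    return [pickle.dumps(f), pickle.dumps(tuple(args)), pickle.dumps(dict(kwargs))]


def run_apply(buffers):
    """Engine side: rebuild f(*args, **kwargs), call it, serialize the result."""
    f, args, kwargs = (pickle.loads(b) for b in buffers[:3])
    try:
        result = f(*args, **kwargs)
    except Exception as exc:
        # assumed error keys; no result buffers when status is 'error'
        return {'status': 'error', 'ename': type(exc).__name__,
                'evalue': str(exc)}, []
    return {'status': 'ok'}, [pickle.dumps(result)]


content, buffers = run_apply(pack_apply(divmod, (17, 5), {}))
result = pickle.loads(buffers[0])
```

:mod:`pickle` serializes functions by reference (module and name), so ``f`` must be importable on the engine side; the example therefore uses the builtin ``divmod``, and ``result`` is ``(3, 2)``.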

Implementation
--------------

There are a few differences in implementation between the ``StreamSession`` object used in
the parallel computing fork and the ``Session`` object, the main one being that messages are
sent in parts, rather than as a single serialized object. ``StreamSession`` objects also
take pack/unpack functions, which are to be used when serializing/deserializing objects.
These can be any functions that translate to/from formats that ZMQ sockets can send
(buffers, bytes, etc.).
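
The pluggable pack/unpack idea might look like the following. ``MiniSession`` is a hypothetical class, not ``StreamSession``; it only shows that the choice of functions decides what survives the trip, for example JSON turning integer keys into strings and tuples into lists while :mod:`pickle` preserves both.

```python
import json
import pickle


def _json_pack(obj):
    return json.dumps(obj).encode('utf-8')


def _json_unpack(data):
    return json.loads(data.decode('utf-8'))


class MiniSession:
    """Hypothetical session with pluggable serialization (not StreamSession)."""

    def __init__(self, pack=_json_pack, unpack=_json_unpack):
        # pack: object -> bytes a ZMQ socket can send; unpack: the inverse
        self.pack = pack
        self.unpack = unpack

    def roundtrip(self, obj):
        """Serialize and deserialize obj, as sending it over a socket would."""
        return self.unpack(self.pack(obj))


json_session = MiniSession()
pickle_session = MiniSession(pickle.dumps, pickle.loads)
```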

Split Sends
***********

Previously, messages were bundled as a single JSON object and sent with one call to
:func:`socket.send_json`. Since the controller inspects all messages but does not need to
see their content, which can be large, messages are now serialized and sent in
pieces. All messages are sent in at least 3 parts: the header, the parent header, and the
content. This allows the controller to unpack and inspect the (always small) header
without spending time unpacking the content unless the message is bound for the
controller. Buffers are added on to the end of the message, and can be any objects that
present the buffer interface.
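
A split send can be sketched as building a list of frames, which with pyzmq would be passed to ``socket.send_multipart()``. The helpers ``serialize`` and ``peek_header`` are hypothetical, JSON is assumed for the three small parts, and no identity or signature frames are included; the point is that the controller can decode ``frames[0]`` without touching the content or buffers.

```python
import json


def _pack(obj):
    return json.dumps(obj).encode('utf-8')


def serialize(header, parent_header, content, buffers=()):
    """Build the frames of one message: 3 JSON parts, then raw buffers."""
    return [_pack(header), _pack(parent_header), _pack(content)] + list(buffers)


def peek_header(frames):
    """Decode only the (small) header frame, leaving the rest untouched."""
    return json.loads(frames[0].decode('utf-8'))


data = bytearray(1000000)  # stands in for a large array; supports the buffer interface
frames = serialize({'msg_id': 'abc', 'msg_type': 'apply_request'}, {},
                   {'bound': True}, [memoryview(data)])
```

The trailing ``memoryview`` is appended as-is, so the large buffer is referenced rather than copied or re-encoded.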