.. _parallel_messages:

Messaging for Parallel Computing
================================

This is an extension of the :ref:`messaging ` doc. Diagrams of the connections
can be found in the :ref:`parallel connections ` doc.

ZMQ messaging is also used in the parallel computing IPython system. All
messages to/from kernels remain the same as in the single kernel model, and are
forwarded through a ZMQ Queue device. The controller receives all messages and
replies in these channels, and saves results for future use.

The Controller
--------------

The controller is the central process of the IPython parallel computing model.
It has 3 Devices:

* Heartbeater
* Multiplexed Queue
* Task Queue

and 3 sockets:

* ``XREP`` for both engine and client registration
* ``PUB`` for notification of engine changes
* ``XREP`` for client requests

Registration (``XREP``)
***********************

The first function of the Controller is to facilitate and monitor connections
of clients and engines. Both client and engine registration are handled by the
same socket, so only one ip/port pair is needed to connect any number of
engines and clients.

Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets: one
for the queue, which receives execute requests, and one for the heartbeat,
which is used to monitor the survival of the Engine process.

Message type: ``registration_request``::

    content = {
        'queue' : 'abcd-1234-...', # the queue XREQ id
        'heartbeat' : '1234-abcd-...' # the heartbeat XREQ id
    }

The Controller replies to an Engine's registration request with the engine's
integer ID, and all the remaining connection information for connecting the
heartbeat process and kernel socket(s). The message status will be an error if
the Engine requests IDs that are already in use.

Message type: ``registration_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'id' : 0, # int, the engine id
        'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
        'heartbeat' : (a,b), # tuple containing two interfaces needed for heartbeat
        'task' : 'tcp...', # addr for task queue, or None if no task queue running
        # if error:
        'reason' : 'queue_id already registered'
    }

Clients use the same socket to start their connections. Connection requests
from clients need no information:

Message type: ``connection_request``::

    content = {}

The reply to a Client registration request contains the connection information
for the multiplexer and load balanced queues, as well as the address for direct
controller queries. If any of these addresses is `None`, that functionality is
not available.

Message type: ``connection_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if ok:
        'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the queue
        'task' : 'tcp...', # addr for task queue, or None if no task queue running
        'controller' : 'tcp...' # addr for controller methods, like queue_request, etc.
    }

Heartbeat
*********

The controller uses a heartbeat system to monitor engines and track when they
become unresponsive. The heartbeat is described in :ref:`messages ` and shown
in :ref:`connections `.

Notification (``PUB``)
**********************

The controller publishes all engine registration/unregistration events on a
PUB socket. This allows clients to have up-to-date engine ID sets without
polling. Registration notifications contain both the integer engine ID and the
queue ID, which is necessary for sending messages via the Multiplexer Queue.

Message type: ``registration_notification``::

    content = {
        'id' : 0, # engine ID that has been registered
        'queue' : 'engine_id' # the IDENT for the engine's queue
    }

Message type: ``unregistration_notification``::

    content = {
        'id' : 0 # engine ID that has been unregistered
    }

Client Queries (``XREP``)
*************************

The controller monitors and logs all queue traffic, so that clients can
retrieve past results or monitor pending tasks. Currently, this information
resides in memory on the Controller, but will ultimately be offloaded to a
database over an additional ZMQ connection. The interface should remain the
same or at least similar.

:func:`queue_request` requests can specify multiple engines to query via the
`targets` element. A verbose flag can be passed to determine whether the result
should be the list of `msg_ids` in the queue or simply the length of each list.

Message type: ``queue_request``::

    content = {
        'verbose' : True, # whether return should be lists themselves or just lens
        'targets' : [0,3,1] # list of ints
    }

The content of a reply to a :func:`queue_request` request is a dict, keyed by
the engine IDs. Note that the keys will be the string representations of the
integer engine IDs, since JSON object keys must be strings.

Message type: ``queue_reply``::

    content = {
        '0' : {'completed' : 1, 'queue' : 7},
        '1' : {'completed' : 10, 'queue' : 1}
    }

Clients can request individual results directly from the controller. This is
primarily useful for gathering results of executions not submitted by the
requesting client, as a client will already have all of its own results.
Requests are made by msg_id, and can contain one or more msg_ids.

Message type: ``result_request``::

    content = {
        'msg_ids' : [uuid,'...'] # list of strs
    }

The :func:`result_request` reply contains the content objects of the actual
execution reply messages.

Message type: ``result_reply``::

    content = {
        'status' : 'ok', # else error
        # if ok:
        msg_id : msg, # the content dict is keyed by msg_ids,
                      # values are the result messages
        'pending' : ['msg_id','...'], # msg_ids still pending
        # if error:
        'reason' : "explanation"
    }

Clients can also instruct the controller to forget the results of messages.
This can be done by message ID or engine ID. Individual messages are dropped by
msg_id, and all messages completed on an engine are dropped by engine ID. If
the msg_ids element is the string ``'all'`` instead of a list, then all
completed results are forgotten.

Message type: ``purge_request``::

    content = {
        'msg_ids' : ['id1', 'id2',...], # list of msg_ids or 'all'
        'engine_ids' : [0,2,4] # list of engine IDs
    }

The reply to a purge request is simply the status 'ok' if the request
succeeded, or an explanation of why it failed, such as requesting the purge of
a nonexistent or pending message.

Message type: ``purge_reply``::

    content = {
        'status' : 'ok', # or 'error'
        # if error:
        'reason' : "KeyError: no such msg_id 'whoda'"
    }

:func:`apply` and :func:`apply_bound`
*************************************

The `Namespace `_ model suggests that execution be able to use the model::

    client.apply(f, *args, **kwargs)

which takes `f`, a function in the user's namespace, and executes
``f(*args, **kwargs)`` on a remote engine, returning the result (or, for
non-blocking, information facilitating later retrieval of the result). This
model, unlike the execute message which just uses a code string, must be able
to send arbitrary (pickleable) Python objects. Ideally, it should also copy as
little data as possible. The `buffers` property of a Message was introduced for
this purpose.

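As a rough illustration of why sending pickled objects in separate buffers suits this model, the sketch below serializes a function and its arguments into three byte strings and rebuilds the call on the receiving side. The helper names ``pack_apply`` and ``run_apply`` and the one-buffer-per-item layout are assumptions made for this example only; they are not the format used by the library.

```python
import operator
import pickle


def pack_apply(f, args, kwargs):
    """Hypothetical layout: one pickled buffer each for f, args and kwargs.

    This is NOT the library's buffer format, only an illustration.
    """
    return [pickle.dumps(f), pickle.dumps(args), pickle.dumps(kwargs)]


def run_apply(buffers):
    """Rebuild the call from the three buffers and execute it."""
    f, args, kwargs = (pickle.loads(b) for b in buffers)
    return f(*args, **kwargs)


# operator.add is pickled by reference, so it must be importable on both sides.
buffers = pack_apply(operator.add, (2, 3), {})
result = run_apply(buffers)
```

Because the buffers are already bytes, they can travel alongside the message content without being re-encoded into it, which is the motivation the paragraph above gives for keeping them separate.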
Utility method :func:`build_apply_message` in :mod:`IPython.zmq.streamsession`
wraps a function signature and builds the correct buffer format.

Message type: ``apply_request``::

    content = {
        'bound' : True # whether to execute in the engine's namespace or unbound
    }
    buffers = ['...'] # at least 3 in length
                      # as built by build_apply_message(f,args,kwargs)

Message type: ``apply_reply``::

    content = {
        'status' : 'ok' # 'ok' or 'error'
        # other error info here, as in other messages
    }
    buffers = ['...'] # either 1 or 2 in length
                      # a serialization of the return value of f(*args,**kwargs)
                      # only populated if status is 'ok'

Implementation
--------------

There are a few differences in implementation between the `StreamSession`
object used in the parallel computing fork and the `Session` object, the main
one being that messages are sent in parts, rather than as a single serialized
object. `StreamSession` objects also take pack/unpack functions, which are used
when serializing/deserializing objects. These can be any functions that
translate to/from formats that ZMQ sockets can send (buffers, bytes, etc.).

Split Sends
***********

Previously, messages were bundled as a single JSON object and sent with one
call to :func:`socket.send_json`. Since the controller inspects all messages
but does not need to see their content, which can be large, messages are now
serialized and sent in pieces. All messages are sent in at least 3 parts: the
header, the parent header, and the content. This allows the controller to
unpack and inspect the (always small) header, without spending time unpacking
the content unless the message is bound for the controller. Buffers are added
on to the end of the message, and can be any objects that present the buffer
interface.
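The split-send layout described above can be sketched without any sockets. The example below is a minimal illustration, assuming ``json`` as the pack/unpack pair; the helper names (``pack_parts``, ``peek_header``, ``unpack_parts``) and the header fields ``msg_id`` and ``msg_type`` are hypothetical and chosen only for this example.

```python
import json


def pack_parts(header, parent_header, content, buffers=()):
    """Serialize a message as separate frames.

    Frame order follows the text: header, parent header, content,
    then any raw buffers appended at the end.
    """
    frames = [
        json.dumps(header).encode("utf-8"),
        json.dumps(parent_header).encode("utf-8"),
        json.dumps(content).encode("utf-8"),
    ]
    frames.extend(buffers)
    return frames


def peek_header(frames):
    """Unpack only the first (small) frame, leaving the rest untouched."""
    return json.loads(frames[0].decode("utf-8"))


def unpack_parts(frames):
    """Fully unpack a message: three JSON frames plus trailing buffers."""
    header, parent, content = (json.loads(f.decode("utf-8")) for f in frames[:3])
    return header, parent, content, list(frames[3:])


frames = pack_parts(
    {"msg_id": "abc", "msg_type": "apply_request"},  # assumed header fields
    {},
    {"bound": True},
    buffers=[b"\x00" * 1024],
)
routing_info = peek_header(frames)  # an inspecting process stops here
```

With pyzmq, a list of frames like this could be handed to ``socket.send_multipart(frames)``; an intermediary that only routes or logs messages would then call something like ``peek_header`` and never deserialize the content frame or the buffers.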