diff --git a/IPython/parallel/__init__.py b/IPython/parallel/__init__.py index 13ef12f..4cb507d 100644 --- a/IPython/parallel/__init__.py +++ b/IPython/parallel/__init__.py @@ -20,20 +20,14 @@ import warnings import zmq +from IPython.zmq import check_for_zmq if os.name == 'nt': - if zmq.__version__ < '2.1.7': - raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.7 on Windows, " - "and you appear to have %s"%zmq.__version__) -elif zmq.__version__ < '2.1.4': - raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4, you appear to have %s"%zmq.__version__) - -if zmq.zmq_version() >= '3.0.0': - warnings.warn("""libzmq 3 detected. - It is unlikely that IPython's zmq code will work properly. - Please install libzmq stable, which is 2.1.x or 2.2.x""", - RuntimeWarning) + min_pyzmq = '2.1.7' +else: + min_pyzmq = '2.1.4' +check_for_zmq(min_pyzmq, 'IPython.parallel') from IPython.utils.pickleutil import Reference diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 362318c..3f31680 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -300,7 +300,7 @@ class IPControllerApp(BaseParallelApplication): children.append(q) # Multiplexer Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out') + q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') q.bind_in(hub.client_info['mux']) q.setsockopt_in(zmq.IDENTITY, b'mux') q.bind_out(hub.engine_info['mux']) @@ -309,7 +309,7 @@ class IPControllerApp(BaseParallelApplication): children.append(q) # Control Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol') + q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') q.bind_in(hub.client_info['control']) q.setsockopt_in(zmq.IDENTITY, b'control') q.bind_out(hub.engine_info['control']) @@ -323,7 +323,7 @@ class IPControllerApp(BaseParallelApplication): # Task Queue (in a Process) if scheme == 'pure': 
self.log.warn("task::using pure XREQ Task scheduler") - q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask') + q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') # q.setsockopt_out(zmq.HWM, hub.hwm) q.bind_in(hub.client_info['task'][1]) q.setsockopt_in(zmq.IDENTITY, b'task') diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index f6d4698..bee991a 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -369,7 +369,7 @@ class Client(HasTraits): extra_args['key'] = exec_key self.session = Session(**extra_args) - self._query_socket = self._context.socket(zmq.XREQ) + self._query_socket = self._context.socket(zmq.DEALER) self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) if self._ssh: tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) @@ -498,12 +498,12 @@ class Client(HasTraits): if content.status == 'ok': ident = util.asbytes(self.session.session) if content.mux: - self._mux_socket = self._context.socket(zmq.XREQ) + self._mux_socket = self._context.socket(zmq.DEALER) self._mux_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._mux_socket, content.mux) if content.task: self._task_scheme, task_addr = content.task - self._task_socket = self._context.socket(zmq.XREQ) + self._task_socket = self._context.socket(zmq.DEALER) self._task_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._task_socket, task_addr) if content.notification: @@ -511,11 +511,11 @@ class Client(HasTraits): connect_socket(self._notification_socket, content.notification) self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') # if content.query: - # self._query_socket = self._context.socket(zmq.XREQ) + # self._query_socket = self._context.socket(zmq.DEALER) # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) # connect_socket(self._query_socket, content.query) if content.control: - self._control_socket = self._context.socket(zmq.XREQ) + 
self._control_socket = self._context.socket(zmq.DEALER) self._control_socket.setsockopt(zmq.IDENTITY, ident) connect_socket(self._control_socket, content.control) if content.iopub: diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index e51b6c5..ad4fc7b 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -38,7 +38,7 @@ class Heart(object): You can specify the XREQ's IDENTITY via the optional heart_id argument.""" device=None id=None - def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): + def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None): self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) self.device.daemon=True self.device.connect_in(in_addr) @@ -162,7 +162,7 @@ if __name__ == '__main__': context = zmq.Context() pub = context.socket(zmq.PUB) pub.bind('tcp://127.0.0.1:5555') - xrep = context.socket(zmq.XREP) + xrep = context.socket(zmq.ROUTER) xrep.bind('tcp://127.0.0.1:5556') outstream = zmqstream.ZMQStream(pub, loop) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index efcc0c1..c3fe1c1 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -221,7 +221,7 @@ class HubFactory(RegistrationFactory): loop = self.loop # Registrar socket - q = ZMQStream(ctx.socket(zmq.XREP), loop) + q = ZMQStream(ctx.socket(zmq.ROUTER), loop) q.bind(client_iface % self.regport) self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) if self.client_ip != self.engine_ip: @@ -233,7 +233,7 @@ class HubFactory(RegistrationFactory): # heartbeat hpub = ctx.socket(zmq.PUB) hpub.bind(engine_iface % self.hb[0]) - hrep = ctx.socket(zmq.XREP) + hrep = ctx.socket(zmq.ROUTER) hrep.bind(engine_iface % self.hb[1]) self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, 
pingstream=ZMQStream(hpub,loop), @@ -286,7 +286,7 @@ class HubFactory(RegistrationFactory): self.log.debug("Hub client addrs: %s"%self.client_info) # resubmit stream - r = ZMQStream(ctx.socket(zmq.XREQ), loop) + r = ZMQStream(ctx.socket(zmq.DEALER), loop) url = util.disambiguate_url(self.client_info['task'][-1]) r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) r.connect(url) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index d7e6da6..b7f63eb 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -679,11 +679,11 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, # for safety with multiprocessing ctx = zmq.Context() loop = ioloop.IOLoop() - ins = ZMQStream(ctx.socket(zmq.XREP),loop) + ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) ins.setsockopt(zmq.IDENTITY, identity) ins.bind(in_addr) - outs = ZMQStream(ctx.socket(zmq.XREP),loop) + outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) outs.setsockopt(zmq.IDENTITY, identity) outs.bind(out_addr) mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 7cf07ef..8b7800e 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -123,7 +123,7 @@ class EngineFactory(RegistrationFactory): self.log.info("Registering with controller at %s"%self.url) ctx = self.context connect,maybe_tunnel = self.init_connector() - reg = ctx.socket(zmq.XREQ) + reg = ctx.socket(zmq.DEALER) reg.setsockopt(zmq.IDENTITY, self.bident) connect(reg, self.url) self.registrar = zmqstream.ZMQStream(reg, self.loop) @@ -164,13 +164,13 @@ class EngineFactory(RegistrationFactory): # Uncomment this to go back to two-socket model # shell_streams = [] # for addr in shell_addrs: - # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) + # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) # 
stream.setsockopt(zmq.IDENTITY, identity) # stream.connect(disambiguate_url(addr, self.location)) # shell_streams.append(stream) # Now use only one shell stream for mux and tasks - stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) + stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) stream.setsockopt(zmq.IDENTITY, identity) shell_streams = [stream] for addr in shell_addrs: @@ -179,7 +179,7 @@ class EngineFactory(RegistrationFactory): # control stream: control_addr = str(msg.content.control) - control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop) + control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) control_stream.setsockopt(zmq.IDENTITY, identity) connect(control_stream, control_addr) diff --git a/IPython/parallel/engine/kernelstarter.py b/IPython/parallel/engine/kernelstarter.py index 4aa0238..4f53d39 100644 --- a/IPython/parallel/engine/kernelstarter.py +++ b/IPython/parallel/engine/kernelstarter.py @@ -219,9 +219,9 @@ def make_starter(up_addr, down_addr, *args, **kwargs): loop = ioloop.IOLoop.instance() ctx = zmq.Context() session = Session() - upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) + upstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop) upstream.connect(up_addr) - downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop) + downstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop) downstream.connect(down_addr) starter = KernelStarter(session, upstream, downstream, *args, **kwargs) diff --git a/IPython/zmq/__init__.py b/IPython/zmq/__init__.py index e9ad6bf..553bb6d 100644 --- a/IPython/zmq/__init__.py +++ b/IPython/zmq/__init__.py @@ -9,26 +9,35 @@ # Verify zmq version dependency >= 2.1.4 #----------------------------------------------------------------------------- +import re import warnings -minimum_pyzmq_version = "2.1.4" +def check_for_zmq(minimum_version, module='IPython.zmq'): + """Raise ImportError (naming `module`) unless pyzmq >= minimum_version is importable; add DEALER/ROUTER aliases on old pyzmq and warn on libzmq >= 4.""" + min_vlist = [int(n) for n in minimum_version.split('.')] -try: - import zmq -except ImportError: - raise
ImportError("IPython.zmq requires pyzmq >= %s"%minimum_pyzmq_version) + try: + import zmq + except ImportError: + raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version)) -pyzmq_version = zmq.__version__ + pyzmq_version = zmq.__version__ + vlist = [int(n) for n in re.findall(r'\d+', pyzmq_version)] -if pyzmq_version < minimum_pyzmq_version: - raise ImportError("IPython.zmq requires pyzmq >= %s, but you have %s"%( - minimum_pyzmq_version, pyzmq_version)) + if 'dev' not in pyzmq_version and vlist < min_vlist: + raise ImportError("%s requires pyzmq >= %s, but you have %s"%( + module, minimum_version, pyzmq_version)) -del pyzmq_version + # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9 + if not hasattr(zmq, 'DEALER'): + zmq.DEALER = zmq.XREQ + if not hasattr(zmq, 'ROUTER'): + zmq.ROUTER = zmq.XREP -if zmq.zmq_version() >= '3.0.0': - warnings.warn("""libzmq 3 detected. - It is unlikely that IPython's zmq code will work properly. - Please install libzmq stable, which is 2.1.x or 2.2.x""", - RuntimeWarning) + if int(zmq.zmq_version().split('.')[0]) >= 4: # numeric major-version check; as strings '10.0.0' sorts before '4.0.0' + warnings.warn("""libzmq 4 detected. + It is unlikely that IPython's zmq code will work properly. + Please install libzmq stable, which is 2.1.x or 2.2.x""", + RuntimeWarning) +check_for_zmq('2.1.4') diff --git a/IPython/zmq/frontend.py b/IPython/zmq/frontend.py index 39eaffa..671f27d 100755 --- a/IPython/zmq/frontend.py +++ b/IPython/zmq/frontend.py @@ -179,7 +179,7 @@ def main(): # Create initial sockets c = zmq.Context() - request_socket = c.socket(zmq.XREQ) + request_socket = c.socket(zmq.DEALER) request_socket.connect(req_conn) sub_socket = c.socket(zmq.SUB) diff --git a/IPython/zmq/kernelapp.py b/IPython/zmq/kernelapp.py index 730e09e..3b18fa7 100644 --- a/IPython/zmq/kernelapp.py +++ b/IPython/zmq/kernelapp.py @@ -145,9 +145,9 @@ class KernelApp(BaseIPythonApplication): # Uncomment this to try closing the context.
# atexit.register(context.term) - self.shell_socket = context.socket(zmq.XREP) + self.shell_socket = context.socket(zmq.ROUTER) self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) - self.log.debug("shell XREP Channel on port: %i"%self.shell_port) + self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port) self.iopub_socket = context.socket(zmq.PUB) self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 0c76f0f..e81f723 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -187,7 +187,7 @@ class ShellSocketChannel(ZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.XREQ) + self.socket = self.context.socket(zmq.DEALER) self.socket.setsockopt(zmq.IDENTITY, self.session.session) self.socket.connect('tcp://%s:%i' % self.address) self.iostate = POLLERR|POLLIN @@ -482,7 +482,7 @@ class StdInSocketChannel(ZMQSocketChannel): def run(self): """The thread's main activity. Call start() instead.""" - self.socket = self.context.socket(zmq.XREQ) + self.socket = self.context.socket(zmq.DEALER) self.socket.setsockopt(zmq.IDENTITY, self.session.session) self.socket.connect('tcp://%s:%i' % self.address) self.iostate = POLLERR|POLLIN diff --git a/docs/source/development/ipython_qt.txt b/docs/source/development/ipython_qt.txt index 0363e42..05e928f 100644 --- a/docs/source/development/ipython_qt.txt +++ b/docs/source/development/ipython_qt.txt @@ -56,7 +56,7 @@ this:: print >>sys.__stdout__, "Starting the kernel..." 
print >>sys.__stdout__, "On:",rep_conn, pub_conn session = Session(username=u'kernel') - reply_socket = c.socket(zmq.XREP) + reply_socket = c.socket(zmq.ROUTER) reply_socket.bind(rep_conn) pub_socket = c.socket(zmq.PUB) pub_socket.bind(pub_conn) diff --git a/docs/source/development/messaging.txt b/docs/source/development/messaging.txt index f51a189..2e984fc 100644 --- a/docs/source/development/messaging.txt +++ b/docs/source/development/messaging.txt @@ -40,7 +40,7 @@ kernel has three sockets that serve the following functions: otherwise indicating that the user is to type input for the kernel instead of normal commands in the frontend. -2. XREP: this single sockets allows multiple incoming connections from +2. ROUTER: this single socket allows multiple incoming connections from frontends, and this is the socket where requests for code execution, object information, prompts, etc. are made to the kernel by any frontend. The communication on this socket is a sequence of request/reply actions from @@ -48,13 +48,13 @@ kernel has three sockets that serve the following functions: 3. PUB: this socket is the 'broadcast channel' where the kernel publishes all side effects (stdout, stderr, etc.) as well as the requests coming from any - client over the XREP socket and its own requests on the REP socket. There + client over the ROUTER socket and its own requests on the REP socket. There are a number of actions in Python which generate side effects: :func:`print` writes to ``sys.stdout``, errors generate tracebacks, etc. Additionally, in a multi-client scenario, we want all frontends to be able to know what each other has sent to the kernel (this can be useful in collaborative scenarios, for example). This socket allows both side effects and the information - about communications taking place with one client over the XREQ/XREP channel + about communications taking place with one client over the ROUTER/DEALER channel to be made available to all clients in a uniform manner.
All messages are tagged with enough information (details below) for clients @@ -122,7 +122,7 @@ For each message type, the actual content will differ and all existing message types are specified in what follows of this document. -Messages on the XREP/XREQ socket -================================ +Messages on the ROUTER/DEALER socket +==================================== .. _execute: @@ -633,7 +633,7 @@ Connect When a client connects to the request/reply socket of the kernel, it can issue a connect request to get basic information about the kernel, such as the ports the other ZeroMQ sockets are listening on. This allows clients to only have -to know about a single port (the XREQ/XREP channel) to connect to a kernel. +to know about a single port (the DEALER/ROUTER channel) to connect to a kernel. Message type: ``connect_request``:: @@ -643,7 +643,7 @@ Message type: ``connect_request``:: Message type: ``connect_reply``:: content = { - 'xrep_port' : int # The port the XREP socket is listening on. + 'xrep_port' : int # The port the ROUTER socket is listening on. 'pub_port' : int # The port the PUB socket is listening on. 'req_port' : int # The port the REQ socket is listening on. 'hb_port' : int # The port the heartbeat socket is listening on. @@ -901,7 +901,7 @@ heartbeat with pure ZMQ, without using any Python messaging at all. The monitor sends out a single zmq message (right now, it is a str of the monitor's lifetime in seconds), and gets the same message right back, prefixed -with the zmq identity of the XREQ socket in the heartbeat process. This can be +with the zmq identity of the DEALER socket in the heartbeat process. This can be a uuid, or even a full message, but there doesn't seem to be a need for packing up a message when the sender and receiver are the exact same Python object.
@@ -913,7 +913,7 @@ and the monitor receives some number of messages of the form:: ['uuid-abcd-dead-beef', '1.2345678910'] -where the first part is the zmq.IDENTITY of the heart's XREQ on the engine, and +where the first part is the zmq.IDENTITY of the heart's DEALER on the engine, and the rest is the message sent by the monitor. No Python code ever has any access to the message between the monitor's send, and the monitor's recv. diff --git a/docs/source/development/parallel_connections.txt b/docs/source/development/parallel_connections.txt index cac44e1..8530c29 100644 --- a/docs/source/development/parallel_connections.txt +++ b/docs/source/development/parallel_connections.txt @@ -48,11 +48,11 @@ Registration :alt: IPython Registration connections :align: center - Engines and Clients only need to know where the Query ``XREP`` is located to start + Engines and Clients only need to know where the Query ``ROUTER`` is located to start connecting. Once a controller is launched, the only information needed for connecting clients and/or -engines is the IP/port of the Hub's ``XREP`` socket called the Registrar. This socket +engines is the IP/port of the Hub's ``ROUTER`` socket called the Registrar. This socket handles connections from both clients and engines, and replies with the remaining information necessary to establish the remaining connections. Clients use this same socket for querying the Hub for state information. @@ -69,12 +69,12 @@ Heartbeat The heartbeat process has been described elsewhere. To summarize: the Heartbeat Monitor publishes a distinct message periodically via a ``PUB`` socket. Each engine has a -``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output. -The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``XREQ`` is -connected to the ``XREP`` labeled *pong*. This results in the same message being relayed -back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. 
The Heartbeat -Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are -still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets, which information +``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``DEALER`` socket for output. +The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``DEALER`` is +connected to the ``ROUTER`` labeled *pong*. This results in the same message being relayed +back to the Heartbeat Monitor with the addition of the ``DEALER`` prefix. The Heartbeat +Monitor receives all the replies via a ``ROUTER`` socket, and identifies which hearts are +still beating by the ``zmq.IDENTITY`` prefix of the ``DEALER`` sockets, which information the Hub uses to notify clients of any changes in the available engines. Schedulers @@ -94,16 +94,16 @@ queue, all messages sent through these queues (both directions) are also sent vi ``PUB`` socket to a monitor, which allows the Hub to monitor queue traffic without interfering with it. -For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the -client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing. +For tasks, the engine need not be specified. Messages sent to the ``ROUTER`` socket from the +client side are assigned to an engine via ZMQ's ``DEALER`` round-robin load balancing. Engine replies are directed to specific clients via the IDENTITY of the client, which is received as a prefix at the Engine. -For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must -specify the destination by the ``zmq.IDENTITY`` of the ``XREP`` socket connected to +For Multiplexing, ``ROUTER`` is used for both in and output sockets in the device. Clients must +specify the destination by the ``zmq.IDENTITY`` of the ``ROUTER`` socket connected to the downstream end of the device.
-At the Kernel level, both of these ``XREP`` sockets are treated in the same way as the ``REP`` +At the Kernel level, both of these ``ROUTER`` sockets are treated in the same way as the ``REP`` socket in the serial version (except using ZMQStreams instead of explicit sockets). Execution can be done in a load-balanced (engine-agnostic) or multiplexed (engine-specified) @@ -135,10 +135,10 @@ Client connections :alt: IPython client query connections :align: center - Clients connect to an ``XREP`` socket to query the hub. + Clients connect to a ``ROUTER`` socket to query the hub. -The hub's registrar ``XREP`` socket also listens for queries from clients as to queue status, -and control instructions. Clients connect to this socket via an ``XREQ`` during registration. +The hub's registrar ``ROUTER`` socket also listens for queries from clients as to queue status, +and control instructions. Clients connect to this socket via a ``DEALER`` during registration. .. figure:: figs/notiffade.png :width: 432px diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt index 95d455b..674b27b 100644 --- a/docs/source/development/parallel_messages.txt +++ b/docs/source/development/parallel_messages.txt @@ -29,14 +29,14 @@ requests and results. It has no role in execution and does no relay of messages large blocking requests or database actions in the Hub do not have the ability to impede job submission and results. -Registration (``XREP``) -*********************** +Registration (``ROUTER``) +************************* The first function of the Hub is to facilitate and monitor connections of clients and engines. Both client and engine registration are handled by the same socket, so only one ip/port pair is needed to connect any number of connections and clients.
-Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the +Engines register with the ``zmq.IDENTITY`` of their two ``DEALER`` sockets, one for the queue, which receives execute requests, and one for the heartbeat, which is used to monitor the survival of the Engine process. @@ -120,7 +120,7 @@ Message type : ``unregistration_notification``:: } -Client Queries (``XREP``) -************************* +Client Queries (``ROUTER``) +*************************** The hub monitors and logs all queue traffic, so that clients can retrieve past @@ -229,18 +229,18 @@ There are three basic schedulers: * MUX Scheduler * Control Scheduler -The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``XREP`` +The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``ROUTER`` sockets on either side. This allows the queue to relay individual messages to particular targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ -device, in which case the client-facing socket is ``XREP``, and the engine-facing socket -is ``XREQ``. The result of this is that client-submitted messages are load-balanced via -the ``XREQ`` socket, but the engine's replies to each message go to the requesting client. +device, in which case the client-facing socket is ``ROUTER``, and the engine-facing socket +is ``DEALER``. The result of this is that client-submitted messages are load-balanced via +the ``DEALER`` socket, but the engine's replies to each message go to the requesting client. -Raw ``XREQ`` scheduling is quite primitive, and doesn't allow message introspection, so +Raw ``DEALER`` scheduling is quite primitive, and doesn't allow message introspection, so there are also Python Schedulers that can be used. These Schedulers behave in much the same way as a MonitoredQueue does from the outside, but have rich internal logic to determine destinations, as well as handle dependency graphs Their sockets are always -``XREP`` on both sides.
+``ROUTER`` on both sides. The Python task schedulers have an additional message type, which informs the Hub of the destination of a task as soon as that destination is known. diff --git a/docs/source/parallel/parallel_task.txt b/docs/source/parallel/parallel_task.txt index 28d34d6..1e13c0b 100644 --- a/docs/source/parallel/parallel_task.txt +++ b/docs/source/parallel/parallel_task.txt @@ -389,7 +389,7 @@ Pure ZMQ Scheduler ------------------ For maximum throughput, the 'pure' scheme is not Python at all, but a C-level -:class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all +:class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all load-balancing. This scheduler does not support any of the advanced features of the Python :class:`.Scheduler`.