##// END OF EJS Templates
use ROUTER/DEALER socket names instead of XREP/XREQ...
MinRK -
Show More
@@ -20,20 +20,14 b' import warnings'
20 20
21 21 import zmq
22 22
23 from IPython.zmq import check_for_zmq
23 24
24 25 if os.name == 'nt':
25 if zmq.__version__ < '2.1.7':
26 raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.7 on Windows, "
27 "and you appear to have %s"%zmq.__version__)
28 elif zmq.__version__ < '2.1.4':
29 raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4, you appear to have %s"%zmq.__version__)
30
31 if zmq.zmq_version() >= '3.0.0':
32 warnings.warn("""libzmq 3 detected.
33 It is unlikely that IPython's zmq code will work properly.
34 Please install libzmq stable, which is 2.1.x or 2.2.x""",
35 RuntimeWarning)
26 min_pyzmq = '2.1.7'
27 else:
28 min_pyzmq = '2.1.4'
36 29
30 check_for_zmq(min_pyzmq, 'IPython.parallel')
37 31
38 32 from IPython.utils.pickleutil import Reference
39 33
@@ -300,7 +300,7 b' class IPControllerApp(BaseParallelApplication):'
300 300 children.append(q)
301 301
302 302 # Multiplexer Queue (in a Process)
303 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
303 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
304 304 q.bind_in(hub.client_info['mux'])
305 305 q.setsockopt_in(zmq.IDENTITY, b'mux')
306 306 q.bind_out(hub.engine_info['mux'])
@@ -309,7 +309,7 b' class IPControllerApp(BaseParallelApplication):'
309 309 children.append(q)
310 310
311 311 # Control Queue (in a Process)
312 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
312 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
313 313 q.bind_in(hub.client_info['control'])
314 314 q.setsockopt_in(zmq.IDENTITY, b'control')
315 315 q.bind_out(hub.engine_info['control'])
@@ -323,7 +323,7 b' class IPControllerApp(BaseParallelApplication):'
323 323 # Task Queue (in a Process)
324 324 if scheme == 'pure':
325 325 self.log.warn("task::using pure XREQ Task scheduler")
326 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
326 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
327 327 # q.setsockopt_out(zmq.HWM, hub.hwm)
328 328 q.bind_in(hub.client_info['task'][1])
329 329 q.setsockopt_in(zmq.IDENTITY, b'task')
@@ -369,7 +369,7 b' class Client(HasTraits):'
369 369 extra_args['key'] = exec_key
370 370 self.session = Session(**extra_args)
371 371
372 self._query_socket = self._context.socket(zmq.XREQ)
372 self._query_socket = self._context.socket(zmq.DEALER)
373 373 self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
374 374 if self._ssh:
375 375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
@@ -498,12 +498,12 b' class Client(HasTraits):'
498 498 if content.status == 'ok':
499 499 ident = util.asbytes(self.session.session)
500 500 if content.mux:
501 self._mux_socket = self._context.socket(zmq.XREQ)
501 self._mux_socket = self._context.socket(zmq.DEALER)
502 502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
503 503 connect_socket(self._mux_socket, content.mux)
504 504 if content.task:
505 505 self._task_scheme, task_addr = content.task
506 self._task_socket = self._context.socket(zmq.XREQ)
506 self._task_socket = self._context.socket(zmq.DEALER)
507 507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
508 508 connect_socket(self._task_socket, task_addr)
509 509 if content.notification:
@@ -511,11 +511,11 b' class Client(HasTraits):'
511 511 connect_socket(self._notification_socket, content.notification)
512 512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
513 513 # if content.query:
514 # self._query_socket = self._context.socket(zmq.XREQ)
514 # self._query_socket = self._context.socket(zmq.DEALER)
515 515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
516 516 # connect_socket(self._query_socket, content.query)
517 517 if content.control:
518 self._control_socket = self._context.socket(zmq.XREQ)
518 self._control_socket = self._context.socket(zmq.DEALER)
519 519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
520 520 connect_socket(self._control_socket, content.control)
521 521 if content.iopub:
@@ -38,7 +38,7 b' class Heart(object):'
38 38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 39 device=None
40 40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 43 self.device.daemon=True
44 44 self.device.connect_in(in_addr)
@@ -162,7 +162,7 b" if __name__ == '__main__':"
162 162 context = zmq.Context()
163 163 pub = context.socket(zmq.PUB)
164 164 pub.bind('tcp://127.0.0.1:5555')
165 xrep = context.socket(zmq.XREP)
165 xrep = context.socket(zmq.ROUTER)
166 166 xrep.bind('tcp://127.0.0.1:5556')
167 167
168 168 outstream = zmqstream.ZMQStream(pub, loop)
@@ -221,7 +221,7 b' class HubFactory(RegistrationFactory):'
221 221 loop = self.loop
222 222
223 223 # Registrar socket
224 q = ZMQStream(ctx.socket(zmq.XREP), loop)
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 225 q.bind(client_iface % self.regport)
226 226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
227 227 if self.client_ip != self.engine_ip:
@@ -233,7 +233,7 b' class HubFactory(RegistrationFactory):'
233 233 # heartbeat
234 234 hpub = ctx.socket(zmq.PUB)
235 235 hpub.bind(engine_iface % self.hb[0])
236 hrep = ctx.socket(zmq.XREP)
236 hrep = ctx.socket(zmq.ROUTER)
237 237 hrep.bind(engine_iface % self.hb[1])
238 238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 239 pingstream=ZMQStream(hpub,loop),
@@ -286,7 +286,7 b' class HubFactory(RegistrationFactory):'
286 286 self.log.debug("Hub client addrs: %s"%self.client_info)
287 287
288 288 # resubmit stream
289 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 290 url = util.disambiguate_url(self.client_info['task'][-1])
291 291 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
292 292 r.connect(url)
@@ -679,11 +679,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
679 679 # for safety with multiprocessing
680 680 ctx = zmq.Context()
681 681 loop = ioloop.IOLoop()
682 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
682 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
683 683 ins.setsockopt(zmq.IDENTITY, identity)
684 684 ins.bind(in_addr)
685 685
686 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
686 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
687 687 outs.setsockopt(zmq.IDENTITY, identity)
688 688 outs.bind(out_addr)
689 689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
@@ -123,7 +123,7 b' class EngineFactory(RegistrationFactory):'
123 123 self.log.info("Registering with controller at %s"%self.url)
124 124 ctx = self.context
125 125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.XREQ)
126 reg = ctx.socket(zmq.DEALER)
127 127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 128 connect(reg, self.url)
129 129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
@@ -164,13 +164,13 b' class EngineFactory(RegistrationFactory):'
164 164 # Uncomment this to go back to two-socket model
165 165 # shell_streams = []
166 166 # for addr in shell_addrs:
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
168 168 # stream.setsockopt(zmq.IDENTITY, identity)
169 169 # stream.connect(disambiguate_url(addr, self.location))
170 170 # shell_streams.append(stream)
171 171
172 172 # Now use only one shell stream for mux and tasks
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
174 174 stream.setsockopt(zmq.IDENTITY, identity)
175 175 shell_streams = [stream]
176 176 for addr in shell_addrs:
@@ -179,7 +179,7 b' class EngineFactory(RegistrationFactory):'
179 179
180 180 # control stream:
181 181 control_addr = str(msg.content.control)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
183 183 control_stream.setsockopt(zmq.IDENTITY, identity)
184 184 connect(control_stream, control_addr)
185 185
@@ -219,9 +219,9 b' def make_starter(up_addr, down_addr, *args, **kwargs):'
219 219 loop = ioloop.IOLoop.instance()
220 220 ctx = zmq.Context()
221 221 session = Session()
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop)
223 223 upstream.connect(up_addr)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop)
225 225 downstream.connect(down_addr)
226 226
227 227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
@@ -9,26 +9,34 b''
9 9 # Verify zmq version dependency >= 2.1.4
10 10 #-----------------------------------------------------------------------------
11 11
12 import re
12 13 import warnings
13 14
14 minimum_pyzmq_version = "2.1.4"
15 def check_for_zmq(minimum_version, module='IPython.zmq'):
16 min_vlist = [int(n) for n in minimum_version.split('.')]
15 17
16 18 try:
17 19 import zmq
18 20 except ImportError:
19 raise ImportError("IPython.zmq requires pyzmq >= %s"%minimum_pyzmq_version)
21 raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))
20 22
21 23 pyzmq_version = zmq.__version__
24 vlist = [int(n) for n in re.findall(r'\d+', pyzmq_version)]
22 25
23 if pyzmq_version < minimum_pyzmq_version:
24 raise ImportError("IPython.zmq requires pyzmq >= %s, but you have %s"%(
25 minimum_pyzmq_version, pyzmq_version))
26 if 'dev' not in pyzmq_version and vlist < min_vlist:
27 raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
28 module, minimum_version, pyzmq_version))
26 29
27 del pyzmq_version
30 # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
31 if not hasattr(zmq, 'DEALER'):
32 zmq.DEALER = zmq.XREQ
33 if not hasattr(zmq, 'ROUTER'):
34 zmq.ROUTER = zmq.XREP
28 35
29 if zmq.zmq_version() >= '3.0.0':
30 warnings.warn("""libzmq 3 detected.
36 if zmq.zmq_version() >= '4.0.0':
37 warnings.warn("""libzmq 4 detected.
31 38 It is unlikely that IPython's zmq code will work properly.
32 39 Please install libzmq stable, which is 2.1.x or 2.2.x""",
33 40 RuntimeWarning)
34 41
42 check_for_zmq('2.1.4')
@@ -179,7 +179,7 b' def main():'
179 179
180 180 # Create initial sockets
181 181 c = zmq.Context()
182 request_socket = c.socket(zmq.XREQ)
182 request_socket = c.socket(zmq.DEALER)
183 183 request_socket.connect(req_conn)
184 184
185 185 sub_socket = c.socket(zmq.SUB)
@@ -145,9 +145,9 b' class KernelApp(BaseIPythonApplication):'
145 145 # Uncomment this to try closing the context.
146 146 # atexit.register(context.term)
147 147
148 self.shell_socket = context.socket(zmq.XREP)
148 self.shell_socket = context.socket(zmq.ROUTER)
149 149 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
150 self.log.debug("shell XREP Channel on port: %i"%self.shell_port)
150 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
151 151
152 152 self.iopub_socket = context.socket(zmq.PUB)
153 153 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
@@ -187,7 +187,7 b' class ShellSocketChannel(ZMQSocketChannel):'
187 187
188 188 def run(self):
189 189 """The thread's main activity. Call start() instead."""
190 self.socket = self.context.socket(zmq.XREQ)
190 self.socket = self.context.socket(zmq.DEALER)
191 191 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
192 192 self.socket.connect('tcp://%s:%i' % self.address)
193 193 self.iostate = POLLERR|POLLIN
@@ -482,7 +482,7 b' class StdInSocketChannel(ZMQSocketChannel):'
482 482
483 483 def run(self):
484 484 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.XREQ)
485 self.socket = self.context.socket(zmq.DEALER)
486 486 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
487 487 self.socket.connect('tcp://%s:%i' % self.address)
488 488 self.iostate = POLLERR|POLLIN
@@ -56,7 +56,7 b' this::'
56 56 print >>sys.__stdout__, "Starting the kernel..."
57 57 print >>sys.__stdout__, "On:",rep_conn, pub_conn
58 58 session = Session(username=u'kernel')
59 reply_socket = c.socket(zmq.XREP)
59 reply_socket = c.socket(zmq.ROUTER)
60 60 reply_socket.bind(rep_conn)
61 61 pub_socket = c.socket(zmq.PUB)
62 62 pub_socket.bind(pub_conn)
@@ -40,7 +40,7 b' kernel has three sockets that serve the following functions:'
40 40 otherwise indicating that the user is to type input for the kernel instead
41 41 of normal commands in the frontend.
42 42
43 2. XREP: this single sockets allows multiple incoming connections from
43 2. ROUTER: this single sockets allows multiple incoming connections from
44 44 frontends, and this is the socket where requests for code execution, object
45 45 information, prompts, etc. are made to the kernel by any frontend. The
46 46 communication on this socket is a sequence of request/reply actions from
@@ -48,13 +48,13 b' kernel has three sockets that serve the following functions:'
48 48
49 49 3. PUB: this socket is the 'broadcast channel' where the kernel publishes all
50 50 side effects (stdout, stderr, etc.) as well as the requests coming from any
51 client over the XREP socket and its own requests on the REP socket. There
51 client over the ROUTER socket and its own requests on the REP socket. There
52 52 are a number of actions in Python which generate side effects: :func:`print`
53 53 writes to ``sys.stdout``, errors generate tracebacks, etc. Additionally, in
54 54 a multi-client scenario, we want all frontends to be able to know what each
55 55 other has sent to the kernel (this can be useful in collaborative scenarios,
56 56 for example). This socket allows both side effects and the information
57 about communications taking place with one client over the XREQ/XREP channel
57 about communications taking place with one client over the ROUTER/DEALER channel
58 58 to be made available to all clients in a uniform manner.
59 59
60 60 All messages are tagged with enough information (details below) for clients
@@ -122,7 +122,7 b' For each message type, the actual content will differ and all existing message'
122 122 types are specified in what follows of this document.
123 123
124 124
125 Messages on the XREP/XREQ socket
125 Messages on the ROUTER/DEALER socket
126 126 ================================
127 127
128 128 .. _execute:
@@ -633,7 +633,7 b' Connect'
633 633 When a client connects to the request/reply socket of the kernel, it can issue
634 634 a connect request to get basic information about the kernel, such as the ports
635 635 the other ZeroMQ sockets are listening on. This allows clients to only have
636 to know about a single port (the XREQ/XREP channel) to connect to a kernel.
636 to know about a single port (the DEALER/ROUTER channel) to connect to a kernel.
637 637
638 638 Message type: ``connect_request``::
639 639
@@ -643,7 +643,7 b' Message type: ``connect_request``::'
643 643 Message type: ``connect_reply``::
644 644
645 645 content = {
646 'xrep_port' : int # The port the XREP socket is listening on.
646 'xrep_port' : int # The port the ROUTER socket is listening on.
647 647 'pub_port' : int # The port the PUB socket is listening on.
648 648 'req_port' : int # The port the REQ socket is listening on.
649 649 'hb_port' : int # The port the heartbeat socket is listening on.
@@ -901,7 +901,7 b' heartbeat with pure ZMQ, without using any Python messaging at all.'
901 901
902 902 The monitor sends out a single zmq message (right now, it is a str of the
903 903 monitor's lifetime in seconds), and gets the same message right back, prefixed
904 with the zmq identity of the XREQ socket in the heartbeat process. This can be
904 with the zmq identity of the DEALER socket in the heartbeat process. This can be
905 905 a uuid, or even a full message, but there doesn't seem to be a need for packing
906 906 up a message when the sender and receiver are the exact same Python object.
907 907
@@ -913,7 +913,7 b' and the monitor receives some number of messages of the form::'
913 913
914 914 ['uuid-abcd-dead-beef', '1.2345678910']
915 915
916 where the first part is the zmq.IDENTITY of the heart's XREQ on the engine, and
916 where the first part is the zmq.IDENTITY of the heart's DEALER on the engine, and
917 917 the rest is the message sent by the monitor. No Python code ever has any
918 918 access to the message between the monitor's send, and the monitor's recv.
919 919
@@ -48,11 +48,11 b' Registration'
48 48 :alt: IPython Registration connections
49 49 :align: center
50 50
51 Engines and Clients only need to know where the Query ``XREP`` is located to start
51 Engines and Clients only need to know where the Query ``ROUTER`` is located to start
52 52 connecting.
53 53
54 54 Once a controller is launched, the only information needed for connecting clients and/or
55 engines is the IP/port of the Hub's ``XREP`` socket called the Registrar. This socket
55 engines is the IP/port of the Hub's ``ROUTER`` socket called the Registrar. This socket
56 56 handles connections from both clients and engines, and replies with the remaining
57 57 information necessary to establish the remaining connections. Clients use this same socket for
58 58 querying the Hub for state information.
@@ -69,12 +69,12 b' Heartbeat'
69 69
70 70 The heartbeat process has been described elsewhere. To summarize: the Heartbeat Monitor
71 71 publishes a distinct message periodically via a ``PUB`` socket. Each engine has a
72 ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output.
73 The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``XREQ`` is
74 connected to the ``XREP`` labeled *pong*. This results in the same message being relayed
75 back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat
76 Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are
77 still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets, which information
72 ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``DEALER`` socket for output.
73 The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``DEALER`` is
74 connected to the ``ROUTER`` labeled *pong*. This results in the same message being relayed
75 back to the Heartbeat Monitor with the addition of the ``DEALER`` prefix. The Heartbeat
76 Monitor receives all the replies via an ``ROUTER`` socket, and identifies which hearts are
77 still beating by the ``zmq.IDENTITY`` prefix of the ``DEALER`` sockets, which information
78 78 the Hub uses to notify clients of any changes in the available engines.
79 79
80 80 Schedulers
@@ -94,16 +94,16 b' queue, all messages sent through these queues (both directions) are also sent vi'
94 94 ``PUB`` socket to a monitor, which allows the Hub to monitor queue traffic without
95 95 interfering with it.
96 96
97 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the
98 client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing.
97 For tasks, the engine need not be specified. Messages sent to the ``ROUTER`` socket from the
98 client side are assigned to an engine via ZMQ's ``DEALER`` round-robin load balancing.
99 99 Engine replies are directed to specific clients via the IDENTITY of the client, which is
100 100 received as a prefix at the Engine.
101 101
102 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must
103 specify the destination by the ``zmq.IDENTITY`` of the ``XREP`` socket connected to
102 For Multiplexing, ``ROUTER`` is used for both in and output sockets in the device. Clients must
103 specify the destination by the ``zmq.IDENTITY`` of the ``ROUTER`` socket connected to
104 104 the downstream end of the device.
105 105
106 At the Kernel level, both of these ``XREP`` sockets are treated in the same way as the ``REP``
106 At the Kernel level, both of these ``ROUTER`` sockets are treated in the same way as the ``REP``
107 107 socket in the serial version (except using ZMQStreams instead of explicit sockets).
108 108
109 109 Execution can be done in a load-balanced (engine-agnostic) or multiplexed (engine-specified)
@@ -135,10 +135,10 b' Client connections'
135 135 :alt: IPython client query connections
136 136 :align: center
137 137
138 Clients connect to an ``XREP`` socket to query the hub.
138 Clients connect to an ``ROUTER`` socket to query the hub.
139 139
140 The hub's registrar ``XREP`` socket also listens for queries from clients as to queue status,
141 and control instructions. Clients connect to this socket via an ``XREQ`` during registration.
140 The hub's registrar ``ROUTER`` socket also listens for queries from clients as to queue status,
141 and control instructions. Clients connect to this socket via an ``DEALER`` during registration.
142 142
143 143 .. figure:: figs/notiffade.png
144 144 :width: 432px
@@ -29,14 +29,14 b' requests and results. It has no role in execution and does no relay of messages'
29 29 large blocking requests or database actions in the Hub do not have the ability to impede
30 30 job submission and results.
31 31
32 Registration (``XREP``)
32 Registration (``ROUTER``)
33 33 ***********************
34 34
35 35 The first function of the Hub is to facilitate and monitor connections of clients
36 36 and engines. Both client and engine registration are handled by the same socket, so only
37 37 one ip/port pair is needed to connect any number of connections and clients.
38 38
39 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the
39 Engines register with the ``zmq.IDENTITY`` of their two ``DEALER`` sockets, one for the
40 40 queue, which receives execute requests, and one for the heartbeat, which is used to
41 41 monitor the survival of the Engine process.
42 42
@@ -120,7 +120,7 b' Message type : ``unregistration_notification``::'
120 120 }
121 121
122 122
123 Client Queries (``XREP``)
123 Client Queries (``ROUTER``)
124 124 *************************
125 125
126 126 The hub monitors and logs all queue traffic, so that clients can retrieve past
@@ -229,18 +229,18 b' There are three basic schedulers:'
229 229 * MUX Scheduler
230 230 * Control Scheduler
231 231
232 The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``XREP``
232 The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``ROUTER``
233 233 sockets on either side. This allows the queue to relay individual messages to particular
234 234 targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ
235 device, in which case the client-facing socket is ``XREP``, and the engine-facing socket
236 is ``XREQ``. The result of this is that client-submitted messages are load-balanced via
237 the ``XREQ`` socket, but the engine's replies to each message go to the requesting client.
235 device, in which case the client-facing socket is ``ROUTER``, and the engine-facing socket
236 is ``DEALER``. The result of this is that client-submitted messages are load-balanced via
237 the ``DEALER`` socket, but the engine's replies to each message go to the requesting client.
238 238
239 Raw ``XREQ`` scheduling is quite primitive, and doesn't allow message introspection, so
239 Raw ``DEALER`` scheduling is quite primitive, and doesn't allow message introspection, so
240 240 there are also Python Schedulers that can be used. These Schedulers behave in much the
241 241 same way as a MonitoredQueue does from the outside, but have rich internal logic to
242 242 determine destinations, as well as handle dependency graphs Their sockets are always
243 ``XREP`` on both sides.
243 ``ROUTER`` on both sides.
244 244
245 245 The Python task schedulers have an additional message type, which informs the Hub of
246 246 the destination of a task as soon as that destination is known.
@@ -389,7 +389,7 b' Pure ZMQ Scheduler'
389 389 ------------------
390 390
391 391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
393 393 load-balancing. This scheduler does not support any of the advanced features of the Python
394 394 :class:`.Scheduler`.
395 395
General Comments 0
You need to be logged in to leave comments. Login now