##// END OF EJS Templates
use ROUTER/DEALER socket names instead of XREP/XREQ...
MinRK -
Show More
@@ -20,20 +20,14 b' import warnings'
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.zmq import check_for_zmq
23
24
24 if os.name == 'nt':
25 if os.name == 'nt':
25 if zmq.__version__ < '2.1.7':
26 min_pyzmq = '2.1.7'
26 raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.7 on Windows, "
27 else:
27 "and you appear to have %s"%zmq.__version__)
28 min_pyzmq = '2.1.4'
28 elif zmq.__version__ < '2.1.4':
29 raise ImportError("IPython.parallel requires pyzmq/0MQ >= 2.1.4, you appear to have %s"%zmq.__version__)
30
31 if zmq.zmq_version() >= '3.0.0':
32 warnings.warn("""libzmq 3 detected.
33 It is unlikely that IPython's zmq code will work properly.
34 Please install libzmq stable, which is 2.1.x or 2.2.x""",
35 RuntimeWarning)
36
29
30 check_for_zmq(min_pyzmq, 'IPython.parallel')
37
31
38 from IPython.utils.pickleutil import Reference
32 from IPython.utils.pickleutil import Reference
39
33
@@ -300,7 +300,7 b' class IPControllerApp(BaseParallelApplication):'
300 children.append(q)
300 children.append(q)
301
301
302 # Multiplexer Queue (in a Process)
302 # Multiplexer Queue (in a Process)
303 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
303 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
304 q.bind_in(hub.client_info['mux'])
304 q.bind_in(hub.client_info['mux'])
305 q.setsockopt_in(zmq.IDENTITY, b'mux')
305 q.setsockopt_in(zmq.IDENTITY, b'mux')
306 q.bind_out(hub.engine_info['mux'])
306 q.bind_out(hub.engine_info['mux'])
@@ -309,7 +309,7 b' class IPControllerApp(BaseParallelApplication):'
309 children.append(q)
309 children.append(q)
310
310
311 # Control Queue (in a Process)
311 # Control Queue (in a Process)
312 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
312 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
313 q.bind_in(hub.client_info['control'])
313 q.bind_in(hub.client_info['control'])
314 q.setsockopt_in(zmq.IDENTITY, b'control')
314 q.setsockopt_in(zmq.IDENTITY, b'control')
315 q.bind_out(hub.engine_info['control'])
315 q.bind_out(hub.engine_info['control'])
@@ -323,7 +323,7 b' class IPControllerApp(BaseParallelApplication):'
323 # Task Queue (in a Process)
323 # Task Queue (in a Process)
324 if scheme == 'pure':
324 if scheme == 'pure':
325 self.log.warn("task::using pure XREQ Task scheduler")
325 self.log.warn("task::using pure XREQ Task scheduler")
326 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
326 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
327 # q.setsockopt_out(zmq.HWM, hub.hwm)
327 # q.setsockopt_out(zmq.HWM, hub.hwm)
328 q.bind_in(hub.client_info['task'][1])
328 q.bind_in(hub.client_info['task'][1])
329 q.setsockopt_in(zmq.IDENTITY, b'task')
329 q.setsockopt_in(zmq.IDENTITY, b'task')
@@ -369,7 +369,7 b' class Client(HasTraits):'
369 extra_args['key'] = exec_key
369 extra_args['key'] = exec_key
370 self.session = Session(**extra_args)
370 self.session = Session(**extra_args)
371
371
372 self._query_socket = self._context.socket(zmq.XREQ)
372 self._query_socket = self._context.socket(zmq.DEALER)
373 self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
373 self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
374 if self._ssh:
374 if self._ssh:
375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
@@ -498,12 +498,12 b' class Client(HasTraits):'
498 if content.status == 'ok':
498 if content.status == 'ok':
499 ident = util.asbytes(self.session.session)
499 ident = util.asbytes(self.session.session)
500 if content.mux:
500 if content.mux:
501 self._mux_socket = self._context.socket(zmq.XREQ)
501 self._mux_socket = self._context.socket(zmq.DEALER)
502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
503 connect_socket(self._mux_socket, content.mux)
503 connect_socket(self._mux_socket, content.mux)
504 if content.task:
504 if content.task:
505 self._task_scheme, task_addr = content.task
505 self._task_scheme, task_addr = content.task
506 self._task_socket = self._context.socket(zmq.XREQ)
506 self._task_socket = self._context.socket(zmq.DEALER)
507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
508 connect_socket(self._task_socket, task_addr)
508 connect_socket(self._task_socket, task_addr)
509 if content.notification:
509 if content.notification:
@@ -511,11 +511,11 b' class Client(HasTraits):'
511 connect_socket(self._notification_socket, content.notification)
511 connect_socket(self._notification_socket, content.notification)
512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
513 # if content.query:
513 # if content.query:
514 # self._query_socket = self._context.socket(zmq.XREQ)
514 # self._query_socket = self._context.socket(zmq.DEALER)
515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
516 # connect_socket(self._query_socket, content.query)
516 # connect_socket(self._query_socket, content.query)
517 if content.control:
517 if content.control:
518 self._control_socket = self._context.socket(zmq.XREQ)
518 self._control_socket = self._context.socket(zmq.DEALER)
519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
520 connect_socket(self._control_socket, content.control)
520 connect_socket(self._control_socket, content.control)
521 if content.iopub:
521 if content.iopub:
@@ -38,7 +38,7 b' class Heart(object):'
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 self.device.daemon=True
43 self.device.daemon=True
44 self.device.connect_in(in_addr)
44 self.device.connect_in(in_addr)
@@ -162,7 +162,7 b" if __name__ == '__main__':"
162 context = zmq.Context()
162 context = zmq.Context()
163 pub = context.socket(zmq.PUB)
163 pub = context.socket(zmq.PUB)
164 pub.bind('tcp://127.0.0.1:5555')
164 pub.bind('tcp://127.0.0.1:5555')
165 xrep = context.socket(zmq.XREP)
165 xrep = context.socket(zmq.ROUTER)
166 xrep.bind('tcp://127.0.0.1:5556')
166 xrep.bind('tcp://127.0.0.1:5556')
167
167
168 outstream = zmqstream.ZMQStream(pub, loop)
168 outstream = zmqstream.ZMQStream(pub, loop)
@@ -221,7 +221,7 b' class HubFactory(RegistrationFactory):'
221 loop = self.loop
221 loop = self.loop
222
222
223 # Registrar socket
223 # Registrar socket
224 q = ZMQStream(ctx.socket(zmq.XREP), loop)
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 q.bind(client_iface % self.regport)
225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
227 if self.client_ip != self.engine_ip:
227 if self.client_ip != self.engine_ip:
@@ -233,7 +233,7 b' class HubFactory(RegistrationFactory):'
233 # heartbeat
233 # heartbeat
234 hpub = ctx.socket(zmq.PUB)
234 hpub = ctx.socket(zmq.PUB)
235 hpub.bind(engine_iface % self.hb[0])
235 hpub.bind(engine_iface % self.hb[0])
236 hrep = ctx.socket(zmq.XREP)
236 hrep = ctx.socket(zmq.ROUTER)
237 hrep.bind(engine_iface % self.hb[1])
237 hrep.bind(engine_iface % self.hb[1])
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 pingstream=ZMQStream(hpub,loop),
239 pingstream=ZMQStream(hpub,loop),
@@ -286,7 +286,7 b' class HubFactory(RegistrationFactory):'
286 self.log.debug("Hub client addrs: %s"%self.client_info)
286 self.log.debug("Hub client addrs: %s"%self.client_info)
287
287
288 # resubmit stream
288 # resubmit stream
289 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 url = util.disambiguate_url(self.client_info['task'][-1])
290 url = util.disambiguate_url(self.client_info['task'][-1])
291 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
291 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
292 r.connect(url)
292 r.connect(url)
@@ -679,11 +679,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
679 # for safety with multiprocessing
679 # for safety with multiprocessing
680 ctx = zmq.Context()
680 ctx = zmq.Context()
681 loop = ioloop.IOLoop()
681 loop = ioloop.IOLoop()
682 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
682 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
683 ins.setsockopt(zmq.IDENTITY, identity)
683 ins.setsockopt(zmq.IDENTITY, identity)
684 ins.bind(in_addr)
684 ins.bind(in_addr)
685
685
686 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
686 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
687 outs.setsockopt(zmq.IDENTITY, identity)
687 outs.setsockopt(zmq.IDENTITY, identity)
688 outs.bind(out_addr)
688 outs.bind(out_addr)
689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
@@ -123,7 +123,7 b' class EngineFactory(RegistrationFactory):'
123 self.log.info("Registering with controller at %s"%self.url)
123 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
124 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.XREQ)
126 reg = ctx.socket(zmq.DEALER)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
128 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
@@ -164,13 +164,13 b' class EngineFactory(RegistrationFactory):'
164 # Uncomment this to go back to two-socket model
164 # Uncomment this to go back to two-socket model
165 # shell_streams = []
165 # shell_streams = []
166 # for addr in shell_addrs:
166 # for addr in shell_addrs:
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
168 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.setsockopt(zmq.IDENTITY, identity)
169 # stream.connect(disambiguate_url(addr, self.location))
169 # stream.connect(disambiguate_url(addr, self.location))
170 # shell_streams.append(stream)
170 # shell_streams.append(stream)
171
171
172 # Now use only one shell stream for mux and tasks
172 # Now use only one shell stream for mux and tasks
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
174 stream.setsockopt(zmq.IDENTITY, identity)
174 stream.setsockopt(zmq.IDENTITY, identity)
175 shell_streams = [stream]
175 shell_streams = [stream]
176 for addr in shell_addrs:
176 for addr in shell_addrs:
@@ -179,7 +179,7 b' class EngineFactory(RegistrationFactory):'
179
179
180 # control stream:
180 # control stream:
181 control_addr = str(msg.content.control)
181 control_addr = str(msg.content.control)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
184 connect(control_stream, control_addr)
184 connect(control_stream, control_addr)
185
185
@@ -219,9 +219,9 b' def make_starter(up_addr, down_addr, *args, **kwargs):'
219 loop = ioloop.IOLoop.instance()
219 loop = ioloop.IOLoop.instance()
220 ctx = zmq.Context()
220 ctx = zmq.Context()
221 session = Session()
221 session = Session()
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop)
223 upstream.connect(up_addr)
223 upstream.connect(up_addr)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.DEALER),loop)
225 downstream.connect(down_addr)
225 downstream.connect(down_addr)
226
226
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
@@ -9,26 +9,34 b''
9 # Verify zmq version dependency >= 2.1.4
9 # Verify zmq version dependency >= 2.1.4
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 import re
12 import warnings
13 import warnings
13
14
14 minimum_pyzmq_version = "2.1.4"
15 def check_for_zmq(minimum_version, module='IPython.zmq'):
16 min_vlist = [int(n) for n in minimum_version.split('.')]
15
17
16 try:
18 try:
17 import zmq
19 import zmq
18 except ImportError:
20 except ImportError:
19 raise ImportError("IPython.zmq requires pyzmq >= %s"%minimum_pyzmq_version)
21 raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))
20
22
21 pyzmq_version = zmq.__version__
23 pyzmq_version = zmq.__version__
24 vlist = [int(n) for n in re.findall(r'\d+', pyzmq_version)]
22
25
23 if pyzmq_version < minimum_pyzmq_version:
26 if 'dev' not in pyzmq_version and vlist < min_vlist:
24 raise ImportError("IPython.zmq requires pyzmq >= %s, but you have %s"%(
27 raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
25 minimum_pyzmq_version, pyzmq_version))
28 module, minimum_version, pyzmq_version))
26
29
27 del pyzmq_version
30 # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
31 if not hasattr(zmq, 'DEALER'):
32 zmq.DEALER = zmq.XREQ
33 if not hasattr(zmq, 'ROUTER'):
34 zmq.ROUTER = zmq.XREP
28
35
29 if zmq.zmq_version() >= '3.0.0':
36 if zmq.zmq_version() >= '4.0.0':
30 warnings.warn("""libzmq 3 detected.
37 warnings.warn("""libzmq 4 detected.
31 It is unlikely that IPython's zmq code will work properly.
38 It is unlikely that IPython's zmq code will work properly.
32 Please install libzmq stable, which is 2.1.x or 2.2.x""",
39 Please install libzmq stable, which is 2.1.x or 2.2.x""",
33 RuntimeWarning)
40 RuntimeWarning)
34
41
42 check_for_zmq('2.1.4')
@@ -179,7 +179,7 b' def main():'
179
179
180 # Create initial sockets
180 # Create initial sockets
181 c = zmq.Context()
181 c = zmq.Context()
182 request_socket = c.socket(zmq.XREQ)
182 request_socket = c.socket(zmq.DEALER)
183 request_socket.connect(req_conn)
183 request_socket.connect(req_conn)
184
184
185 sub_socket = c.socket(zmq.SUB)
185 sub_socket = c.socket(zmq.SUB)
@@ -145,9 +145,9 b' class KernelApp(BaseIPythonApplication):'
145 # Uncomment this to try closing the context.
145 # Uncomment this to try closing the context.
146 # atexit.register(context.term)
146 # atexit.register(context.term)
147
147
148 self.shell_socket = context.socket(zmq.XREP)
148 self.shell_socket = context.socket(zmq.ROUTER)
149 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
149 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
150 self.log.debug("shell XREP Channel on port: %i"%self.shell_port)
150 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
151
151
152 self.iopub_socket = context.socket(zmq.PUB)
152 self.iopub_socket = context.socket(zmq.PUB)
153 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
153 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
@@ -187,7 +187,7 b' class ShellSocketChannel(ZMQSocketChannel):'
187
187
188 def run(self):
188 def run(self):
189 """The thread's main activity. Call start() instead."""
189 """The thread's main activity. Call start() instead."""
190 self.socket = self.context.socket(zmq.XREQ)
190 self.socket = self.context.socket(zmq.DEALER)
191 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
191 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
192 self.socket.connect('tcp://%s:%i' % self.address)
192 self.socket.connect('tcp://%s:%i' % self.address)
193 self.iostate = POLLERR|POLLIN
193 self.iostate = POLLERR|POLLIN
@@ -482,7 +482,7 b' class StdInSocketChannel(ZMQSocketChannel):'
482
482
483 def run(self):
483 def run(self):
484 """The thread's main activity. Call start() instead."""
484 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.XREQ)
485 self.socket = self.context.socket(zmq.DEALER)
486 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
486 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
487 self.socket.connect('tcp://%s:%i' % self.address)
487 self.socket.connect('tcp://%s:%i' % self.address)
488 self.iostate = POLLERR|POLLIN
488 self.iostate = POLLERR|POLLIN
@@ -56,7 +56,7 b' this::'
56 print >>sys.__stdout__, "Starting the kernel..."
56 print >>sys.__stdout__, "Starting the kernel..."
57 print >>sys.__stdout__, "On:",rep_conn, pub_conn
57 print >>sys.__stdout__, "On:",rep_conn, pub_conn
58 session = Session(username=u'kernel')
58 session = Session(username=u'kernel')
59 reply_socket = c.socket(zmq.XREP)
59 reply_socket = c.socket(zmq.ROUTER)
60 reply_socket.bind(rep_conn)
60 reply_socket.bind(rep_conn)
61 pub_socket = c.socket(zmq.PUB)
61 pub_socket = c.socket(zmq.PUB)
62 pub_socket.bind(pub_conn)
62 pub_socket.bind(pub_conn)
@@ -40,7 +40,7 b' kernel has three sockets that serve the following functions:'
40 otherwise indicating that the user is to type input for the kernel instead
40 otherwise indicating that the user is to type input for the kernel instead
41 of normal commands in the frontend.
41 of normal commands in the frontend.
42
42
43 2. XREP: this single sockets allows multiple incoming connections from
43 2. ROUTER: this single sockets allows multiple incoming connections from
44 frontends, and this is the socket where requests for code execution, object
44 frontends, and this is the socket where requests for code execution, object
45 information, prompts, etc. are made to the kernel by any frontend. The
45 information, prompts, etc. are made to the kernel by any frontend. The
46 communication on this socket is a sequence of request/reply actions from
46 communication on this socket is a sequence of request/reply actions from
@@ -48,13 +48,13 b' kernel has three sockets that serve the following functions:'
48
48
49 3. PUB: this socket is the 'broadcast channel' where the kernel publishes all
49 3. PUB: this socket is the 'broadcast channel' where the kernel publishes all
50 side effects (stdout, stderr, etc.) as well as the requests coming from any
50 side effects (stdout, stderr, etc.) as well as the requests coming from any
51 client over the XREP socket and its own requests on the REP socket. There
51 client over the ROUTER socket and its own requests on the REP socket. There
52 are a number of actions in Python which generate side effects: :func:`print`
52 are a number of actions in Python which generate side effects: :func:`print`
53 writes to ``sys.stdout``, errors generate tracebacks, etc. Additionally, in
53 writes to ``sys.stdout``, errors generate tracebacks, etc. Additionally, in
54 a multi-client scenario, we want all frontends to be able to know what each
54 a multi-client scenario, we want all frontends to be able to know what each
55 other has sent to the kernel (this can be useful in collaborative scenarios,
55 other has sent to the kernel (this can be useful in collaborative scenarios,
56 for example). This socket allows both side effects and the information
56 for example). This socket allows both side effects and the information
57 about communications taking place with one client over the XREQ/XREP channel
57 about communications taking place with one client over the ROUTER/DEALER channel
58 to be made available to all clients in a uniform manner.
58 to be made available to all clients in a uniform manner.
59
59
60 All messages are tagged with enough information (details below) for clients
60 All messages are tagged with enough information (details below) for clients
@@ -122,7 +122,7 b' For each message type, the actual content will differ and all existing message'
122 types are specified in what follows of this document.
122 types are specified in what follows of this document.
123
123
124
124
125 Messages on the XREP/XREQ socket
125 Messages on the ROUTER/DEALER socket
126 ================================
126 ================================
127
127
128 .. _execute:
128 .. _execute:
@@ -633,7 +633,7 b' Connect'
633 When a client connects to the request/reply socket of the kernel, it can issue
633 When a client connects to the request/reply socket of the kernel, it can issue
634 a connect request to get basic information about the kernel, such as the ports
634 a connect request to get basic information about the kernel, such as the ports
635 the other ZeroMQ sockets are listening on. This allows clients to only have
635 the other ZeroMQ sockets are listening on. This allows clients to only have
636 to know about a single port (the XREQ/XREP channel) to connect to a kernel.
636 to know about a single port (the DEALER/ROUTER channel) to connect to a kernel.
637
637
638 Message type: ``connect_request``::
638 Message type: ``connect_request``::
639
639
@@ -643,7 +643,7 b' Message type: ``connect_request``::'
643 Message type: ``connect_reply``::
643 Message type: ``connect_reply``::
644
644
645 content = {
645 content = {
646 'xrep_port' : int # The port the XREP socket is listening on.
646 'xrep_port' : int # The port the ROUTER socket is listening on.
647 'pub_port' : int # The port the PUB socket is listening on.
647 'pub_port' : int # The port the PUB socket is listening on.
648 'req_port' : int # The port the REQ socket is listening on.
648 'req_port' : int # The port the REQ socket is listening on.
649 'hb_port' : int # The port the heartbeat socket is listening on.
649 'hb_port' : int # The port the heartbeat socket is listening on.
@@ -901,7 +901,7 b' heartbeat with pure ZMQ, without using any Python messaging at all.'
901
901
902 The monitor sends out a single zmq message (right now, it is a str of the
902 The monitor sends out a single zmq message (right now, it is a str of the
903 monitor's lifetime in seconds), and gets the same message right back, prefixed
903 monitor's lifetime in seconds), and gets the same message right back, prefixed
904 with the zmq identity of the XREQ socket in the heartbeat process. This can be
904 with the zmq identity of the DEALER socket in the heartbeat process. This can be
905 a uuid, or even a full message, but there doesn't seem to be a need for packing
905 a uuid, or even a full message, but there doesn't seem to be a need for packing
906 up a message when the sender and receiver are the exact same Python object.
906 up a message when the sender and receiver are the exact same Python object.
907
907
@@ -913,7 +913,7 b' and the monitor receives some number of messages of the form::'
913
913
914 ['uuid-abcd-dead-beef', '1.2345678910']
914 ['uuid-abcd-dead-beef', '1.2345678910']
915
915
916 where the first part is the zmq.IDENTITY of the heart's XREQ on the engine, and
916 where the first part is the zmq.IDENTITY of the heart's DEALER on the engine, and
917 the rest is the message sent by the monitor. No Python code ever has any
917 the rest is the message sent by the monitor. No Python code ever has any
918 access to the message between the monitor's send, and the monitor's recv.
918 access to the message between the monitor's send, and the monitor's recv.
919
919
@@ -48,11 +48,11 b' Registration'
48 :alt: IPython Registration connections
48 :alt: IPython Registration connections
49 :align: center
49 :align: center
50
50
51 Engines and Clients only need to know where the Query ``XREP`` is located to start
51 Engines and Clients only need to know where the Query ``ROUTER`` is located to start
52 connecting.
52 connecting.
53
53
54 Once a controller is launched, the only information needed for connecting clients and/or
54 Once a controller is launched, the only information needed for connecting clients and/or
55 engines is the IP/port of the Hub's ``XREP`` socket called the Registrar. This socket
55 engines is the IP/port of the Hub's ``ROUTER`` socket called the Registrar. This socket
56 handles connections from both clients and engines, and replies with the remaining
56 handles connections from both clients and engines, and replies with the remaining
57 information necessary to establish the remaining connections. Clients use this same socket for
57 information necessary to establish the remaining connections. Clients use this same socket for
58 querying the Hub for state information.
58 querying the Hub for state information.
@@ -69,12 +69,12 b' Heartbeat'
69
69
70 The heartbeat process has been described elsewhere. To summarize: the Heartbeat Monitor
70 The heartbeat process has been described elsewhere. To summarize: the Heartbeat Monitor
71 publishes a distinct message periodically via a ``PUB`` socket. Each engine has a
71 publishes a distinct message periodically via a ``PUB`` socket. Each engine has a
72 ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``XREQ`` socket for output.
72 ``zmq.FORWARDER`` device with a ``SUB`` socket for input, and ``DEALER`` socket for output.
73 The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``XREQ`` is
73 The ``SUB`` socket is connected to the ``PUB`` socket labeled *ping*, and the ``DEALER`` is
74 connected to the ``XREP`` labeled *pong*. This results in the same message being relayed
74 connected to the ``ROUTER`` labeled *pong*. This results in the same message being relayed
75 back to the Heartbeat Monitor with the addition of the ``XREQ`` prefix. The Heartbeat
75 back to the Heartbeat Monitor with the addition of the ``DEALER`` prefix. The Heartbeat
76 Monitor receives all the replies via an ``XREP`` socket, and identifies which hearts are
76 Monitor receives all the replies via an ``ROUTER`` socket, and identifies which hearts are
77 still beating by the ``zmq.IDENTITY`` prefix of the ``XREQ`` sockets, which information
77 still beating by the ``zmq.IDENTITY`` prefix of the ``DEALER`` sockets, which information
78 the Hub uses to notify clients of any changes in the available engines.
78 the Hub uses to notify clients of any changes in the available engines.
79
79
80 Schedulers
80 Schedulers
@@ -94,16 +94,16 b' queue, all messages sent through these queues (both directions) are also sent vi'
94 ``PUB`` socket to a monitor, which allows the Hub to monitor queue traffic without
94 ``PUB`` socket to a monitor, which allows the Hub to monitor queue traffic without
95 interfering with it.
95 interfering with it.
96
96
97 For tasks, the engine need not be specified. Messages sent to the ``XREP`` socket from the
97 For tasks, the engine need not be specified. Messages sent to the ``ROUTER`` socket from the
98 client side are assigned to an engine via ZMQ's ``XREQ`` round-robin load balancing.
98 client side are assigned to an engine via ZMQ's ``DEALER`` round-robin load balancing.
99 Engine replies are directed to specific clients via the IDENTITY of the client, which is
99 Engine replies are directed to specific clients via the IDENTITY of the client, which is
100 received as a prefix at the Engine.
100 received as a prefix at the Engine.
101
101
102 For Multiplexing, ``XREP`` is used for both in and output sockets in the device. Clients must
102 For Multiplexing, ``ROUTER`` is used for both in and output sockets in the device. Clients must
103 specify the destination by the ``zmq.IDENTITY`` of the ``XREP`` socket connected to
103 specify the destination by the ``zmq.IDENTITY`` of the ``ROUTER`` socket connected to
104 the downstream end of the device.
104 the downstream end of the device.
105
105
106 At the Kernel level, both of these ``XREP`` sockets are treated in the same way as the ``REP``
106 At the Kernel level, both of these ``ROUTER`` sockets are treated in the same way as the ``REP``
107 socket in the serial version (except using ZMQStreams instead of explicit sockets).
107 socket in the serial version (except using ZMQStreams instead of explicit sockets).
108
108
109 Execution can be done in a load-balanced (engine-agnostic) or multiplexed (engine-specified)
109 Execution can be done in a load-balanced (engine-agnostic) or multiplexed (engine-specified)
@@ -135,10 +135,10 b' Client connections'
135 :alt: IPython client query connections
135 :alt: IPython client query connections
136 :align: center
136 :align: center
137
137
138 Clients connect to an ``XREP`` socket to query the hub.
138 Clients connect to an ``ROUTER`` socket to query the hub.
139
139
140 The hub's registrar ``XREP`` socket also listens for queries from clients as to queue status,
140 The hub's registrar ``ROUTER`` socket also listens for queries from clients as to queue status,
141 and control instructions. Clients connect to this socket via an ``XREQ`` during registration.
141 and control instructions. Clients connect to this socket via an ``DEALER`` during registration.
142
142
143 .. figure:: figs/notiffade.png
143 .. figure:: figs/notiffade.png
144 :width: 432px
144 :width: 432px
@@ -29,14 +29,14 b' requests and results. It has no role in execution and does no relay of messages'
29 large blocking requests or database actions in the Hub do not have the ability to impede
29 large blocking requests or database actions in the Hub do not have the ability to impede
30 job submission and results.
30 job submission and results.
31
31
32 Registration (``XREP``)
32 Registration (``ROUTER``)
33 ***********************
33 ***********************
34
34
35 The first function of the Hub is to facilitate and monitor connections of clients
35 The first function of the Hub is to facilitate and monitor connections of clients
36 and engines. Both client and engine registration are handled by the same socket, so only
36 and engines. Both client and engine registration are handled by the same socket, so only
37 one ip/port pair is needed to connect any number of connections and clients.
37 one ip/port pair is needed to connect any number of connections and clients.
38
38
39 Engines register with the ``zmq.IDENTITY`` of their two ``XREQ`` sockets, one for the
39 Engines register with the ``zmq.IDENTITY`` of their two ``DEALER`` sockets, one for the
40 queue, which receives execute requests, and one for the heartbeat, which is used to
40 queue, which receives execute requests, and one for the heartbeat, which is used to
41 monitor the survival of the Engine process.
41 monitor the survival of the Engine process.
42
42
@@ -120,7 +120,7 b' Message type : ``unregistration_notification``::'
120 }
120 }
121
121
122
122
123 Client Queries (``XREP``)
123 Client Queries (``ROUTER``)
124 *************************
124 *************************
125
125
126 The hub monitors and logs all queue traffic, so that clients can retrieve past
126 The hub monitors and logs all queue traffic, so that clients can retrieve past
@@ -229,18 +229,18 b' There are three basic schedulers:'
229 * MUX Scheduler
229 * MUX Scheduler
230 * Control Scheduler
230 * Control Scheduler
231
231
232 The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``XREP``
232 The MUX and Control schedulers are simple MonitoredQueue ØMQ devices, with ``ROUTER``
233 sockets on either side. This allows the queue to relay individual messages to particular
233 sockets on either side. This allows the queue to relay individual messages to particular
234 targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ
234 targets via ``zmq.IDENTITY`` routing. The Task scheduler may be a MonitoredQueue ØMQ
235 device, in which case the client-facing socket is ``XREP``, and the engine-facing socket
235 device, in which case the client-facing socket is ``ROUTER``, and the engine-facing socket
236 is ``XREQ``. The result of this is that client-submitted messages are load-balanced via
236 is ``DEALER``. The result of this is that client-submitted messages are load-balanced via
237 the ``XREQ`` socket, but the engine's replies to each message go to the requesting client.
237 the ``DEALER`` socket, but the engine's replies to each message go to the requesting client.
238
238
239 Raw ``XREQ`` scheduling is quite primitive, and doesn't allow message introspection, so
239 Raw ``DEALER`` scheduling is quite primitive, and doesn't allow message introspection, so
240 there are also Python Schedulers that can be used. These Schedulers behave in much the
240 there are also Python Schedulers that can be used. These Schedulers behave in much the
241 same way as a MonitoredQueue does from the outside, but have rich internal logic to
241 same way as a MonitoredQueue does from the outside, but have rich internal logic to
242 determine destinations, as well as handle dependency graphs Their sockets are always
242 determine destinations, as well as handle dependency graphs Their sockets are always
243 ``XREP`` on both sides.
243 ``ROUTER`` on both sides.
244
244
245 The Python task schedulers have an additional message type, which informs the Hub of
245 The Python task schedulers have an additional message type, which informs the Hub of
246 the destination of a task as soon as that destination is known.
246 the destination of a task as soon as that destination is known.
@@ -389,7 +389,7 b' Pure ZMQ Scheduler'
389 ------------------
389 ------------------
390
390
391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``DEALER`` socket to perform all
393 load-balancing. This scheduler does not support any of the advanced features of the Python
393 load-balancing. This scheduler does not support any of the advanced features of the Python
394 :class:`.Scheduler`.
394 :class:`.Scheduler`.
395
395
General Comments 0
You need to be logged in to leave comments. Login now