Show More
@@ -56,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory' | |||||
56 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler |
|
56 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler | |
57 | from IPython.parallel.controller.dictdb import DictDB |
|
57 | from IPython.parallel.controller.dictdb import DictDB | |
58 |
|
58 | |||
59 | from IPython.parallel.util import split_url, disambiguate_url |
|
59 | from IPython.parallel.util import split_url, disambiguate_url, set_hwm | |
60 |
|
60 | |||
61 | # conditional import of SQLiteDB / MongoDB backend class |
|
61 | # conditional import of SQLiteDB / MongoDB backend class | |
62 | real_dbs = [] |
|
62 | real_dbs = [] | |
@@ -385,6 +385,7 b' class IPControllerApp(BaseParallelApplication):' | |||||
385 |
|
385 | |||
386 | # Multiplexer Queue (in a Process) |
|
386 | # Multiplexer Queue (in a Process) | |
387 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
387 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') | |
|
388 | ||||
388 | q.bind_in(f.client_url('mux')) |
|
389 | q.bind_in(f.client_url('mux')) | |
389 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') | |
390 | q.bind_out(f.engine_url('mux')) |
|
391 | q.bind_out(f.engine_url('mux')) | |
@@ -439,6 +440,22 b' class IPControllerApp(BaseParallelApplication):' | |||||
439 | kwargs['in_thread'] = True |
|
440 | kwargs['in_thread'] = True | |
440 | launch_scheduler(*sargs, **kwargs) |
|
441 | launch_scheduler(*sargs, **kwargs) | |
441 |
|
442 | |||
|
443 | # set unlimited HWM for all relay devices | |||
|
444 | if hasattr(zmq, 'SNDHWM'): | |||
|
445 | q = children[0] | |||
|
446 | q.setsockopt_in(zmq.RCVHWM, 0) | |||
|
447 | q.setsockopt_out(zmq.SNDHWM, 0) | |||
|
448 | ||||
|
449 | for q in children[1:]: | |||
|
450 | if not hasattr(q, 'setsockopt_in'): | |||
|
451 | continue | |||
|
452 | q.setsockopt_in(zmq.SNDHWM, 0) | |||
|
453 | q.setsockopt_in(zmq.RCVHWM, 0) | |||
|
454 | q.setsockopt_out(zmq.SNDHWM, 0) | |||
|
455 | q.setsockopt_out(zmq.RCVHWM, 0) | |||
|
456 | q.setsockopt_mon(zmq.SNDHWM, 0) | |||
|
457 | ||||
|
458 | ||||
442 | def terminate_children(self): |
|
459 | def terminate_children(self): | |
443 | child_procs = [] |
|
460 | child_procs = [] | |
444 | for child in self.children: |
|
461 | for child in self.children: |
@@ -285,6 +285,7 b' class HubFactory(RegistrationFactory):' | |||||
285 |
|
285 | |||
286 | # Registrar socket |
|
286 | # Registrar socket | |
287 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
287 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
|
288 | util.set_hwm(q, 0) | |||
288 | q.bind(self.client_url('registration')) |
|
289 | q.bind(self.client_url('registration')) | |
289 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) |
|
290 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) | |
290 | if self.client_ip != self.engine_ip: |
|
291 | if self.client_ip != self.engine_ip: | |
@@ -297,6 +298,7 b' class HubFactory(RegistrationFactory):' | |||||
297 | hpub = ctx.socket(zmq.PUB) |
|
298 | hpub = ctx.socket(zmq.PUB) | |
298 | hpub.bind(self.engine_url('hb_ping')) |
|
299 | hpub.bind(self.engine_url('hb_ping')) | |
299 | hrep = ctx.socket(zmq.ROUTER) |
|
300 | hrep = ctx.socket(zmq.ROUTER) | |
|
301 | util.set_hwm(hrep, 0) | |||
300 | hrep.bind(self.engine_url('hb_pong')) |
|
302 | hrep.bind(self.engine_url('hb_pong')) | |
301 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
303 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, | |
302 | pingstream=ZMQStream(hpub,loop), |
|
304 | pingstream=ZMQStream(hpub,loop), |
@@ -811,13 +811,16 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non' | |||||
811 | ctx = zmq.Context() |
|
811 | ctx = zmq.Context() | |
812 | loop = ioloop.IOLoop() |
|
812 | loop = ioloop.IOLoop() | |
813 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
813 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
|
814 | util.set_hwm(ins, 0) | |||
814 | ins.setsockopt(zmq.IDENTITY, identity + b'_in') |
|
815 | ins.setsockopt(zmq.IDENTITY, identity + b'_in') | |
815 | ins.bind(in_addr) |
|
816 | ins.bind(in_addr) | |
816 |
|
817 | |||
817 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
818 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
|
819 | util.set_hwm(outs, 0) | |||
818 | outs.setsockopt(zmq.IDENTITY, identity + b'_out') |
|
820 | outs.setsockopt(zmq.IDENTITY, identity + b'_out') | |
819 | outs.bind(out_addr) |
|
821 | outs.bind(out_addr) | |
820 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
822 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | |
|
823 | util.set_hwm(mons, 0) | |||
821 | mons.connect(mon_addr) |
|
824 | mons.connect(mon_addr) | |
822 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) |
|
825 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | |
823 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
|
826 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
@@ -349,3 +349,20 b' def local_logger(logname, loglevel=logging.DEBUG):' | |||||
349 | logger.setLevel(loglevel) |
|
349 | logger.setLevel(loglevel) | |
350 | return logger |
|
350 | return logger | |
351 |
|
351 | |||
|
352 | def set_hwm(sock, hwm=0): | |||
|
353 | """set zmq High Water Mark on a socket | |||
|
354 | ||||
|
355 | in a way that always works for various pyzmq / libzmq versions. | |||
|
356 | """ | |||
|
357 | import zmq | |||
|
358 | ||||
|
359 | for key in ('HWM', 'SNDHWM', 'RCVHWM'): | |||
|
360 | opt = getattr(zmq, key, None) | |||
|
361 | if opt is None: | |||
|
362 | continue | |||
|
363 | try: | |||
|
364 | sock.setsockopt(opt, hwm) | |||
|
365 | except zmq.ZMQError: | |||
|
366 | pass | |||
|
367 | ||||
|
368 | No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now