Show More
@@ -56,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory' | |||
|
56 | 56 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler |
|
57 | 57 | from IPython.parallel.controller.dictdb import DictDB |
|
58 | 58 | |
|
59 | from IPython.parallel.util import split_url, disambiguate_url | |
|
59 | from IPython.parallel.util import split_url, disambiguate_url, set_hwm | |
|
60 | 60 | |
|
61 | 61 | # conditional import of SQLiteDB / MongoDB backend class |
|
62 | 62 | real_dbs = [] |
@@ -385,6 +385,7 b' class IPControllerApp(BaseParallelApplication):' | |||
|
385 | 385 | |
|
386 | 386 | # Multiplexer Queue (in a Process) |
|
387 | 387 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
388 | ||
|
388 | 389 | q.bind_in(f.client_url('mux')) |
|
389 | 390 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') |
|
390 | 391 | q.bind_out(f.engine_url('mux')) |
@@ -439,6 +440,22 b' class IPControllerApp(BaseParallelApplication):' | |||
|
439 | 440 | kwargs['in_thread'] = True |
|
440 | 441 | launch_scheduler(*sargs, **kwargs) |
|
441 | 442 | |
|
443 | # set unlimited HWM for all relay devices | |
|
444 | if hasattr(zmq, 'SNDHWM'): | |
|
445 | q = children[0] | |
|
446 | q.setsockopt_in(zmq.RCVHWM, 0) | |
|
447 | q.setsockopt_out(zmq.SNDHWM, 0) | |
|
448 | ||
|
449 | for q in children[1:]: | |
|
450 | if not hasattr(q, 'setsockopt_in'): | |
|
451 | continue | |
|
452 | q.setsockopt_in(zmq.SNDHWM, 0) | |
|
453 | q.setsockopt_in(zmq.RCVHWM, 0) | |
|
454 | q.setsockopt_out(zmq.SNDHWM, 0) | |
|
455 | q.setsockopt_out(zmq.RCVHWM, 0) | |
|
456 | q.setsockopt_mon(zmq.SNDHWM, 0) | |
|
457 | ||
|
458 | ||
|
442 | 459 | def terminate_children(self): |
|
443 | 460 | child_procs = [] |
|
444 | 461 | for child in self.children: |
@@ -285,6 +285,7 b' class HubFactory(RegistrationFactory):' | |||
|
285 | 285 | |
|
286 | 286 | # Registrar socket |
|
287 | 287 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
288 | util.set_hwm(q, 0) | |
|
288 | 289 | q.bind(self.client_url('registration')) |
|
289 | 290 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) |
|
290 | 291 | if self.client_ip != self.engine_ip: |
@@ -297,6 +298,7 b' class HubFactory(RegistrationFactory):' | |||
|
297 | 298 | hpub = ctx.socket(zmq.PUB) |
|
298 | 299 | hpub.bind(self.engine_url('hb_ping')) |
|
299 | 300 | hrep = ctx.socket(zmq.ROUTER) |
|
301 | util.set_hwm(hrep, 0) | |
|
300 | 302 | hrep.bind(self.engine_url('hb_pong')) |
|
301 | 303 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
302 | 304 | pingstream=ZMQStream(hpub,loop), |
@@ -811,13 +811,16 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non' | |||
|
811 | 811 | ctx = zmq.Context() |
|
812 | 812 | loop = ioloop.IOLoop() |
|
813 | 813 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
814 | util.set_hwm(ins, 0) | |
|
814 | 815 | ins.setsockopt(zmq.IDENTITY, identity + b'_in') |
|
815 | 816 | ins.bind(in_addr) |
|
816 | 817 | |
|
817 | 818 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
819 | util.set_hwm(outs, 0) | |
|
818 | 820 | outs.setsockopt(zmq.IDENTITY, identity + b'_out') |
|
819 | 821 | outs.bind(out_addr) |
|
820 | 822 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
823 | util.set_hwm(mons, 0) | |
|
821 | 824 | mons.connect(mon_addr) |
|
822 | 825 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) |
|
823 | 826 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
@@ -349,3 +349,20 b' def local_logger(logname, loglevel=logging.DEBUG):' | |||
|
349 | 349 | logger.setLevel(loglevel) |
|
350 | 350 | return logger |
|
351 | 351 | |
|
352 | def set_hwm(sock, hwm=0): | |
|
353 | """set zmq High Water Mark on a socket | |
|
354 | ||
|
355 | in a way that always works for various pyzmq / libzmq versions. | |
|
356 | """ | |
|
357 | import zmq | |
|
358 | ||
|
359 | for key in ('HWM', 'SNDHWM', 'RCVHWM'): | |
|
360 | opt = getattr(zmq, key, None) | |
|
361 | if opt is None: | |
|
362 | continue | |
|
363 | try: | |
|
364 | sock.setsockopt(opt, hwm) | |
|
365 | except zmq.ZMQError: | |
|
366 | pass | |
|
367 | ||
|
368 | No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now