##// END OF EJS Templates
set unlimited HWM for all relay devices...
MinRK -
Show More
@@ -56,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory'
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.dictdb import DictDB
58 58
59 from IPython.parallel.util import split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60 60
61 61 # conditional import of SQLiteDB / MongoDB backend class
62 62 real_dbs = []
@@ -385,6 +385,7 b' class IPControllerApp(BaseParallelApplication):'
385 385
386 386 # Multiplexer Queue (in a Process)
387 387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
388
388 389 q.bind_in(f.client_url('mux'))
389 390 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 391 q.bind_out(f.engine_url('mux'))
@@ -439,6 +440,22 b' class IPControllerApp(BaseParallelApplication):'
439 440 kwargs['in_thread'] = True
440 441 launch_scheduler(*sargs, **kwargs)
441 442
443 # set unlimited HWM for all relay devices
444 if hasattr(zmq, 'SNDHWM'):
445 q = children[0]
446 q.setsockopt_in(zmq.RCVHWM, 0)
447 q.setsockopt_out(zmq.SNDHWM, 0)
448
449 for q in children[1:]:
450 if not hasattr(q, 'setsockopt_in'):
451 continue
452 q.setsockopt_in(zmq.SNDHWM, 0)
453 q.setsockopt_in(zmq.RCVHWM, 0)
454 q.setsockopt_out(zmq.SNDHWM, 0)
455 q.setsockopt_out(zmq.RCVHWM, 0)
456 q.setsockopt_mon(zmq.SNDHWM, 0)
457
458
442 459 def terminate_children(self):
443 460 child_procs = []
444 461 for child in self.children:
@@ -285,6 +285,7 b' class HubFactory(RegistrationFactory):'
285 285
286 286 # Registrar socket
287 287 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
288 util.set_hwm(q, 0)
288 289 q.bind(self.client_url('registration'))
289 290 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
290 291 if self.client_ip != self.engine_ip:
@@ -297,6 +298,7 b' class HubFactory(RegistrationFactory):'
297 298 hpub = ctx.socket(zmq.PUB)
298 299 hpub.bind(self.engine_url('hb_ping'))
299 300 hrep = ctx.socket(zmq.ROUTER)
301 util.set_hwm(hrep, 0)
300 302 hrep.bind(self.engine_url('hb_pong'))
301 303 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
302 304 pingstream=ZMQStream(hpub,loop),
@@ -811,13 +811,16 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non'
811 811 ctx = zmq.Context()
812 812 loop = ioloop.IOLoop()
813 813 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
814 util.set_hwm(ins, 0)
814 815 ins.setsockopt(zmq.IDENTITY, identity + b'_in')
815 816 ins.bind(in_addr)
816 817
817 818 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
819 util.set_hwm(outs, 0)
818 820 outs.setsockopt(zmq.IDENTITY, identity + b'_out')
819 821 outs.bind(out_addr)
820 822 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
823 util.set_hwm(mons, 0)
821 824 mons.connect(mon_addr)
822 825 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
823 826 nots.setsockopt(zmq.SUBSCRIBE, b'')
@@ -349,3 +349,20 b' def local_logger(logname, loglevel=logging.DEBUG):'
349 349 logger.setLevel(loglevel)
350 350 return logger
351 351
352 def set_hwm(sock, hwm=0):
353 """set zmq High Water Mark on a socket
354
355 in a way that always works for various pyzmq / libzmq versions.
356 """
357 import zmq
358
359 for key in ('HWM', 'SNDHWM', 'RCVHWM'):
360 opt = getattr(zmq, key, None)
361 if opt is None:
362 continue
363 try:
364 sock.setsockopt(opt, hwm)
365 except zmq.ZMQError:
366 pass
367
368 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now