##// END OF EJS Templates
set unlimited HWM for all relay devices...
MinRK -
Show More
@@ -56,7 +56,7 b' from IPython.parallel.controller.hub import HubFactory'
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 from IPython.parallel.controller.dictdb import DictDB
57 from IPython.parallel.controller.dictdb import DictDB
58
58
59 from IPython.parallel.util import split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60
60
61 # conditional import of SQLiteDB / MongoDB backend class
61 # conditional import of SQLiteDB / MongoDB backend class
62 real_dbs = []
62 real_dbs = []
@@ -385,6 +385,7 b' class IPControllerApp(BaseParallelApplication):'
385
385
386 # Multiplexer Queue (in a Process)
386 # Multiplexer Queue (in a Process)
387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
388
388 q.bind_in(f.client_url('mux'))
389 q.bind_in(f.client_url('mux'))
389 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 q.bind_out(f.engine_url('mux'))
391 q.bind_out(f.engine_url('mux'))
@@ -439,6 +440,22 b' class IPControllerApp(BaseParallelApplication):'
439 kwargs['in_thread'] = True
440 kwargs['in_thread'] = True
440 launch_scheduler(*sargs, **kwargs)
441 launch_scheduler(*sargs, **kwargs)
441
442
443 # set unlimited HWM for all relay devices
444 if hasattr(zmq, 'SNDHWM'):
445 q = children[0]
446 q.setsockopt_in(zmq.RCVHWM, 0)
447 q.setsockopt_out(zmq.SNDHWM, 0)
448
449 for q in children[1:]:
450 if not hasattr(q, 'setsockopt_in'):
451 continue
452 q.setsockopt_in(zmq.SNDHWM, 0)
453 q.setsockopt_in(zmq.RCVHWM, 0)
454 q.setsockopt_out(zmq.SNDHWM, 0)
455 q.setsockopt_out(zmq.RCVHWM, 0)
456 q.setsockopt_mon(zmq.SNDHWM, 0)
457
458
442 def terminate_children(self):
459 def terminate_children(self):
443 child_procs = []
460 child_procs = []
444 for child in self.children:
461 for child in self.children:
@@ -285,6 +285,7 b' class HubFactory(RegistrationFactory):'
285
285
286 # Registrar socket
286 # Registrar socket
287 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
287 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
288 util.set_hwm(q, 0)
288 q.bind(self.client_url('registration'))
289 q.bind(self.client_url('registration'))
289 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
290 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
290 if self.client_ip != self.engine_ip:
291 if self.client_ip != self.engine_ip:
@@ -297,6 +298,7 b' class HubFactory(RegistrationFactory):'
297 hpub = ctx.socket(zmq.PUB)
298 hpub = ctx.socket(zmq.PUB)
298 hpub.bind(self.engine_url('hb_ping'))
299 hpub.bind(self.engine_url('hb_ping'))
299 hrep = ctx.socket(zmq.ROUTER)
300 hrep = ctx.socket(zmq.ROUTER)
301 util.set_hwm(hrep, 0)
300 hrep.bind(self.engine_url('hb_pong'))
302 hrep.bind(self.engine_url('hb_pong'))
301 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
303 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
302 pingstream=ZMQStream(hpub,loop),
304 pingstream=ZMQStream(hpub,loop),
@@ -811,13 +811,16 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non'
811 ctx = zmq.Context()
811 ctx = zmq.Context()
812 loop = ioloop.IOLoop()
812 loop = ioloop.IOLoop()
813 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
813 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
814 util.set_hwm(ins, 0)
814 ins.setsockopt(zmq.IDENTITY, identity + b'_in')
815 ins.setsockopt(zmq.IDENTITY, identity + b'_in')
815 ins.bind(in_addr)
816 ins.bind(in_addr)
816
817
817 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
818 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
819 util.set_hwm(outs, 0)
818 outs.setsockopt(zmq.IDENTITY, identity + b'_out')
820 outs.setsockopt(zmq.IDENTITY, identity + b'_out')
819 outs.bind(out_addr)
821 outs.bind(out_addr)
820 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
822 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
823 util.set_hwm(mons, 0)
821 mons.connect(mon_addr)
824 mons.connect(mon_addr)
822 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
825 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
823 nots.setsockopt(zmq.SUBSCRIBE, b'')
826 nots.setsockopt(zmq.SUBSCRIBE, b'')
@@ -349,3 +349,20 b' def local_logger(logname, loglevel=logging.DEBUG):'
349 logger.setLevel(loglevel)
349 logger.setLevel(loglevel)
350 return logger
350 return logger
351
351
352 def set_hwm(sock, hwm=0):
353 """set zmq High Water Mark on a socket
354
355 in a way that always works for various pyzmq / libzmq versions.
356 """
357 import zmq
358
359 for key in ('HWM', 'SNDHWM', 'RCVHWM'):
360 opt = getattr(zmq, key, None)
361 if opt is None:
362 continue
363 try:
364 sock.setsockopt(opt, hwm)
365 except zmq.ZMQError:
366 pass
367
368 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now