diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 6fa179e..68315b6 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -56,7 +56,7 @@ from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.dictdb import DictDB -from IPython.parallel.util import split_url, disambiguate_url +from IPython.parallel.util import split_url, disambiguate_url, set_hwm # conditional import of SQLiteDB / MongoDB backend class real_dbs = [] @@ -385,6 +385,7 @@ class IPControllerApp(BaseParallelApplication): # Multiplexer Queue (in a Process) q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') + q.bind_in(f.client_url('mux')) q.setsockopt_in(zmq.IDENTITY, b'mux_in') q.bind_out(f.engine_url('mux')) @@ -438,6 +439,22 @@ class IPControllerApp(BaseParallelApplication): # single-threaded Controller kwargs['in_thread'] = True launch_scheduler(*sargs, **kwargs) + + # set unlimited HWM for all relay devices + if hasattr(zmq, 'SNDHWM'): + q = children[0] + q.setsockopt_in(zmq.RCVHWM, 0) + q.setsockopt_out(zmq.SNDHWM, 0) + + for q in children[1:]: + if not hasattr(q, 'setsockopt_in'): + continue + q.setsockopt_in(zmq.SNDHWM, 0) + q.setsockopt_in(zmq.RCVHWM, 0) + q.setsockopt_out(zmq.SNDHWM, 0) + q.setsockopt_out(zmq.RCVHWM, 0) + q.setsockopt_mon(zmq.SNDHWM, 0) + def terminate_children(self): child_procs = [] diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 8d87cd4..b589ee1 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -285,6 +285,7 @@ class HubFactory(RegistrationFactory): # Registrar socket q = ZMQStream(ctx.socket(zmq.ROUTER), loop) + util.set_hwm(q, 0) q.bind(self.client_url('registration')) self.log.info("Hub listening on %s for registration.", self.client_url('registration')) if self.client_ip != self.engine_ip: @@ -297,6 +298,7 @@ class HubFactory(RegistrationFactory): hpub = ctx.socket(zmq.PUB) hpub.bind(self.engine_url('hb_ping')) hrep = ctx.socket(zmq.ROUTER) + util.set_hwm(hrep, 0) hrep.bind(self.engine_url('hb_pong')) self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, pingstream=ZMQStream(hpub,loop), diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 2dfda57..01fec8a 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -811,13 +811,16 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non ctx = zmq.Context() loop = ioloop.IOLoop() ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) + util.set_hwm(ins, 0) ins.setsockopt(zmq.IDENTITY, identity + b'_in') ins.bind(in_addr) outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) + util.set_hwm(outs, 0) outs.setsockopt(zmq.IDENTITY, identity + b'_out') outs.bind(out_addr) mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) + util.set_hwm(mons, 0) mons.connect(mon_addr) nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) nots.setsockopt(zmq.SUBSCRIBE, b'') diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index dbb740f..a725d5a 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -349,3 +349,20 @@ def local_logger(logname, loglevel=logging.DEBUG): logger.setLevel(loglevel) return logger +def set_hwm(sock, hwm=0): + """set zmq High Water Mark on a socket + + in a way that always works for various pyzmq / libzmq versions. + """ + import zmq + + for key in ('HWM', 'SNDHWM', 'RCVHWM'): + opt = getattr(zmq, key, None) + if opt is None: + continue + try: + sock.setsockopt(opt, hwm) + except zmq.ZMQError: + pass + + \ No newline at end of file