From d370cfd6fe30e557700677bc5994c0501c54e860 2011-06-24 05:36:40 From: MinRK Date: 2011-06-24 05:36:40 Subject: [PATCH] allow true single-threaded Controller Previously --usethreads only put MonitoredQueue devices in threads, and still launched a subprocess for the Python scheduler. This adds support for the Python Scheduler in a thread. --- diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index ae64327..8597a32 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -327,9 +327,15 @@ class IPControllerApp(BaseParallelApplication): hub.monitor_url, hub.client_info['notification']) kwargs = dict(logname='scheduler', loglevel=self.log_level, log_url = self.log_url, config=dict(self.config)) - q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) - q.daemon=True - children.append(q) + if 'Process' in self.mq_class: + # run the Python scheduler in a Process + q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) + q.daemon=True + children.append(q) + else: + # single-threaded Controller + kwargs['in_thread'] = True + launch_scheduler(*sargs, **kwargs) def save_urls(self): diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index addba85..c49f9c7 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -38,6 +38,7 @@ from zmq.eventloop import ioloop, zmqstream # local imports from IPython.external.decorator import decorator +from IPython.config.application import Application from IPython.config.loader import Config from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum @@ -650,16 +651,23 @@ class TaskScheduler(SessionFactory): def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, logname='root', log_url=None, loglevel=logging.DEBUG, - identity=b'task'): - from zmq.eventloop import ioloop - from zmq.eventloop.zmqstream import ZMQStream + identity=b'task', in_thread=False): + + ZMQStream = zmqstream.ZMQStream if config: # unwrap dict back into Config config = Config(config) - ctx = zmq.Context() - loop = ioloop.IOLoop() + if in_thread: + # use instance() to get the same Context/Loop as our parent + ctx = zmq.Context.instance() + loop = ioloop.IOLoop.instance() + else: + # in a process, don't use instance() + # for safety with multiprocessing + ctx = zmq.Context() + loop = ioloop.IOLoop() ins = ZMQStream(ctx.socket(zmq.XREP),loop) ins.setsockopt(zmq.IDENTITY, identity) ins.bind(in_addr) @@ -667,26 +675,29 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, outs = ZMQStream(ctx.socket(zmq.XREP),loop) outs.setsockopt(zmq.IDENTITY, identity) outs.bind(out_addr) - mons = ZMQStream(ctx.socket(zmq.PUB),loop) + mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) mons.connect(mon_addr) - nots = ZMQStream(ctx.socket(zmq.SUB),loop) + nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) nots.setsockopt(zmq.SUBSCRIBE, '') nots.connect(not_addr) - # setup logging. Note that these will not work in-process, because they clobber - # existing loggers. - if log_url: - log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) + # setup logging. + if in_thread: + log = Application.instance().log else: - log = local_logger(logname, loglevel) + if log_url: + log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) + else: + log = local_logger(logname, loglevel) scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, loop=loop, log=log, config=config) scheduler.start() - try: - loop.start() - except KeyboardInterrupt: - print ("interrupted, exiting...", file=sys.__stderr__) + if not in_thread: + try: + loop.start() + except KeyboardInterrupt: + print ("interrupted, exiting...", file=sys.__stderr__)