##// END OF EJS Templates
allow true single-threaded Controller...
MinRK -
Show More
@@ -327,9 +327,15 b' class IPControllerApp(BaseParallelApplication):'
327 hub.monitor_url, hub.client_info['notification'])
327 hub.monitor_url, hub.client_info['notification'])
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 log_url = self.log_url, config=dict(self.config))
329 log_url = self.log_url, config=dict(self.config))
330 if 'Process' in self.mq_class:
331 # run the Python scheduler in a Process
330 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
331 q.daemon=True
333 q.daemon=True
332 children.append(q)
334 children.append(q)
335 else:
336 # single-threaded Controller
337 kwargs['in_thread'] = True
338 launch_scheduler(*sargs, **kwargs)
333
339
334
340
335 def save_urls(self):
341 def save_urls(self):
@@ -38,6 +38,7 b' from zmq.eventloop import ioloop, zmqstream'
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.loader import Config
42 from IPython.config.loader import Config
42 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43
44
@@ -650,14 +651,21 b' class TaskScheduler(SessionFactory):'
650
651
651 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
652 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
652 logname='root', log_url=None, loglevel=logging.DEBUG,
653 logname='root', log_url=None, loglevel=logging.DEBUG,
653 identity=b'task'):
654 identity=b'task', in_thread=False):
654 from zmq.eventloop import ioloop
655
655 from zmq.eventloop.zmqstream import ZMQStream
656 ZMQStream = zmqstream.ZMQStream
656
657
657 if config:
658 if config:
658 # unwrap dict back into Config
659 # unwrap dict back into Config
659 config = Config(config)
660 config = Config(config)
660
661
662 if in_thread:
663 # use instance() to get the same Context/Loop as our parent
664 ctx = zmq.Context.instance()
665 loop = ioloop.IOLoop.instance()
666 else:
667 # in a process, don't use instance()
668 # for safety with multiprocessing
661 ctx = zmq.Context()
669 ctx = zmq.Context()
662 loop = ioloop.IOLoop()
670 loop = ioloop.IOLoop()
663 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
671 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -667,14 +675,16 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
667 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
675 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
668 outs.setsockopt(zmq.IDENTITY, identity)
676 outs.setsockopt(zmq.IDENTITY, identity)
669 outs.bind(out_addr)
677 outs.bind(out_addr)
670 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
678 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
671 mons.connect(mon_addr)
679 mons.connect(mon_addr)
672 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
680 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
673 nots.setsockopt(zmq.SUBSCRIBE, '')
681 nots.setsockopt(zmq.SUBSCRIBE, '')
674 nots.connect(not_addr)
682 nots.connect(not_addr)
675
683
676 # setup logging. Note that these will not work in-process, because they clobber
684 # setup logging.
677 # existing loggers.
685 if in_thread:
686 log = Application.instance().log
687 else:
678 if log_url:
688 if log_url:
679 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
689 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
680 else:
690 else:
@@ -685,6 +695,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
685 loop=loop, log=log,
695 loop=loop, log=log,
686 config=config)
696 config=config)
687 scheduler.start()
697 scheduler.start()
698 if not in_thread:
688 try:
699 try:
689 loop.start()
700 loop.start()
690 except KeyboardInterrupt:
701 except KeyboardInterrupt:
General Comments 0
You need to be logged in to leave comments. Login now