##// END OF EJS Templates
allow true single-threaded Controller...
MinRK -
Show More
@@ -327,9 +327,15 b' class IPControllerApp(BaseParallelApplication):'
327 hub.monitor_url, hub.client_info['notification'])
327 hub.monitor_url, hub.client_info['notification'])
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 log_url = self.log_url, config=dict(self.config))
329 log_url = self.log_url, config=dict(self.config))
330 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
330 if 'Process' in self.mq_class:
331 q.daemon=True
331 # run the Python scheduler in a Process
332 children.append(q)
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
333 q.daemon=True
334 children.append(q)
335 else:
336 # single-threaded Controller
337 kwargs['in_thread'] = True
338 launch_scheduler(*sargs, **kwargs)
333
339
334
340
335 def save_urls(self):
341 def save_urls(self):
@@ -38,6 +38,7 b' from zmq.eventloop import ioloop, zmqstream'
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.loader import Config
42 from IPython.config.loader import Config
42 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43
44
@@ -650,16 +651,23 b' class TaskScheduler(SessionFactory):'
650
651
651 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
652 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
652 logname='root', log_url=None, loglevel=logging.DEBUG,
653 logname='root', log_url=None, loglevel=logging.DEBUG,
653 identity=b'task'):
654 identity=b'task', in_thread=False):
654 from zmq.eventloop import ioloop
655
655 from zmq.eventloop.zmqstream import ZMQStream
656 ZMQStream = zmqstream.ZMQStream
656
657
657 if config:
658 if config:
658 # unwrap dict back into Config
659 # unwrap dict back into Config
659 config = Config(config)
660 config = Config(config)
660
661
661 ctx = zmq.Context()
662 if in_thread:
662 loop = ioloop.IOLoop()
663 # use instance() to get the same Context/Loop as our parent
664 ctx = zmq.Context.instance()
665 loop = ioloop.IOLoop.instance()
666 else:
667 # in a process, don't use instance()
668 # for safety with multiprocessing
669 ctx = zmq.Context()
670 loop = ioloop.IOLoop()
663 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
671 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
664 ins.setsockopt(zmq.IDENTITY, identity)
672 ins.setsockopt(zmq.IDENTITY, identity)
665 ins.bind(in_addr)
673 ins.bind(in_addr)
@@ -667,26 +675,29 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
667 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
675 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
668 outs.setsockopt(zmq.IDENTITY, identity)
676 outs.setsockopt(zmq.IDENTITY, identity)
669 outs.bind(out_addr)
677 outs.bind(out_addr)
670 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
678 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
671 mons.connect(mon_addr)
679 mons.connect(mon_addr)
672 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
680 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
673 nots.setsockopt(zmq.SUBSCRIBE, '')
681 nots.setsockopt(zmq.SUBSCRIBE, '')
674 nots.connect(not_addr)
682 nots.connect(not_addr)
675
683
676 # setup logging. Note that these will not work in-process, because they clobber
684 # setup logging.
677 # existing loggers.
685 if in_thread:
678 if log_url:
686 log = Application.instance().log
679 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
680 else:
687 else:
681 log = local_logger(logname, loglevel)
688 if log_url:
689 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
690 else:
691 log = local_logger(logname, loglevel)
682
692
683 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
693 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
684 mon_stream=mons, notifier_stream=nots,
694 mon_stream=mons, notifier_stream=nots,
685 loop=loop, log=log,
695 loop=loop, log=log,
686 config=config)
696 config=config)
687 scheduler.start()
697 scheduler.start()
688 try:
698 if not in_thread:
689 loop.start()
699 try:
690 except KeyboardInterrupt:
700 loop.start()
691 print ("interrupted, exiting...", file=sys.__stderr__)
701 except KeyboardInterrupt:
702 print ("interrupted, exiting...", file=sys.__stderr__)
692
703
General Comments 0
You need to be logged in to leave comments. Login now