##// END OF EJS Templates
allow true single-threaded Controller...
MinRK -
Show More
@@ -327,9 +327,15 b' class IPControllerApp(BaseParallelApplication):'
327 327 hub.monitor_url, hub.client_info['notification'])
328 328 kwargs = dict(logname='scheduler', loglevel=self.log_level,
329 329 log_url = self.log_url, config=dict(self.config))
330 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
331 q.daemon=True
332 children.append(q)
330 if 'Process' in self.mq_class:
331 # run the Python scheduler in a Process
332 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
333 q.daemon=True
334 children.append(q)
335 else:
336 # single-threaded Controller
337 kwargs['in_thread'] = True
338 launch_scheduler(*sargs, **kwargs)
333 339
334 340
335 341 def save_urls(self):
@@ -38,6 +38,7 b' from zmq.eventloop import ioloop, zmqstream'
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 42 from IPython.config.loader import Config
42 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum
43 44
@@ -650,16 +651,23 b' class TaskScheduler(SessionFactory):'
650 651
651 652 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
652 653 logname='root', log_url=None, loglevel=logging.DEBUG,
653 identity=b'task'):
654 from zmq.eventloop import ioloop
655 from zmq.eventloop.zmqstream import ZMQStream
654 identity=b'task', in_thread=False):
655
656 ZMQStream = zmqstream.ZMQStream
656 657
657 658 if config:
658 659 # unwrap dict back into Config
659 660 config = Config(config)
660 661
661 ctx = zmq.Context()
662 loop = ioloop.IOLoop()
662 if in_thread:
663 # use instance() to get the same Context/Loop as our parent
664 ctx = zmq.Context.instance()
665 loop = ioloop.IOLoop.instance()
666 else:
667 # in a process, don't use instance()
668 # for safety with multiprocessing
669 ctx = zmq.Context()
670 loop = ioloop.IOLoop()
663 671 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
664 672 ins.setsockopt(zmq.IDENTITY, identity)
665 673 ins.bind(in_addr)
@@ -667,26 +675,29 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
667 675 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
668 676 outs.setsockopt(zmq.IDENTITY, identity)
669 677 outs.bind(out_addr)
670 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
678 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
671 679 mons.connect(mon_addr)
672 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
680 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
673 681 nots.setsockopt(zmq.SUBSCRIBE, '')
674 682 nots.connect(not_addr)
675 683
676 # setup logging. Note that these will not work in-process, because they clobber
677 # existing loggers.
678 if log_url:
679 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
684 # setup logging.
685 if in_thread:
686 log = Application.instance().log
680 687 else:
681 log = local_logger(logname, loglevel)
688 if log_url:
689 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
690 else:
691 log = local_logger(logname, loglevel)
682 692
683 693 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
684 694 mon_stream=mons, notifier_stream=nots,
685 695 loop=loop, log=log,
686 696 config=config)
687 697 scheduler.start()
688 try:
689 loop.start()
690 except KeyboardInterrupt:
691 print ("interrupted, exiting...", file=sys.__stderr__)
698 if not in_thread:
699 try:
700 loop.start()
701 except KeyboardInterrupt:
702 print ("interrupted, exiting...", file=sys.__stderr__)
692 703
General Comments 0
You need to be logged in to leave comments. Login now