Show More
@@ -327,9 +327,15 b' class IPControllerApp(BaseParallelApplication):' | |||||
327 | hub.monitor_url, hub.client_info['notification']) |
|
327 | hub.monitor_url, hub.client_info['notification']) | |
328 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
328 | kwargs = dict(logname='scheduler', loglevel=self.log_level, | |
329 | log_url = self.log_url, config=dict(self.config)) |
|
329 | log_url = self.log_url, config=dict(self.config)) | |
330 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) |
|
330 | if 'Process' in self.mq_class: | |
331 | q.daemon=True |
|
331 | # run the Python scheduler in a Process | |
332 | children.append(q) |
|
332 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) | |
|
333 | q.daemon=True | |||
|
334 | children.append(q) | |||
|
335 | else: | |||
|
336 | # single-threaded Controller | |||
|
337 | kwargs['in_thread'] = True | |||
|
338 | launch_scheduler(*sargs, **kwargs) | |||
333 |
|
339 | |||
334 |
|
340 | |||
335 | def save_urls(self): |
|
341 | def save_urls(self): |
@@ -38,6 +38,7 b' from zmq.eventloop import ioloop, zmqstream' | |||||
38 |
|
38 | |||
39 | # local imports |
|
39 | # local imports | |
40 | from IPython.external.decorator import decorator |
|
40 | from IPython.external.decorator import decorator | |
|
41 | from IPython.config.application import Application | |||
41 | from IPython.config.loader import Config |
|
42 | from IPython.config.loader import Config | |
42 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum |
|
43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum | |
43 |
|
44 | |||
@@ -650,16 +651,23 b' class TaskScheduler(SessionFactory):' | |||||
650 |
|
651 | |||
651 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, |
|
652 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |
652 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
653 | logname='root', log_url=None, loglevel=logging.DEBUG, | |
653 | identity=b'task'): |
|
654 | identity=b'task', in_thread=False): | |
654 | from zmq.eventloop import ioloop |
|
655 | ||
655 | from zmq.eventloop.zmqstream import ZMQStream |
|
656 | ZMQStream = zmqstream.ZMQStream | |
656 |
|
657 | |||
657 | if config: |
|
658 | if config: | |
658 | # unwrap dict back into Config |
|
659 | # unwrap dict back into Config | |
659 | config = Config(config) |
|
660 | config = Config(config) | |
660 |
|
661 | |||
661 | ctx = zmq.Context() |
|
662 | if in_thread: | |
662 | loop = ioloop.IOLoop() |
|
663 | # use instance() to get the same Context/Loop as our parent | |
|
664 | ctx = zmq.Context.instance() | |||
|
665 | loop = ioloop.IOLoop.instance() | |||
|
666 | else: | |||
|
667 | # in a process, don't use instance() | |||
|
668 | # for safety with multiprocessing | |||
|
669 | ctx = zmq.Context() | |||
|
670 | loop = ioloop.IOLoop() | |||
663 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
671 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) | |
664 | ins.setsockopt(zmq.IDENTITY, identity) |
|
672 | ins.setsockopt(zmq.IDENTITY, identity) | |
665 | ins.bind(in_addr) |
|
673 | ins.bind(in_addr) | |
@@ -667,26 +675,29 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||||
667 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
675 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) | |
668 | outs.setsockopt(zmq.IDENTITY, identity) |
|
676 | outs.setsockopt(zmq.IDENTITY, identity) | |
669 | outs.bind(out_addr) |
|
677 | outs.bind(out_addr) | |
670 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) |
|
678 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | |
671 | mons.connect(mon_addr) |
|
679 | mons.connect(mon_addr) | |
672 | nots = ZMQStream(ctx.socket(zmq.SUB),loop) |
|
680 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | |
673 | nots.setsockopt(zmq.SUBSCRIBE, '') |
|
681 | nots.setsockopt(zmq.SUBSCRIBE, '') | |
674 | nots.connect(not_addr) |
|
682 | nots.connect(not_addr) | |
675 |
|
683 | |||
676 | # setup logging. Note that these will not work in-process, because they clobber |
|
684 | # setup logging. | |
677 | # existing loggers. |
|
685 | if in_thread: | |
678 | if log_url: |
|
686 | log = Application.instance().log | |
679 | log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) |
|
|||
680 | else: |
|
687 | else: | |
681 | log = local_logger(logname, loglevel) |
|
688 | if log_url: | |
|
689 | log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) | |||
|
690 | else: | |||
|
691 | log = local_logger(logname, loglevel) | |||
682 |
|
692 | |||
683 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
693 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
684 | mon_stream=mons, notifier_stream=nots, |
|
694 | mon_stream=mons, notifier_stream=nots, | |
685 | loop=loop, log=log, |
|
695 | loop=loop, log=log, | |
686 | config=config) |
|
696 | config=config) | |
687 | scheduler.start() |
|
697 | scheduler.start() | |
688 | try: |
|
698 | if not in_thread: | |
689 |
|
|
699 | try: | |
690 | except KeyboardInterrupt: |
|
700 | loop.start() | |
691 | print ("interrupted, exiting...", file=sys.__stderr__) |
|
701 | except KeyboardInterrupt: | |
|
702 | print ("interrupted, exiting...", file=sys.__stderr__) | |||
692 |
|
703 |
General Comments 0
You need to be logged in to leave comments.
Login now