Show More
@@ -327,9 +327,15 b' class IPControllerApp(BaseParallelApplication):' | |||
|
327 | 327 | hub.monitor_url, hub.client_info['notification']) |
|
328 | 328 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
329 | 329 | log_url = self.log_url, config=dict(self.config)) |
|
330 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) | |
|
331 | q.daemon=True | |
|
332 | children.append(q) | |
|
330 | if 'Process' in self.mq_class: | |
|
331 | # run the Python scheduler in a Process | |
|
332 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) | |
|
333 | q.daemon=True | |
|
334 | children.append(q) | |
|
335 | else: | |
|
336 | # single-threaded Controller | |
|
337 | kwargs['in_thread'] = True | |
|
338 | launch_scheduler(*sargs, **kwargs) | |
|
333 | 339 | |
|
334 | 340 | |
|
335 | 341 | def save_urls(self): |
@@ -38,6 +38,7 b' from zmq.eventloop import ioloop, zmqstream' | |||
|
38 | 38 | |
|
39 | 39 | # local imports |
|
40 | 40 | from IPython.external.decorator import decorator |
|
41 | from IPython.config.application import Application | |
|
41 | 42 | from IPython.config.loader import Config |
|
42 | 43 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum |
|
43 | 44 | |
@@ -650,16 +651,23 b' class TaskScheduler(SessionFactory):' | |||
|
650 | 651 | |
|
651 | 652 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, |
|
652 | 653 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
653 | identity=b'task'): | |
|
654 | from zmq.eventloop import ioloop | |
|
655 | from zmq.eventloop.zmqstream import ZMQStream | |
|
654 | identity=b'task', in_thread=False): | |
|
655 | ||
|
656 | ZMQStream = zmqstream.ZMQStream | |
|
656 | 657 | |
|
657 | 658 | if config: |
|
658 | 659 | # unwrap dict back into Config |
|
659 | 660 | config = Config(config) |
|
660 | 661 | |
|
661 | ctx = zmq.Context() | |
|
662 | loop = ioloop.IOLoop() | |
|
662 | if in_thread: | |
|
663 | # use instance() to get the same Context/Loop as our parent | |
|
664 | ctx = zmq.Context.instance() | |
|
665 | loop = ioloop.IOLoop.instance() | |
|
666 | else: | |
|
667 | # in a process, don't use instance() | |
|
668 | # for safety with multiprocessing | |
|
669 | ctx = zmq.Context() | |
|
670 | loop = ioloop.IOLoop() | |
|
663 | 671 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
664 | 672 | ins.setsockopt(zmq.IDENTITY, identity) |
|
665 | 673 | ins.bind(in_addr) |
@@ -667,26 +675,29 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||
|
667 | 675 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) |
|
668 | 676 | outs.setsockopt(zmq.IDENTITY, identity) |
|
669 | 677 | outs.bind(out_addr) |
|
670 | mons = ZMQStream(ctx.socket(zmq.PUB),loop) | |
|
678 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | |
|
671 | 679 | mons.connect(mon_addr) |
|
672 | nots = ZMQStream(ctx.socket(zmq.SUB),loop) | |
|
680 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | |
|
673 | 681 | nots.setsockopt(zmq.SUBSCRIBE, '') |
|
674 | 682 | nots.connect(not_addr) |
|
675 | 683 | |
|
676 | # setup logging. Note that these will not work in-process, because they clobber | |
|
677 | # existing loggers. | |
|
678 | if log_url: | |
|
679 | log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) | |
|
684 | # setup logging. | |
|
685 | if in_thread: | |
|
686 | log = Application.instance().log | |
|
680 | 687 | else: |
|
681 | log = local_logger(logname, loglevel) | |
|
688 | if log_url: | |
|
689 | log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) | |
|
690 | else: | |
|
691 | log = local_logger(logname, loglevel) | |
|
682 | 692 | |
|
683 | 693 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
684 | 694 | mon_stream=mons, notifier_stream=nots, |
|
685 | 695 | loop=loop, log=log, |
|
686 | 696 | config=config) |
|
687 | 697 | scheduler.start() |
|
688 | try: | |
|
689 |
|
|
|
690 | except KeyboardInterrupt: | |
|
691 | print ("interrupted, exiting...", file=sys.__stderr__) | |
|
698 | if not in_thread: | |
|
699 | try: | |
|
700 | loop.start() | |
|
701 | except KeyboardInterrupt: | |
|
702 | print ("interrupted, exiting...", file=sys.__stderr__) | |
|
692 | 703 |
General Comments 0
You need to be logged in to leave comments.
Login now