Show More
@@ -418,20 +418,25 b' class ClusterApplication(BaseIPythonApplication):' | |||
|
418 | 418 | self.init_clusterdir() |
|
419 | 419 | if self.config_file: |
|
420 | 420 | self.load_config_file(self.config_file) |
|
421 | else: | |
|
422 | self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location) | |
|
421 | elif self.default_config_file_name: | |
|
422 | try: | |
|
423 | self.load_config_file(self.default_config_file_name, | |
|
424 | path=self.cluster_dir.location) | |
|
425 | except IOError: | |
|
426 | self.log.warn("Warning: Default config file not found") | |
|
423 | 427 | # command-line should *override* config file, but command-line is necessary |
|
424 | 428 | # to determine clusterdir, etc. |
|
425 | 429 | self.update_config(cl_config) |
|
426 | self.reinit_logging() | |
|
427 | ||
|
428 | 430 | self.to_work_dir() |
|
431 | self.reinit_logging() | |
|
429 | 432 | |
|
430 | 433 | def to_work_dir(self): |
|
431 | 434 | wd = self.work_dir |
|
432 | 435 | if unicode(wd) != os.getcwdu(): |
|
433 | 436 | os.chdir(wd) |
|
434 | 437 | self.log.info("Changing to working dir: %s" % wd) |
|
438 | # This is the working dir by now. | |
|
439 | sys.path.insert(0, '') | |
|
435 | 440 | |
|
436 | 441 | def load_config_file(self, filename, path=None): |
|
437 | 442 | """Load a .py based config file by filename and path.""" |
@@ -148,6 +148,7 b' class IPControllerApp(ClusterApplication):' | |||
|
148 | 148 | config = 'IPControllerApp.config_file', |
|
149 | 149 | # file = 'IPControllerApp.url_file', |
|
150 | 150 | log_level = 'IPControllerApp.log_level', |
|
151 | log_url = 'IPControllerApp.log_url', | |
|
151 | 152 | reuse_files = 'IPControllerApp.reuse_files', |
|
152 | 153 | secure = 'IPControllerApp.secure', |
|
153 | 154 | ssh = 'IPControllerApp.ssh_server', |
@@ -221,8 +222,6 b' class IPControllerApp(ClusterApplication):' | |||
|
221 | 222 | assert int(ports) == c.HubFactory.regport, "regport mismatch" |
|
222 | 223 | |
|
223 | 224 | def init_hub(self): |
|
224 | # This is the working dir by now. | |
|
225 | sys.path.insert(0, '') | |
|
226 | 225 | c = self.config |
|
227 | 226 | |
|
228 | 227 | self.do_import_statements() |
@@ -269,7 +268,7 b' class IPControllerApp(ClusterApplication):' | |||
|
269 | 268 | # |
|
270 | 269 | def init_schedulers(self): |
|
271 | 270 | children = self.children |
|
272 | mq = import_item(self.mq_class) | |
|
271 | mq = import_item(str(self.mq_class)) | |
|
273 | 272 | |
|
274 | 273 | hub = self.factory |
|
275 | 274 | # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url |
@@ -321,8 +320,8 b' class IPControllerApp(ClusterApplication):' | |||
|
321 | 320 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
322 | 321 | sargs = (hub.client_info['task'][1], hub.engine_info['task'], |
|
323 | 322 | hub.monitor_url, hub.client_info['notification']) |
|
324 |
kwargs = dict(logname= |
|
|
325 |
|
|
|
323 | kwargs = dict(logname='scheduler', loglevel=self.log_level, | |
|
324 | log_url = self.log_url, config=dict(self.config)) | |
|
326 | 325 | q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) |
|
327 | 326 | q.daemon=True |
|
328 | 327 | children.append(q) |
@@ -351,20 +350,23 b' class IPControllerApp(ClusterApplication):' | |||
|
351 | 350 | except: |
|
352 | 351 | self.log.msg("Error running statement: %s" % s) |
|
353 | 352 | |
|
354 |
|
|
|
355 | # super(IPControllerApp, self).start_logging() | |
|
356 | # if self.config.Global.log_url: | |
|
357 |
|
|
|
358 |
|
|
|
359 |
|
|
|
360 |
|
|
|
361 | # handler.root_topic = 'controller' | |
|
362 | # handler.setLevel(self.log_level) | |
|
363 | # self.log.addHandler(handler) | |
|
353 | def forward_logging(self): | |
|
354 | if self.log_url: | |
|
355 | self.log.info("Forwarding logging to %s"%self.log_url) | |
|
356 | context = zmq.Context.instance() | |
|
357 | lsock = context.socket(zmq.PUB) | |
|
358 | lsock.connect(self.log_url) | |
|
359 | handler = PUBHandler(lsock) | |
|
360 | self.log.removeHandler(self._log_handler) | |
|
361 | handler.root_topic = 'controller' | |
|
362 | handler.setLevel(self.log_level) | |
|
363 | self.log.addHandler(handler) | |
|
364 | self._log_handler = handler | |
|
364 | 365 | # # |
|
365 | 366 | |
|
366 | 367 | def initialize(self, argv=None): |
|
367 | 368 | super(IPControllerApp, self).initialize(argv) |
|
369 | self.forward_logging() | |
|
368 | 370 | self.init_hub() |
|
369 | 371 | self.init_schedulers() |
|
370 | 372 |
@@ -25,7 +25,6 b' from zmq.eventloop import ioloop' | |||
|
25 | 25 | from IPython.parallel.apps.clusterdir import ( |
|
26 | 26 | ClusterApplication, |
|
27 | 27 | ClusterDir, |
|
28 | base_aliases, | |
|
29 | 28 | # ClusterDirConfigLoader |
|
30 | 29 | ) |
|
31 | 30 | from IPython.zmq.log import EnginePUBHandler |
@@ -37,7 +36,7 b' from IPython.parallel.engine.streamkernel import Kernel' | |||
|
37 | 36 | from IPython.parallel.util import disambiguate_url |
|
38 | 37 | |
|
39 | 38 | from IPython.utils.importstring import import_item |
|
40 |
from IPython.utils.traitlets import Bool, Unicode, Dict, List |
|
|
39 | from IPython.utils.traitlets import Bool, Unicode, Dict, List | |
|
41 | 40 | |
|
42 | 41 | |
|
43 | 42 | #----------------------------------------------------------------------------- |
@@ -122,6 +121,9 b' class IPEngineApp(ClusterApplication):' | |||
|
122 | 121 | ) |
|
123 | 122 | |
|
124 | 123 | url_file_name = Unicode(u'ipcontroller-engine.json') |
|
124 | log_url = Unicode('', config=True, | |
|
125 | help="""The URL for the iploggerapp instance, for forwarding | |
|
126 | logging to a central location.""") | |
|
125 | 127 | |
|
126 | 128 | aliases = Dict(dict( |
|
127 | 129 | config = 'IPEngineApp.config_file', |
@@ -147,6 +149,7 b' class IPEngineApp(ClusterApplication):' | |||
|
147 | 149 | mpi = 'MPI.use', |
|
148 | 150 | |
|
149 | 151 | log_level = 'IPEngineApp.log_level', |
|
152 | log_url = 'IPEngineApp.log_url' | |
|
150 | 153 | )) |
|
151 | 154 | |
|
152 | 155 | # def find_key_file(self): |
@@ -221,33 +224,17 b' class IPEngineApp(ClusterApplication):' | |||
|
221 | 224 | self.log.error("Couldn't start the Engine", exc_info=True) |
|
222 | 225 | self.exit(1) |
|
223 | 226 | |
|
224 |
|
|
|
225 | ||
|
226 | # Create the service hierarchy | |
|
227 | # self.main_service = service.MultiService() | |
|
228 | # self.engine_service.setServiceParent(self.main_service) | |
|
229 | # self.tub_service = Tub() | |
|
230 | # self.tub_service.setServiceParent(self.main_service) | |
|
231 | # # This needs to be called before the connection is initiated | |
|
232 | # self.main_service.startService() | |
|
233 | ||
|
234 | # This initiates the connection to the controller and calls | |
|
235 | # register_engine to tell the controller we are ready to do work | |
|
236 | # self.engine_connector = EngineConnector(self.tub_service) | |
|
237 | ||
|
238 | # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file) | |
|
239 | ||
|
240 | # reactor.callWhenRunning(self.call_connect) | |
|
241 | ||
|
242 | # def start_logging(self): | |
|
243 | # super(IPEngineApp, self).start_logging() | |
|
244 | # if self.master_config.Global.log_url: | |
|
245 | # context = self.engine.context | |
|
246 | # lsock = context.socket(zmq.PUB) | |
|
247 | # lsock.connect(self.master_config.Global.log_url) | |
|
248 | # handler = EnginePUBHandler(self.engine, lsock) | |
|
249 | # handler.setLevel(self.log_level) | |
|
250 | # self.log.addHandler(handler) | |
|
227 | def forward_logging(self): | |
|
228 | if self.log_url: | |
|
229 | self.log.info("Forwarding logging to %s"%self.log_url) | |
|
230 | context = self.engine.context | |
|
231 | lsock = context.socket(zmq.PUB) | |
|
232 | lsock.connect(self.log_url) | |
|
233 | self.log.removeHandler(self._log_handler) | |
|
234 | handler = EnginePUBHandler(self.engine, lsock) | |
|
235 | handler.setLevel(self.log_level) | |
|
236 | self.log.addHandler(handler) | |
|
237 | self._log_handler = handler | |
|
251 | 238 | # |
|
252 | 239 | def init_mpi(self): |
|
253 | 240 | global mpi |
@@ -268,6 +255,7 b' class IPEngineApp(ClusterApplication):' | |||
|
268 | 255 | super(IPEngineApp, self).initialize(argv) |
|
269 | 256 | self.init_mpi() |
|
270 | 257 | self.init_engine() |
|
258 | self.forward_logging() | |
|
271 | 259 | |
|
272 | 260 | def start(self): |
|
273 | 261 | self.engine.start() |
@@ -20,9 +20,12 b' import sys' | |||
|
20 | 20 | |
|
21 | 21 | import zmq |
|
22 | 22 | |
|
23 | from IPython.utils.traitlets import Bool, Dict | |
|
24 | ||
|
23 | 25 | from IPython.parallel.apps.clusterdir import ( |
|
24 | 26 | ClusterApplication, |
|
25 |
ClusterDir |
|
|
27 | ClusterDir, | |
|
28 | base_aliases | |
|
26 | 29 | ) |
|
27 | 30 | from IPython.parallel.apps.logwatcher import LogWatcher |
|
28 | 31 | |
@@ -43,79 +46,40 b' usually located in your ipython directory and named as "cluster_<profile>".' | |||
|
43 | 46 | See the --profile and --cluster-dir options for details. |
|
44 | 47 | """ |
|
45 | 48 | |
|
46 | #----------------------------------------------------------------------------- | |
|
47 | # Command line options | |
|
48 | #----------------------------------------------------------------------------- | |
|
49 | ||
|
50 | ||
|
51 | class IPLoggerAppConfigLoader(ClusterDirConfigLoader): | |
|
52 | ||
|
53 | def _add_arguments(self): | |
|
54 | super(IPLoggerAppConfigLoader, self)._add_arguments() | |
|
55 | paa = self.parser.add_argument | |
|
56 | # Controller config | |
|
57 | paa('--url', | |
|
58 | type=str, dest='LogWatcher.url', | |
|
59 | help='The url the LogWatcher will listen on', | |
|
60 | ) | |
|
61 | # MPI | |
|
62 | paa('--topics', | |
|
63 | type=str, dest='LogWatcher.topics', nargs='+', | |
|
64 | help='What topics to subscribe to', | |
|
65 | metavar='topics') | |
|
66 | # Global config | |
|
67 | paa('--log-to-file', | |
|
68 | action='store_true', dest='Global.log_to_file', | |
|
69 | help='Log to a file in the log directory (default is stdout)') | |
|
70 | ||
|
71 | 49 | |
|
72 | 50 | #----------------------------------------------------------------------------- |
|
73 | 51 | # Main application |
|
74 | 52 | #----------------------------------------------------------------------------- |
|
75 | ||
|
53 | aliases = {} | |
|
54 | aliases.update(base_aliases) | |
|
55 | aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics')) | |
|
76 | 56 | |
|
77 | 57 | class IPLoggerApp(ClusterApplication): |
|
78 | 58 | |
|
79 | 59 | name = u'iploggerz' |
|
80 | 60 | description = _description |
|
81 | command_line_loader = IPLoggerAppConfigLoader | |
|
82 | 61 | default_config_file_name = default_config_file_name |
|
83 |
auto_create_cluster_dir = |
|
|
84 | ||
|
85 | def create_default_config(self): | |
|
86 | super(IPLoggerApp, self).create_default_config() | |
|
87 | ||
|
88 | # The engine should not clean logs as we don't want to remove the | |
|
89 | # active log files of other running engines. | |
|
90 | self.default_config.Global.clean_logs = False | |
|
91 | ||
|
92 | # If given, this is the actual location of the logger's URL file. | |
|
93 | # If not, this is computed using the profile, app_dir and furl_file_name | |
|
94 | self.default_config.Global.url_file_name = u'iplogger.url' | |
|
95 | self.default_config.Global.url_file = u'' | |
|
62 | auto_create_cluster_dir = Bool(False) | |
|
96 | 63 | |
|
97 | def post_load_command_line_config(self): | |
|
98 | pass | |
|
64 | classes = [LogWatcher, ClusterDir] | |
|
65 | aliases = Dict(aliases) | |
|
99 | 66 | |
|
100 | def pre_construct(self): | |
|
101 |
super(IPLoggerApp, self). |
|
|
102 | ||
|
103 | def construct(self): | |
|
104 | # This is the working dir by now. | |
|
105 | sys.path.insert(0, '') | |
|
106 | ||
|
107 | self.start_logging() | |
|
67 | def initialize(self, argv=None): | |
|
68 | super(IPLoggerApp, self).initialize(argv) | |
|
69 | self.init_watcher() | |
|
108 | 70 | |
|
71 | def init_watcher(self): | |
|
109 | 72 | try: |
|
110 |
self.watcher = LogWatcher(config=self. |
|
|
73 | self.watcher = LogWatcher(config=self.config, logname=self.log.name) | |
|
111 | 74 | except: |
|
112 | 75 | self.log.error("Couldn't start the LogWatcher", exc_info=True) |
|
113 | 76 | self.exit(1) |
|
77 | self.log.info("Listening for log messages on %r"%self.watcher.url) | |
|
114 | 78 | |
|
115 | 79 | |
|
116 |
def start |
|
|
117 | try: | |
|
80 | def start(self): | |
|
118 | 81 |
|
|
82 | try: | |
|
119 | 83 | self.watcher.loop.start() |
|
120 | 84 | except KeyboardInterrupt: |
|
121 | 85 | self.log.critical("Logging Interrupted, shutting down...\n") |
@@ -124,6 +88,7 b' class IPLoggerApp(ClusterApplication):' | |||
|
124 | 88 | def launch_new_instance(): |
|
125 | 89 | """Create and run the IPython LogWatcher""" |
|
126 | 90 | app = IPLoggerApp() |
|
91 | app.initialize() | |
|
127 | 92 | app.start() |
|
128 | 93 | |
|
129 | 94 |
@@ -35,13 +35,19 b' class LogWatcher(LoggingFactory):' | |||
|
35 | 35 | This can subscribe to multiple topics, but defaults to all topics. |
|
36 | 36 | """ |
|
37 | 37 | # configurables |
|
38 |
topics = List([''], config=True |
|
|
39 | url = Unicode('tcp://127.0.0.1:20202', config=True) | |
|
38 | topics = List([''], config=True, | |
|
39 | help="The ZMQ topics to subscribe to. Default is to subscribe to all messages") | |
|
40 | url = Unicode('tcp://127.0.0.1:20202', config=True, | |
|
41 | help="ZMQ url on which to listen for log messages") | |
|
40 | 42 | |
|
41 | 43 | # internals |
|
42 | context = Instance(zmq.Context, (), {}) | |
|
43 | 44 | stream = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
44 | loop = Instance('zmq.eventloop.ioloop.IOLoop') | |
|
45 | ||
|
46 | context = Instance(zmq.Context) | |
|
47 | def _context_default(self): | |
|
48 | return zmq.Context.instance() | |
|
49 | ||
|
50 | loop = Instance(zmq.eventloop.ioloop.IOLoop) | |
|
45 | 51 | def _loop_default(self): |
|
46 | 52 | return ioloop.IOLoop.instance() |
|
47 | 53 | |
@@ -62,8 +68,12 b' class LogWatcher(LoggingFactory):' | |||
|
62 | 68 | def subscribe(self): |
|
63 | 69 | """Update our SUB socket's subscriptions.""" |
|
64 | 70 | self.stream.setsockopt(zmq.UNSUBSCRIBE, '') |
|
71 | if '' in self.topics: | |
|
72 | self.log.debug("Subscribing to: everything") | |
|
73 | self.stream.setsockopt(zmq.SUBSCRIBE, '') | |
|
74 | else: | |
|
65 | 75 | for topic in self.topics: |
|
66 | self.log.debug("Subscribing to: %r"%topic) | |
|
76 | self.log.debug("Subscribing to: %r"%(topic)) | |
|
67 | 77 | self.stream.setsockopt(zmq.SUBSCRIBE, topic) |
|
68 | 78 | |
|
69 | 79 | def _extract_level(self, topic_str): |
@@ -94,5 +104,5 b' class LogWatcher(LoggingFactory):' | |||
|
94 | 104 | level,topic = self._extract_level(topic) |
|
95 | 105 | if msg[-1] == '\n': |
|
96 | 106 | msg = msg[:-1] |
|
97 |
log |
|
|
107 | self.log.log(level, "[%s] %s" % (topic, msg)) | |
|
98 | 108 |
@@ -141,7 +141,9 b' class TaskScheduler(SessionFactory):' | |||
|
141 | 141 | self.scheme = globals()[new] |
|
142 | 142 | |
|
143 | 143 | # input arguments: |
|
144 |
scheme = Instance(FunctionType |
|
|
144 | scheme = Instance(FunctionType) # function for determining the destination | |
|
145 | def _scheme_default(self): | |
|
146 | return leastload | |
|
145 | 147 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream |
|
146 | 148 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
147 | 149 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
@@ -634,7 +636,7 b' class TaskScheduler(SessionFactory):' | |||
|
634 | 636 | |
|
635 | 637 | |
|
636 | 638 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', |
|
637 |
log_ |
|
|
639 | log_url=None, loglevel=logging.DEBUG, | |
|
638 | 640 | identity=b'task'): |
|
639 | 641 | from zmq.eventloop import ioloop |
|
640 | 642 | from zmq.eventloop.zmqstream import ZMQStream |
@@ -658,10 +660,10 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname=' | |||
|
658 | 660 | nots.setsockopt(zmq.SUBSCRIBE, '') |
|
659 | 661 | nots.connect(not_addr) |
|
660 | 662 | |
|
661 | # scheme = globals().get(scheme, None) | |
|
662 |
# |
|
|
663 |
if log_ |
|
|
664 |
connect_logger(logname, ctx, log_ |
|
|
663 | # setup logging. Note that these will not work in-process, because they clobber | |
|
664 | # existing loggers. | |
|
665 | if log_url: | |
|
666 | connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) | |
|
665 | 667 | else: |
|
666 | 668 | local_logger(logname, loglevel) |
|
667 | 669 |
General Comments 0
You need to be logged in to leave comments.
Login now