##// END OF EJS Templates
re-enable log forwarding and iplogger
MinRK -
Show More
@@ -418,20 +418,25 b' class ClusterApplication(BaseIPythonApplication):'
418 self.init_clusterdir()
418 self.init_clusterdir()
419 if self.config_file:
419 if self.config_file:
420 self.load_config_file(self.config_file)
420 self.load_config_file(self.config_file)
421 else:
421 elif self.default_config_file_name:
422 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
422 try:
423 self.load_config_file(self.default_config_file_name,
424 path=self.cluster_dir.location)
425 except IOError:
426 self.log.warn("Warning: Default config file not found")
423 # command-line should *override* config file, but command-line is necessary
427 # command-line should *override* config file, but command-line is necessary
424 # to determine clusterdir, etc.
428 # to determine clusterdir, etc.
425 self.update_config(cl_config)
429 self.update_config(cl_config)
426 self.reinit_logging()
427
428 self.to_work_dir()
430 self.to_work_dir()
431 self.reinit_logging()
429
432
430 def to_work_dir(self):
433 def to_work_dir(self):
431 wd = self.work_dir
434 wd = self.work_dir
432 if unicode(wd) != os.getcwdu():
435 if unicode(wd) != os.getcwdu():
433 os.chdir(wd)
436 os.chdir(wd)
434 self.log.info("Changing to working dir: %s" % wd)
437 self.log.info("Changing to working dir: %s" % wd)
438 # This is the working dir by now.
439 sys.path.insert(0, '')
435
440
436 def load_config_file(self, filename, path=None):
441 def load_config_file(self, filename, path=None):
437 """Load a .py based config file by filename and path."""
442 """Load a .py based config file by filename and path."""
@@ -148,6 +148,7 b' class IPControllerApp(ClusterApplication):'
148 config = 'IPControllerApp.config_file',
148 config = 'IPControllerApp.config_file',
149 # file = 'IPControllerApp.url_file',
149 # file = 'IPControllerApp.url_file',
150 log_level = 'IPControllerApp.log_level',
150 log_level = 'IPControllerApp.log_level',
151 log_url = 'IPControllerApp.log_url',
151 reuse_files = 'IPControllerApp.reuse_files',
152 reuse_files = 'IPControllerApp.reuse_files',
152 secure = 'IPControllerApp.secure',
153 secure = 'IPControllerApp.secure',
153 ssh = 'IPControllerApp.ssh_server',
154 ssh = 'IPControllerApp.ssh_server',
@@ -221,8 +222,6 b' class IPControllerApp(ClusterApplication):'
221 assert int(ports) == c.HubFactory.regport, "regport mismatch"
222 assert int(ports) == c.HubFactory.regport, "regport mismatch"
222
223
223 def init_hub(self):
224 def init_hub(self):
224 # This is the working dir by now.
225 sys.path.insert(0, '')
226 c = self.config
225 c = self.config
227
226
228 self.do_import_statements()
227 self.do_import_statements()
@@ -269,7 +268,7 b' class IPControllerApp(ClusterApplication):'
269 #
268 #
270 def init_schedulers(self):
269 def init_schedulers(self):
271 children = self.children
270 children = self.children
272 mq = import_item(self.mq_class)
271 mq = import_item(str(self.mq_class))
273
272
274 hub = self.factory
273 hub = self.factory
275 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
@@ -321,8 +320,8 b' class IPControllerApp(ClusterApplication):'
321 self.log.info("task::using Python %s Task scheduler"%scheme)
320 self.log.info("task::using Python %s Task scheduler"%scheme)
322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 hub.monitor_url, hub.client_info['notification'])
322 hub.monitor_url, hub.client_info['notification'])
324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
323 kwargs = dict(logname='scheduler', loglevel=self.log_level,
325 config=dict(self.config))
324 log_url = self.log_url, config=dict(self.config))
326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 q.daemon=True
326 q.daemon=True
328 children.append(q)
327 children.append(q)
@@ -351,20 +350,23 b' class IPControllerApp(ClusterApplication):'
351 except:
350 except:
352 self.log.msg("Error running statement: %s" % s)
351 self.log.msg("Error running statement: %s" % s)
353
352
354 # def start_logging(self):
353 def forward_logging(self):
355 # super(IPControllerApp, self).start_logging()
354 if self.log_url:
356 # if self.config.Global.log_url:
355 self.log.info("Forwarding logging to %s"%self.log_url)
357 # context = self.factory.context
356 context = zmq.Context.instance()
358 # lsock = context.socket(zmq.PUB)
357 lsock = context.socket(zmq.PUB)
359 # lsock.connect(self.config.Global.log_url)
358 lsock.connect(self.log_url)
360 # handler = PUBHandler(lsock)
359 handler = PUBHandler(lsock)
361 # handler.root_topic = 'controller'
360 self.log.removeHandler(self._log_handler)
362 # handler.setLevel(self.log_level)
361 handler.root_topic = 'controller'
363 # self.log.addHandler(handler)
362 handler.setLevel(self.log_level)
363 self.log.addHandler(handler)
364 self._log_handler = handler
364 # #
365 # #
365
366
366 def initialize(self, argv=None):
367 def initialize(self, argv=None):
367 super(IPControllerApp, self).initialize(argv)
368 super(IPControllerApp, self).initialize(argv)
369 self.forward_logging()
368 self.init_hub()
370 self.init_hub()
369 self.init_schedulers()
371 self.init_schedulers()
370
372
@@ -25,7 +25,6 b' from zmq.eventloop import ioloop'
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
26 ClusterApplication,
27 ClusterDir,
27 ClusterDir,
28 base_aliases,
29 # ClusterDirConfigLoader
28 # ClusterDirConfigLoader
30 )
29 )
31 from IPython.zmq.log import EnginePUBHandler
30 from IPython.zmq.log import EnginePUBHandler
@@ -37,7 +36,7 b' from IPython.parallel.engine.streamkernel import Kernel'
37 from IPython.parallel.util import disambiguate_url
36 from IPython.parallel.util import disambiguate_url
38
37
39 from IPython.utils.importstring import import_item
38 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Bool, Unicode, Dict, List, CStr
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
41
40
42
41
43 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
@@ -122,6 +121,9 b' class IPEngineApp(ClusterApplication):'
122 )
121 )
123
122
124 url_file_name = Unicode(u'ipcontroller-engine.json')
123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
125
127
126 aliases = Dict(dict(
128 aliases = Dict(dict(
127 config = 'IPEngineApp.config_file',
129 config = 'IPEngineApp.config_file',
@@ -147,6 +149,7 b' class IPEngineApp(ClusterApplication):'
147 mpi = 'MPI.use',
149 mpi = 'MPI.use',
148
150
149 log_level = 'IPEngineApp.log_level',
151 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
150 ))
153 ))
151
154
152 # def find_key_file(self):
155 # def find_key_file(self):
@@ -221,33 +224,17 b' class IPEngineApp(ClusterApplication):'
221 self.log.error("Couldn't start the Engine", exc_info=True)
224 self.log.error("Couldn't start the Engine", exc_info=True)
222 self.exit(1)
225 self.exit(1)
223
226
224 # self.start_logging()
227 def forward_logging(self):
225
228 if self.log_url:
226 # Create the service hierarchy
229 self.log.info("Forwarding logging to %s"%self.log_url)
227 # self.main_service = service.MultiService()
230 context = self.engine.context
228 # self.engine_service.setServiceParent(self.main_service)
231 lsock = context.socket(zmq.PUB)
229 # self.tub_service = Tub()
232 lsock.connect(self.log_url)
230 # self.tub_service.setServiceParent(self.main_service)
233 self.log.removeHandler(self._log_handler)
231 # # This needs to be called before the connection is initiated
234 handler = EnginePUBHandler(self.engine, lsock)
232 # self.main_service.startService()
235 handler.setLevel(self.log_level)
233
236 self.log.addHandler(handler)
234 # This initiates the connection to the controller and calls
237 self._log_handler = handler
235 # register_engine to tell the controller we are ready to do work
236 # self.engine_connector = EngineConnector(self.tub_service)
237
238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
239
240 # reactor.callWhenRunning(self.call_connect)
241
242 # def start_logging(self):
243 # super(IPEngineApp, self).start_logging()
244 # if self.master_config.Global.log_url:
245 # context = self.engine.context
246 # lsock = context.socket(zmq.PUB)
247 # lsock.connect(self.master_config.Global.log_url)
248 # handler = EnginePUBHandler(self.engine, lsock)
249 # handler.setLevel(self.log_level)
250 # self.log.addHandler(handler)
251 #
238 #
252 def init_mpi(self):
239 def init_mpi(self):
253 global mpi
240 global mpi
@@ -268,6 +255,7 b' class IPEngineApp(ClusterApplication):'
268 super(IPEngineApp, self).initialize(argv)
255 super(IPEngineApp, self).initialize(argv)
269 self.init_mpi()
256 self.init_mpi()
270 self.init_engine()
257 self.init_engine()
258 self.forward_logging()
271
259
272 def start(self):
260 def start(self):
273 self.engine.start()
261 self.engine.start()
@@ -20,9 +20,12 b' import sys'
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.utils.traitlets import Bool, Dict
24
23 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
24 ClusterApplication,
26 ClusterApplication,
25 ClusterDirConfigLoader
27 ClusterDir,
28 base_aliases
26 )
29 )
27 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
28
31
@@ -43,79 +46,40 b' usually located in your ipython directory and named as "cluster_<profile>".'
43 See the --profile and --cluster-dir options for details.
46 See the --profile and --cluster-dir options for details.
44 """
47 """
45
48
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # MPI
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71
49
72 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
73 # Main application
51 # Main application
74 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
75
53 aliases = {}
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
76
56
77 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(ClusterApplication):
78
58
79 name = u'iploggerz'
59 name = u'iploggerz'
80 description = _description
60 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
61 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
62 auto_create_cluster_dir = Bool(False)
84
63
85 def create_default_config(self):
64 classes = [LogWatcher, ClusterDir]
86 super(IPLoggerApp, self).create_default_config()
65 aliases = Dict(aliases)
87
66
88 # The engine should not clean logs as we don't want to remove the
67 def initialize(self, argv=None):
89 # active log files of other running engines.
68 super(IPLoggerApp, self).initialize(argv)
90 self.default_config.Global.clean_logs = False
69 self.init_watcher()
91
70
92 # If given, this is the actual location of the logger's URL file.
71 def init_watcher(self):
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
96
97 def post_load_command_line_config(self):
98 pass
99
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
108
109 try:
72 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
111 except:
74 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
76 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
114
78
115
79
116 def start_app(self):
80 def start(self):
81 self.watcher.start()
117 try:
82 try:
118 self.watcher.start()
119 self.watcher.loop.start()
83 self.watcher.loop.start()
120 except KeyboardInterrupt:
84 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
85 self.log.critical("Logging Interrupted, shutting down...\n")
@@ -124,6 +88,7 b' class IPLoggerApp(ClusterApplication):'
124 def launch_new_instance():
88 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
89 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
90 app = IPLoggerApp()
91 app.initialize()
127 app.start()
92 app.start()
128
93
129
94
@@ -35,13 +35,19 b' class LogWatcher(LoggingFactory):'
35 This can subscribe to multiple topics, but defaults to all topics.
35 This can subscribe to multiple topics, but defaults to all topics.
36 """
36 """
37 # configurables
37 # configurables
38 topics = List([''], config=True)
38 topics = List([''], config=True,
39 url = Unicode('tcp://127.0.0.1:20202', config=True)
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
40 url = Unicode('tcp://127.0.0.1:20202', config=True,
41 help="ZMQ url on which to listen for log messages")
40
42
41 # internals
43 # internals
42 context = Instance(zmq.Context, (), {})
43 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 loop = Instance('zmq.eventloop.ioloop.IOLoop')
45
46 context = Instance(zmq.Context)
47 def _context_default(self):
48 return zmq.Context.instance()
49
50 loop = Instance(zmq.eventloop.ioloop.IOLoop)
45 def _loop_default(self):
51 def _loop_default(self):
46 return ioloop.IOLoop.instance()
52 return ioloop.IOLoop.instance()
47
53
@@ -62,9 +68,13 b' class LogWatcher(LoggingFactory):'
62 def subscribe(self):
68 def subscribe(self):
63 """Update our SUB socket's subscriptions."""
69 """Update our SUB socket's subscriptions."""
64 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
70 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
65 for topic in self.topics:
71 if '' in self.topics:
66 self.log.debug("Subscribing to: %r"%topic)
72 self.log.debug("Subscribing to: everything")
67 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
73 self.stream.setsockopt(zmq.SUBSCRIBE, '')
74 else:
75 for topic in self.topics:
76 self.log.debug("Subscribing to: %r"%(topic))
77 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
68
78
69 def _extract_level(self, topic_str):
79 def _extract_level(self, topic_str):
70 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
80 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
@@ -94,5 +104,5 b' class LogWatcher(LoggingFactory):'
94 level,topic = self._extract_level(topic)
104 level,topic = self._extract_level(topic)
95 if msg[-1] == '\n':
105 if msg[-1] == '\n':
96 msg = msg[:-1]
106 msg = msg[:-1]
97 logging.log(level, "[%s] %s" % (topic, msg))
107 self.log.log(level, "[%s] %s" % (topic, msg))
98
108
@@ -141,7 +141,9 b' class TaskScheduler(SessionFactory):'
141 self.scheme = globals()[new]
141 self.scheme = globals()[new]
142
142
143 # input arguments:
143 # input arguments:
144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
146 return leastload
145 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
146 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
147 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
@@ -634,7 +636,7 b' class TaskScheduler(SessionFactory):'
634
636
635
637
636 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
638 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
637 log_addr=None, loglevel=logging.DEBUG,
639 log_url=None, loglevel=logging.DEBUG,
638 identity=b'task'):
640 identity=b'task'):
639 from zmq.eventloop import ioloop
641 from zmq.eventloop import ioloop
640 from zmq.eventloop.zmqstream import ZMQStream
642 from zmq.eventloop.zmqstream import ZMQStream
@@ -658,10 +660,10 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
658 nots.setsockopt(zmq.SUBSCRIBE, '')
660 nots.setsockopt(zmq.SUBSCRIBE, '')
659 nots.connect(not_addr)
661 nots.connect(not_addr)
660
662
661 # scheme = globals().get(scheme, None)
663 # setup logging. Note that these will not work in-process, because they clobber
662 # setup logging
664 # existing loggers.
663 if log_addr:
665 if log_url:
664 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
666 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
665 else:
667 else:
666 local_logger(logname, loglevel)
668 local_logger(logname, loglevel)
667
669
General Comments 0
You need to be logged in to leave comments. Login now