re-enable log forwarding and iplogger
MinRK
@@ -418,20 +418,25 @@ class ClusterApplication(BaseIPythonApplication):
418 418 self.init_clusterdir()
419 419 if self.config_file:
420 420 self.load_config_file(self.config_file)
421 else:
422 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
421 elif self.default_config_file_name:
422 try:
423 self.load_config_file(self.default_config_file_name,
424 path=self.cluster_dir.location)
425 except IOError:
426 self.log.warn("Warning: Default config file not found")
423 427 # command-line should *override* config file, but command-line is necessary
424 428 # to determine clusterdir, etc.
425 429 self.update_config(cl_config)
426 self.reinit_logging()
427
428 430 self.to_work_dir()
431 self.reinit_logging()
429 432
430 433 def to_work_dir(self):
431 434 wd = self.work_dir
432 435 if unicode(wd) != os.getcwdu():
433 436 os.chdir(wd)
434 437 self.log.info("Changing to working dir: %s" % wd)
438 # This is the working dir by now.
439 sys.path.insert(0, '')
435 440
436 441 def load_config_file(self, filename, path=None):
437 442 """Load a .py based config file by filename and path."""
@@ -148,6 +148,7 @@ class IPControllerApp(ClusterApplication):
148 148 config = 'IPControllerApp.config_file',
149 149 # file = 'IPControllerApp.url_file',
150 150 log_level = 'IPControllerApp.log_level',
151 log_url = 'IPControllerApp.log_url',
151 152 reuse_files = 'IPControllerApp.reuse_files',
152 153 secure = 'IPControllerApp.secure',
153 154 ssh = 'IPControllerApp.ssh_server',
@@ -221,8 +222,6 @@ class IPControllerApp(ClusterApplication):
221 222 assert int(ports) == c.HubFactory.regport, "regport mismatch"
222 223
223 224 def init_hub(self):
224 # This is the working dir by now.
225 sys.path.insert(0, '')
226 225 c = self.config
227 226
228 227 self.do_import_statements()
@@ -269,7 +268,7 @@ class IPControllerApp(ClusterApplication):
269 268 #
270 269 def init_schedulers(self):
271 270 children = self.children
272 mq = import_item(self.mq_class)
271 mq = import_item(str(self.mq_class))
273 272
274 273 hub = self.factory
275 274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
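import_item (IPython.utils.importstring) resolves a dotted name to the object it names; the new str() cast matters because mq_class is a Unicode trait, so on Python 2 its value is a unicode string, which the import machinery of the era did not always accept. A short sketch with an illustrative dotted name (not the real default):

from IPython.utils.importstring import import_item

# mq_class is a configurable Unicode trait; casting to str avoids passing a
# Python 2 unicode object into __import__'s fromlist, which could raise
# "TypeError: Item in ``from list'' not a string" on some interpreters.
dotted = u'multiprocessing.Queue'
Queue = import_item(str(dotted))
print(Queue)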
@@ -321,8 +320,8 @@ class IPControllerApp(ClusterApplication):
321 320 self.log.info("task::using Python %s Task scheduler"%scheme)
322 321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 322 hub.monitor_url, hub.client_info['notification'])
324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
325 config=dict(self.config))
323 kwargs = dict(logname='scheduler', loglevel=self.log_level,
324 log_url = self.log_url, config=dict(self.config))
326 325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 326 q.daemon=True
328 327 children.append(q)
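Because the pure-Python task scheduler runs in a separate Process, it cannot share the controller's log handlers, so the hunk passes log_url explicitly through the kwargs and lets the child wire up its own forwarding (see the launch_scheduler hunk further down). A trimmed sketch of that launch pattern with a placeholder target function and placeholder addresses:

import logging
from multiprocessing import Process

def launch_worker(in_addr, out_addr, logname='scheduler',
                  loglevel=logging.DEBUG, log_url=None):
    # placeholder for launch_scheduler(); a real child would connect its
    # logger to log_url here
    print(logname, loglevel, log_url)

if __name__ == '__main__':
    q = Process(target=launch_worker,
                args=('tcp://127.0.0.1:5555', 'tcp://127.0.0.1:5556'),
                kwargs=dict(logname='scheduler', loglevel=logging.DEBUG,
                            log_url='tcp://127.0.0.1:20202'))
    q.daemon = True
    q.start()
    q.join()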
@@ -351,20 +350,23 @@ class IPControllerApp(ClusterApplication):
351 350 except:
352 351 self.log.msg("Error running statement: %s" % s)
353 352
354 # def start_logging(self):
355 # super(IPControllerApp, self).start_logging()
356 # if self.config.Global.log_url:
357 # context = self.factory.context
358 # lsock = context.socket(zmq.PUB)
359 # lsock.connect(self.config.Global.log_url)
360 # handler = PUBHandler(lsock)
361 # handler.root_topic = 'controller'
362 # handler.setLevel(self.log_level)
363 # self.log.addHandler(handler)
353 def forward_logging(self):
354 if self.log_url:
355 self.log.info("Forwarding logging to %s"%self.log_url)
356 context = zmq.Context.instance()
357 lsock = context.socket(zmq.PUB)
358 lsock.connect(self.log_url)
359 handler = PUBHandler(lsock)
360 self.log.removeHandler(self._log_handler)
361 handler.root_topic = 'controller'
362 handler.setLevel(self.log_level)
363 self.log.addHandler(handler)
364 self._log_handler = handler
364 365 # #
365 366
366 367 def initialize(self, argv=None):
367 368 super(IPControllerApp, self).initialize(argv)
369 self.forward_logging()
368 370 self.init_hub()
369 371 self.init_schedulers()
370 372
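forward_logging() swaps the controller's console handler for pyzmq's PUBHandler, so every log record is published over ZeroMQ under a 'controller'-prefixed topic that the iplogger app can subscribe to. A self-contained sketch of the same swap, assuming a log collector is listening at a hypothetical endpoint:

import logging
import zmq
from zmq.log.handlers import PUBHandler

log_url = 'tcp://127.0.0.1:20202'        # hypothetical iplogger endpoint

log = logging.getLogger('controller-sketch')
log.setLevel(logging.INFO)
console = logging.StreamHandler()
log.addHandler(console)
log.info("Forwarding logging to %s" % log_url)

lsock = zmq.Context.instance().socket(zmq.PUB)
lsock.connect(log_url)

handler = PUBHandler(lsock)
handler.root_topic = 'controller'        # topic prefix seen by subscribers
handler.setLevel(logging.INFO)

log.removeHandler(console)               # stop duplicating output locally
log.addHandler(handler)
log.info("this record goes out on the PUB socket")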
@@ -25,7 +25,6 @@ from zmq.eventloop import ioloop
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 base_aliases,
29 28 # ClusterDirConfigLoader
30 29 )
31 30 from IPython.zmq.log import EnginePUBHandler
@@ -37,7 +36,7 @@ from IPython.parallel.engine.streamkernel import Kernel
37 36 from IPython.parallel.util import disambiguate_url
38 37
39 38 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Bool, Unicode, Dict, List, CStr
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
41 40
42 41
43 42 #-----------------------------------------------------------------------------
@@ -122,6 +121,9 @@ class IPEngineApp(ClusterApplication):
122 121 )
123 122
124 123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
125 127
126 128 aliases = Dict(dict(
127 129 config = 'IPEngineApp.config_file',
@@ -147,6 +149,7 @@ class IPEngineApp(ClusterApplication):
147 149 mpi = 'MPI.use',
148 150
149 151 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
150 153 ))
151 154
152 155 # def find_key_file(self):
@@ -221,33 +224,17 @@ class IPEngineApp(ClusterApplication):
221 224 self.log.error("Couldn't start the Engine", exc_info=True)
222 225 self.exit(1)
223 226
224 # self.start_logging()
225
226 # Create the service hierarchy
227 # self.main_service = service.MultiService()
228 # self.engine_service.setServiceParent(self.main_service)
229 # self.tub_service = Tub()
230 # self.tub_service.setServiceParent(self.main_service)
231 # # This needs to be called before the connection is initiated
232 # self.main_service.startService()
233
234 # This initiates the connection to the controller and calls
235 # register_engine to tell the controller we are ready to do work
236 # self.engine_connector = EngineConnector(self.tub_service)
237
238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
239
240 # reactor.callWhenRunning(self.call_connect)
241
242 # def start_logging(self):
243 # super(IPEngineApp, self).start_logging()
244 # if self.master_config.Global.log_url:
245 # context = self.engine.context
246 # lsock = context.socket(zmq.PUB)
247 # lsock.connect(self.master_config.Global.log_url)
248 # handler = EnginePUBHandler(self.engine, lsock)
249 # handler.setLevel(self.log_level)
250 # self.log.addHandler(handler)
227 def forward_logging(self):
228 if self.log_url:
229 self.log.info("Forwarding logging to %s"%self.log_url)
230 context = self.engine.context
231 lsock = context.socket(zmq.PUB)
232 lsock.connect(self.log_url)
233 self.log.removeHandler(self._log_handler)
234 handler = EnginePUBHandler(self.engine, lsock)
235 handler.setLevel(self.log_level)
236 self.log.addHandler(handler)
237 self._log_handler = handler
251 238 #
252 239 def init_mpi(self):
253 240 global mpi
@@ -268,6 +255,7 @@ class IPEngineApp(ClusterApplication):
268 255 super(IPEngineApp, self).initialize(argv)
269 256 self.init_mpi()
270 257 self.init_engine()
258 self.forward_logging()
271 259
272 260 def start(self):
273 261 self.engine.start()
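The engine's forward_logging() is the same handler swap, except it uses EnginePUBHandler (from IPython.zmq.log) so the topic reflects the engine's id once the controller has assigned one. A rough illustration of that idea using plain PUBHandler and a fake engine object, not the actual EnginePUBHandler code:

import logging
import zmq
from zmq.log.handlers import PUBHandler

class FakeEngine(object):
    id = None                    # assigned by the controller at registration

engine = FakeEngine()
sock = zmq.Context.instance().socket(zmq.PUB)
sock.connect('tcp://127.0.0.1:20202')    # hypothetical iplogger endpoint

handler = PUBHandler(sock)
# EnginePUBHandler derives this dynamically from engine.id; here we just
# recompute it once the id is known
handler.root_topic = 'engine' if engine.id is None else 'engine.%s' % engine.id

log = logging.getLogger('engine-sketch')
log.setLevel(logging.INFO)
log.addHandler(handler)
log.info("published under the %r topic" % handler.root_topic)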
@@ -20,9 +20,12 @@ import sys
20 20
21 21 import zmq
22 22
23 from IPython.utils.traitlets import Bool, Dict
24
23 25 from IPython.parallel.apps.clusterdir import (
24 26 ClusterApplication,
25 ClusterDirConfigLoader
27 ClusterDir,
28 base_aliases
26 29 )
27 30 from IPython.parallel.apps.logwatcher import LogWatcher
28 31
@@ -43,79 +46,40 @@ usually located in your ipython directory and named as "cluster_<profile>".
43 46 See the --profile and --cluster-dir options for details.
44 47 """
45 48
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # MPI
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71 49
72 50 #-----------------------------------------------------------------------------
73 51 # Main application
74 52 #-----------------------------------------------------------------------------
75
53 aliases = {}
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
76 56
77 57 class IPLoggerApp(ClusterApplication):
78 58
79 59 name = u'iploggerz'
80 60 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 61 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
84
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
87
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
91
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
62 auto_create_cluster_dir = Bool(False)
96 63
97 def post_load_command_line_config(self):
98 pass
64 classes = [LogWatcher, ClusterDir]
65 aliases = Dict(aliases)
99 66
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
67 def initialize(self, argv=None):
68 super(IPLoggerApp, self).initialize(argv)
69 self.init_watcher()
108 70
71 def init_watcher(self):
109 72 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
111 74 except:
112 75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 76 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
114 78
115 79
116 def start_app(self):
117 try:
80 def start(self):
118 81 self.watcher.start()
82 try:
119 83 self.watcher.loop.start()
120 84 except KeyboardInterrupt:
121 85 self.log.critical("Logging Interrupted, shutting down...\n")
@@ -124,6 +88,7 @@ class IPLoggerApp(ClusterApplication):
124 88 def launch_new_instance():
125 89 """Create and run the IPython LogWatcher"""
126 90 app = IPLoggerApp()
91 app.initialize()
127 92 app.start()
128 93
129 94
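The logger app is rewritten for the two-phase initialize()/start() application style: initialize() loads configuration and builds the LogWatcher, start() runs the event loop and catches KeyboardInterrupt for a clean shutdown. A stripped-down skeleton of that lifecycle (a bare tornado loop stands in for the LogWatcher's):

import logging
from tornado import ioloop   # the diff's zmq.eventloop.ioloop wraps this loop

class MiniLoggerApp(object):
    """Skeleton with the same two-phase lifecycle as the rewritten IPLoggerApp."""

    def __init__(self):
        self.log = logging.getLogger('iplogger-sketch')
        self.loop = None

    def initialize(self, argv=None):
        # the real app parses argv, loads config files, and builds a LogWatcher here
        try:
            self.loop = ioloop.IOLoop.current()
        except Exception:
            self.log.error("Couldn't start the LogWatcher", exc_info=True)
            raise SystemExit(1)
        self.log.info("Listening for log messages")

    def start(self):
        try:
            self.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Logging Interrupted, shutting down...\n")

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    app = MiniLoggerApp()
    app.initialize()
    app.start()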
@@ -35,13 +35,19 @@ class LogWatcher(LoggingFactory):
35 35 This can subscribe to multiple topics, but defaults to all topics.
36 36 """
37 37 # configurables
38 topics = List([''], config=True)
39 url = Unicode('tcp://127.0.0.1:20202', config=True)
38 topics = List([''], config=True,
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
40 url = Unicode('tcp://127.0.0.1:20202', config=True,
41 help="ZMQ url on which to listen for log messages")
40 42
41 43 # internals
42 context = Instance(zmq.Context, (), {})
43 44 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 loop = Instance('zmq.eventloop.ioloop.IOLoop')
45
46 context = Instance(zmq.Context)
47 def _context_default(self):
48 return zmq.Context.instance()
49
50 loop = Instance(zmq.eventloop.ioloop.IOLoop)
45 51 def _loop_default(self):
46 52 return ioloop.IOLoop.instance()
47 53
@@ -62,8 +68,12 @@ class LogWatcher(LoggingFactory):
62 68 def subscribe(self):
63 69 """Update our SUB socket's subscriptions."""
64 70 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
71 if '' in self.topics:
72 self.log.debug("Subscribing to: everything")
73 self.stream.setsockopt(zmq.SUBSCRIBE, '')
74 else:
65 75 for topic in self.topics:
66 self.log.debug("Subscribing to: %r"%topic)
76 self.log.debug("Subscribing to: %r"%(topic))
67 77 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
68 78
69 79 def _extract_level(self, topic_str):
@@ -94,5 +104,5 @@ class LogWatcher(LoggingFactory):
94 104 level,topic = self._extract_level(topic)
95 105 if msg[-1] == '\n':
96 106 msg = msg[:-1]
97 logging.log(level, "[%s] %s" % (topic, msg))
107 self.log.log(level, "[%s] %s" % (topic, msg))
98 108
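On the receiving end, the watcher binds a SUB socket at its url, subscribes either to everything (the '' prefix) or to the listed topics, and re-emits each [topic, message] pair through Python logging, recovering the level from the topic's trailing LEVELNAME component. A minimal raw-zmq sketch of that loop; the endpoint and topic list are placeholders:

import logging
import zmq

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('logwatcher-sketch')

url = 'tcp://127.0.0.1:20202'    # placeholder for LogWatcher.url
topics = ['']                    # [''] means "subscribe to everything"

sock = zmq.Context.instance().socket(zmq.SUB)
sock.bind(url)                   # the watcher is the bound, central endpoint
if '' in topics:
    log.debug("Subscribing to: everything")
    sock.setsockopt(zmq.SUBSCRIBE, b'')
else:
    for topic in topics:
        log.debug("Subscribing to: %r", topic)
        sock.setsockopt(zmq.SUBSCRIBE, topic.encode())

def extract_level(topic_str):
    """Split e.g. 'controller.INFO' into (logging.INFO, 'controller')."""
    parts = topic_str.split('.')
    level = getattr(logging, parts[-1], None)
    if isinstance(level, int):
        parts = parts[:-1]
    else:
        level = logging.INFO
    return level, '.'.join(parts)

while True:
    topic, msg = sock.recv_multipart()
    level, topic = extract_level(topic.decode())
    log.log(level, "[%s] %s" % (topic, msg.decode().rstrip('\n')))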
@@ -141,7 +141,9 @@ class TaskScheduler(SessionFactory):
141 141 self.scheme = globals()[new]
142 142
143 143 # input arguments:
144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
146 return leastload
145 147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
146 148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
147 149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
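The scheme trait moves from an inline default to a _scheme_default() method, the traitlets idiom for a default that is computed lazily when the trait is first read; here it resolves to the leastload function. A tiny self-contained illustration of that pattern (the modern traitlets package stands in for IPython.utils.traitlets of the diff's era):

from types import FunctionType
from traitlets import HasTraits, Instance

def leastload(loads):
    """Placeholder scheme: pick the index of the least-loaded engine."""
    return loads.index(min(loads))

class SchedulerSketch(HasTraits):
    # function used to pick a destination engine for each task
    scheme = Instance(FunctionType)

    def _scheme_default(self):
        # resolved on first access, not at class-definition time
        return leastload

s = SchedulerSketch()
print(s.scheme([3, 0, 2]))   # -> 1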
@@ -634,7 +636,7 @@ class TaskScheduler(SessionFactory):
634 636
635 637
636 638 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
637 log_addr=None, loglevel=logging.DEBUG,
639 log_url=None, loglevel=logging.DEBUG,
638 640 identity=b'task'):
639 641 from zmq.eventloop import ioloop
640 642 from zmq.eventloop.zmqstream import ZMQStream
@@ -658,10 +660,10 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname=
658 660 nots.setsockopt(zmq.SUBSCRIBE, '')
659 661 nots.connect(not_addr)
660 662
661 # scheme = globals().get(scheme, None)
662 # setup logging
663 if log_addr:
664 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
663 # setup logging. Note that these will not work in-process, because they clobber
664 # existing loggers.
665 if log_url:
666 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
665 667 else:
666 668 local_logger(logname, loglevel)
667 669
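launch_scheduler() now receives log_url and either connects the child process's logger to the central PUB endpoint or falls back to a purely local logger; as the new comment warns, this kind of setup is only safe in a fresh subprocess, because it replaces whatever handlers the logger already carries. A hedged stand-in for that branch (connect_logger/local_logger are IPython helpers not reproduced here):

import logging
import zmq
from zmq.log.handlers import PUBHandler

def setup_child_logging(logname, log_url=None, loglevel=logging.DEBUG,
                        root='scheduler'):
    """Rough equivalent of the connect_logger()/local_logger() branch above."""
    logger = logging.getLogger(logname)
    logger.handlers = []         # clobbers existing handlers: subprocess-only!
    logger.setLevel(loglevel)
    if log_url:
        sock = zmq.Context.instance().socket(zmq.PUB)
        sock.connect(log_url)
        handler = PUBHandler(sock)
        handler.root_topic = root
    else:
        handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    return logger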