diff --git a/IPython/parallel/apps/clusterdir.py b/IPython/parallel/apps/clusterdir.py index c82a47a..edf4375 100755 --- a/IPython/parallel/apps/clusterdir.py +++ b/IPython/parallel/apps/clusterdir.py @@ -418,20 +418,25 @@ class ClusterApplication(BaseIPythonApplication): self.init_clusterdir() if self.config_file: self.load_config_file(self.config_file) - else: - self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location) + elif self.default_config_file_name: + try: + self.load_config_file(self.default_config_file_name, + path=self.cluster_dir.location) + except IOError: + self.log.warn("Warning: Default config file not found") # command-line should *override* config file, but command-line is necessary # to determine clusterdir, etc. self.update_config(cl_config) - self.reinit_logging() - self.to_work_dir() + self.reinit_logging() def to_work_dir(self): wd = self.work_dir if unicode(wd) != os.getcwdu(): os.chdir(wd) self.log.info("Changing to working dir: %s" % wd) + # This is the working dir by now. + sys.path.insert(0, '') def load_config_file(self, filename, path=None): """Load a .py based config file by filename and path.""" diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 77cc47a..25d80cb 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -148,6 +148,7 @@ class IPControllerApp(ClusterApplication): config = 'IPControllerApp.config_file', # file = 'IPControllerApp.url_file', log_level = 'IPControllerApp.log_level', + log_url = 'IPControllerApp.log_url', reuse_files = 'IPControllerApp.reuse_files', secure = 'IPControllerApp.secure', ssh = 'IPControllerApp.ssh_server', @@ -221,8 +222,6 @@ class IPControllerApp(ClusterApplication): assert int(ports) == c.HubFactory.regport, "regport mismatch" def init_hub(self): - # This is the working dir by now. - sys.path.insert(0, '') c = self.config self.do_import_statements() @@ -269,7 +268,7 @@ class IPControllerApp(ClusterApplication): # def init_schedulers(self): children = self.children - mq = import_item(self.mq_class) + mq = import_item(str(self.mq_class)) hub = self.factory # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url @@ -321,8 +320,8 @@ class IPControllerApp(ClusterApplication): self.log.info("task::using Python %s Task scheduler"%scheme) sargs = (hub.client_info['task'][1], hub.engine_info['task'], hub.monitor_url, hub.client_info['notification']) - kwargs = dict(logname=self.log.name, loglevel=self.log_level, - config=dict(self.config)) + kwargs = dict(logname='scheduler', loglevel=self.log_level, + log_url = self.log_url, config=dict(self.config)) q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) q.daemon=True children.append(q) @@ -351,20 +350,23 @@ class IPControllerApp(ClusterApplication): except: self.log.msg("Error running statement: %s" % s) - # def start_logging(self): - # super(IPControllerApp, self).start_logging() - # if self.config.Global.log_url: - # context = self.factory.context - # lsock = context.socket(zmq.PUB) - # lsock.connect(self.config.Global.log_url) - # handler = PUBHandler(lsock) - # handler.root_topic = 'controller' - # handler.setLevel(self.log_level) - # self.log.addHandler(handler) + def forward_logging(self): + if self.log_url: + self.log.info("Forwarding logging to %s"%self.log_url) + context = zmq.Context.instance() + lsock = context.socket(zmq.PUB) + lsock.connect(self.log_url) + handler = PUBHandler(lsock) + self.log.removeHandler(self._log_handler) + handler.root_topic = 'controller' + handler.setLevel(self.log_level) + self.log.addHandler(handler) + self._log_handler = handler # # def initialize(self, argv=None): super(IPControllerApp, self).initialize(argv) + self.forward_logging() self.init_hub() self.init_schedulers() diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index a545d42..c6afee1 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -25,7 +25,6 @@ from zmq.eventloop import ioloop from IPython.parallel.apps.clusterdir import ( ClusterApplication, ClusterDir, - base_aliases, # ClusterDirConfigLoader ) from IPython.zmq.log import EnginePUBHandler @@ -37,7 +36,7 @@ from IPython.parallel.engine.streamkernel import Kernel from IPython.parallel.util import disambiguate_url from IPython.utils.importstring import import_item -from IPython.utils.traitlets import Bool, Unicode, Dict, List, CStr +from IPython.utils.traitlets import Bool, Unicode, Dict, List #----------------------------------------------------------------------------- @@ -122,6 +121,9 @@ class IPEngineApp(ClusterApplication): ) url_file_name = Unicode(u'ipcontroller-engine.json') + log_url = Unicode('', config=True, + help="""The URL for the iploggerapp instance, for forwarding + logging to a central location.""") aliases = Dict(dict( config = 'IPEngineApp.config_file', @@ -147,6 +149,7 @@ class IPEngineApp(ClusterApplication): mpi = 'MPI.use', log_level = 'IPEngineApp.log_level', + log_url = 'IPEngineApp.log_url' )) # def find_key_file(self): @@ -221,33 +224,17 @@ class IPEngineApp(ClusterApplication): self.log.error("Couldn't start the Engine", exc_info=True) self.exit(1) - # self.start_logging() - - # Create the service hierarchy - # self.main_service = service.MultiService() - # self.engine_service.setServiceParent(self.main_service) - # self.tub_service = Tub() - # self.tub_service.setServiceParent(self.main_service) - # # This needs to be called before the connection is initiated - # self.main_service.startService() - - # This initiates the connection to the controller and calls - # register_engine to tell the controller we are ready to do work - # self.engine_connector = EngineConnector(self.tub_service) - - # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file) - - # reactor.callWhenRunning(self.call_connect) - - # def start_logging(self): - # super(IPEngineApp, self).start_logging() - # if self.master_config.Global.log_url: - # context = self.engine.context - # lsock = context.socket(zmq.PUB) - # lsock.connect(self.master_config.Global.log_url) - # handler = EnginePUBHandler(self.engine, lsock) - # handler.setLevel(self.log_level) - # self.log.addHandler(handler) + def forward_logging(self): + if self.log_url: + self.log.info("Forwarding logging to %s"%self.log_url) + context = self.engine.context + lsock = context.socket(zmq.PUB) + lsock.connect(self.log_url) + self.log.removeHandler(self._log_handler) + handler = EnginePUBHandler(self.engine, lsock) + handler.setLevel(self.log_level) + self.log.addHandler(handler) + self._log_handler = handler # def init_mpi(self): global mpi @@ -268,6 +255,7 @@ class IPEngineApp(ClusterApplication): super(IPEngineApp, self).initialize(argv) self.init_mpi() self.init_engine() + self.forward_logging() def start(self): self.engine.start() diff --git a/IPython/parallel/apps/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py index 6be8729..c592781 100755 --- a/IPython/parallel/apps/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -20,9 +20,12 @@ import sys import zmq +from IPython.utils.traitlets import Bool, Dict + from IPython.parallel.apps.clusterdir import ( ClusterApplication, - ClusterDirConfigLoader + ClusterDir, + base_aliases ) from IPython.parallel.apps.logwatcher import LogWatcher @@ -43,79 +46,40 @@ usually located in your ipython directory and named as "cluster_". See the --profile and --cluster-dir options for details. """ -#----------------------------------------------------------------------------- -# Command line options -#----------------------------------------------------------------------------- - - -class IPLoggerAppConfigLoader(ClusterDirConfigLoader): - - def _add_arguments(self): - super(IPLoggerAppConfigLoader, self)._add_arguments() - paa = self.parser.add_argument - # Controller config - paa('--url', - type=str, dest='LogWatcher.url', - help='The url the LogWatcher will listen on', - ) - # MPI - paa('--topics', - type=str, dest='LogWatcher.topics', nargs='+', - help='What topics to subscribe to', - metavar='topics') - # Global config - paa('--log-to-file', - action='store_true', dest='Global.log_to_file', - help='Log to a file in the log directory (default is stdout)') - #----------------------------------------------------------------------------- # Main application #----------------------------------------------------------------------------- - +aliases = {} +aliases.update(base_aliases) +aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics')) class IPLoggerApp(ClusterApplication): name = u'iploggerz' description = _description - command_line_loader = IPLoggerAppConfigLoader default_config_file_name = default_config_file_name - auto_create_cluster_dir = True - - def create_default_config(self): - super(IPLoggerApp, self).create_default_config() - - # The engine should not clean logs as we don't want to remove the - # active log files of other running engines. - self.default_config.Global.clean_logs = False - - # If given, this is the actual location of the logger's URL file. - # If not, this is computed using the profile, app_dir and furl_file_name - self.default_config.Global.url_file_name = u'iplogger.url' - self.default_config.Global.url_file = u'' - - def post_load_command_line_config(self): - pass - - def pre_construct(self): - super(IPLoggerApp, self).pre_construct() - - def construct(self): - # This is the working dir by now. - sys.path.insert(0, '') - - self.start_logging() - + auto_create_cluster_dir = Bool(False) + + classes = [LogWatcher, ClusterDir] + aliases = Dict(aliases) + + def initialize(self, argv=None): + super(IPLoggerApp, self).initialize(argv) + self.init_watcher() + + def init_watcher(self): try: - self.watcher = LogWatcher(config=self.master_config, logname=self.log.name) + self.watcher = LogWatcher(config=self.config, logname=self.log.name) except: self.log.error("Couldn't start the LogWatcher", exc_info=True) self.exit(1) + self.log.info("Listening for log messages on %r"%self.watcher.url) - def start_app(self): + def start(self): + self.watcher.start() try: - self.watcher.start() self.watcher.loop.start() except KeyboardInterrupt: self.log.critical("Logging Interrupted, shutting down...\n") @@ -124,6 +88,7 @@ class IPLoggerApp(ClusterApplication): def launch_new_instance(): """Create and run the IPython LogWatcher""" app = IPLoggerApp() + app.initialize() app.start() diff --git a/IPython/parallel/apps/logwatcher.py b/IPython/parallel/apps/logwatcher.py index 32dbf7d..e6dcf84 100644 --- a/IPython/parallel/apps/logwatcher.py +++ b/IPython/parallel/apps/logwatcher.py @@ -35,13 +35,19 @@ class LogWatcher(LoggingFactory): This can subscribe to multiple topics, but defaults to all topics. """ # configurables - topics = List([''], config=True) - url = Unicode('tcp://127.0.0.1:20202', config=True) + topics = List([''], config=True, + help="The ZMQ topics to subscribe to. Default is to subscribe to all messages") + url = Unicode('tcp://127.0.0.1:20202', config=True, + help="ZMQ url on which to listen for log messages") # internals - context = Instance(zmq.Context, (), {}) stream = Instance('zmq.eventloop.zmqstream.ZMQStream') - loop = Instance('zmq.eventloop.ioloop.IOLoop') + + context = Instance(zmq.Context) + def _context_default(self): + return zmq.Context.instance() + + loop = Instance(zmq.eventloop.ioloop.IOLoop) def _loop_default(self): return ioloop.IOLoop.instance() @@ -62,9 +68,13 @@ class LogWatcher(LoggingFactory): def subscribe(self): """Update our SUB socket's subscriptions.""" self.stream.setsockopt(zmq.UNSUBSCRIBE, '') - for topic in self.topics: - self.log.debug("Subscribing to: %r"%topic) - self.stream.setsockopt(zmq.SUBSCRIBE, topic) + if '' in self.topics: + self.log.debug("Subscribing to: everything") + self.stream.setsockopt(zmq.SUBSCRIBE, '') + else: + for topic in self.topics: + self.log.debug("Subscribing to: %r"%(topic)) + self.stream.setsockopt(zmq.SUBSCRIBE, topic) def _extract_level(self, topic_str): """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')""" @@ -94,5 +104,5 @@ class LogWatcher(LoggingFactory): level,topic = self._extract_level(topic) if msg[-1] == '\n': msg = msg[:-1] - logging.log(level, "[%s] %s" % (topic, msg)) + self.log.log(level, "[%s] %s" % (topic, msg)) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 5f0a615..6fc6a8c 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -141,7 +141,9 @@ class TaskScheduler(SessionFactory): self.scheme = globals()[new] # input arguments: - scheme = Instance(FunctionType, default=leastload) # function for determining the destination + scheme = Instance(FunctionType) # function for determining the destination + def _scheme_default(self): + return leastload client_stream = Instance(zmqstream.ZMQStream) # client-facing stream engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream @@ -634,7 +636,7 @@ class TaskScheduler(SessionFactory): def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', - log_addr=None, loglevel=logging.DEBUG, + log_url=None, loglevel=logging.DEBUG, identity=b'task'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream @@ -658,10 +660,10 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname= nots.setsockopt(zmq.SUBSCRIBE, '') nots.connect(not_addr) - # scheme = globals().get(scheme, None) - # setup logging - if log_addr: - connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel) + # setup logging. Note that these will not work in-process, because they clobber + # existing loggers. + if log_url: + connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) else: local_logger(logname, loglevel)