From 0a6d54020ae7fe3d624e7db5d279b8d97e4d1abe 2011-06-20 23:40:18 From: MinRK Date: 2011-06-20 23:40:18 Subject: [PATCH] reorganize Factory classes to follow relocation of Session object --- diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index f47512a..ce8c534 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -308,7 +308,7 @@ class IPClusterEngines(BaseParallelApplication): klass = import_item(clsname) launcher = klass( - work_dir=self.profile_dir.location, config=self.config, logname=self.log.name + work_dir=self.profile_dir.location, config=self.config, log=self.log ) return launcher diff --git a/IPython/parallel/apps/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py index bbfc1f0..1f14979 100755 --- a/IPython/parallel/apps/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -69,7 +69,7 @@ class IPLoggerApp(BaseParallelApplication): def init_watcher(self): try: - self.watcher = LogWatcher(config=self.config, logname=self.log.name) + self.watcher = LogWatcher(config=self.config, log=self.log) except: self.log.error("Couldn't start the LogWatcher", exc_info=True) self.exit(1) diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 3930ee5..41179aa 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -49,14 +49,12 @@ except ImportError: from zmq.eventloop import ioloop -# from IPython.config.configurable import Configurable +from IPython.config.configurable import Configurable from IPython.utils.text import EvalFormatter from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError -from IPython.parallel.factory import LoggingFactory - from .win32support import forward_read_events from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob @@ -97,7 +95,7 @@ class UnknownStatus(LauncherError): pass -class BaseLauncher(LoggingFactory): +class BaseLauncher(Configurable): """An asbtraction for starting, stopping and signaling a process.""" # In all of the launchers, the work_dir is where child processes will be @@ -109,6 +107,7 @@ class BaseLauncher(LoggingFactory): # the work_dir option. work_dir = Unicode(u'.') loop = Instance('zmq.eventloop.ioloop.IOLoop') + log = Instance('logging.Logger', ('root',)) start_data = Any() stop_data = Any() @@ -390,7 +389,7 @@ class LocalEngineSetLauncher(BaseLauncher): self.profile_dir = unicode(profile_dir) dlist = [] for i in range(n): - el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) # Copy the engine args over to each engine launcher. el.engine_args = copy.deepcopy(self.engine_args) el.on_stop(self._notice_engine_stopped) @@ -612,7 +611,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): else: user=None for i in range(n): - el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log) # Copy the engine args over to each engine launcher. i diff --git a/IPython/parallel/apps/logwatcher.py b/IPython/parallel/apps/logwatcher.py index e6dcf84..960376f 100644 --- a/IPython/parallel/apps/logwatcher.py +++ b/IPython/parallel/apps/logwatcher.py @@ -19,21 +19,23 @@ import sys import zmq from zmq.eventloop import ioloop, zmqstream +from IPython.config.configurable import Configurable from IPython.utils.traitlets import Int, Unicode, Instance, List -from IPython.parallel.factory import LoggingFactory - #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- -class LogWatcher(LoggingFactory): +class LogWatcher(Configurable): """A simple class that receives messages on a SUB socket, as published by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself. This can subscribe to multiple topics, but defaults to all topics. """ + + log = Instance('logging.Logger', ('root',)) + # configurables topics = List([''], config=True, help="The ZMQ topics to subscribe to. Default is to subscribe to all messages") diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index bd69d9a..5d02c02 100644 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -18,8 +18,8 @@ import zmq from zmq.devices import ThreadDevice from zmq.eventloop import ioloop, zmqstream +from IPython.config.configurable import Configurable from IPython.utils.traitlets import Set, Instance, CFloat -from IPython.parallel.factory import LoggingFactory class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -47,7 +47,7 @@ class Heart(object): def start(self): return self.device.start() -class HeartMonitor(LoggingFactory): +class HeartMonitor(Configurable): """A basic HeartMonitor class pingstream: a PUB stream pongstream: an XREP stream @@ -58,6 +58,7 @@ class HeartMonitor(LoggingFactory): ' (in ms) [default: 100]', ) + log = Instance('logging.Logger', ('root',)) pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') loop = Instance('zmq.eventloop.ioloop.IOLoop') diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index d2749a1..8c73d25 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -31,7 +31,9 @@ from IPython.utils.traitlets import ( from IPython.utils.jsonutil import ISO8601, extract_dates from IPython.parallel import error, util -from IPython.parallel.factory import RegistrationFactory, LoggingFactory +from IPython.parallel.factory import RegistrationFactory + +from IPython.zmq.session import SessionFactory from .heartmonitor import HeartMonitor @@ -231,8 +233,10 @@ class HubFactory(RegistrationFactory): hpub.bind(engine_iface % self.hb[0]) hrep = ctx.socket(zmq.XREP) hrep.bind(engine_iface % self.hb[1]) - self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop), - config=self.config) + self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, + pingstream=ZMQStream(hpub,loop), + pongstream=ZMQStream(hrep,loop) + ) ### Client connections ### # Notifier socket @@ -287,10 +291,10 @@ class HubFactory(RegistrationFactory): self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, query=q, notifier=n, resubmit=r, db=self.db, engine_info=self.engine_info, client_info=self.client_info, - logname=self.log.name) + log=self.log) -class Hub(LoggingFactory): +class Hub(SessionFactory): """The IPython Controller Hub with 0MQ connections Parameters @@ -328,7 +332,6 @@ class Hub(LoggingFactory): _idcounter=Int(0) # objects from constructor: - loop=Instance(ioloop.IOLoop) query=Instance(ZMQStream) monitor=Instance(ZMQStream) notifier=Instance(ZMQStream) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 1141934..7c9c7a5 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -612,7 +612,8 @@ class TaskScheduler(SessionFactory): for msg_id in jobs: raw_msg, targets, after, follow, timeout = self.depending[msg_id] - if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed): + if after.unreachable(self.all_completed, self.all_failed)\ + or follow.unreachable(self.all_completed, self.all_failed): self.fail_unreachable(msg_id) elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run @@ -643,9 +644,9 @@ class TaskScheduler(SessionFactory): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', - log_url=None, loglevel=logging.DEBUG, - identity=b'task'): +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, + logname='root', log_url=None, loglevel=logging.DEBUG, + identity=b'task'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream @@ -671,13 +672,13 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname= # setup logging. Note that these will not work in-process, because they clobber # existing loggers. if log_url: - connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) + log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) else: - local_logger(logname, loglevel) + log = local_logger(logname, loglevel) scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, - loop=loop, logname=logname, + loop=loop, log=log, config=config) scheduler.start() try: diff --git a/IPython/parallel/factory.py b/IPython/parallel/factory.py index 4bb53c1..d287ba3 100644 --- a/IPython/parallel/factory.py +++ b/IPython/parallel/factory.py @@ -1,7 +1,7 @@ """Base config factories.""" #----------------------------------------------------------------------------- -# Copyright (C) 2008-2009 The IPython Development Team +# Copyright (C) 2010-2011 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. @@ -22,39 +22,12 @@ from IPython.config.configurable import Configurable from IPython.utils.traitlets import Int, Instance, Unicode from IPython.parallel.util import select_random_ports -from IPython.zmq.session import Session +from IPython.zmq.session import Session, SessionFactory #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- -class LoggingFactory(Configurable): - """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait.""" - log = Instance('logging.Logger', ('ZMQ', logging.WARN)) - logname = Unicode('ZMQ') - def _logname_changed(self, name, old, new): - self.log = logging.getLogger(new) - -class SessionFactory(LoggingFactory): - """The Base factory from which every factory in IPython.parallel inherits""" - - # not configurable: - context = Instance('zmq.Context') - def _context_default(self): - return zmq.Context.instance() - - session = Instance('IPython.zmq.session.Session') - loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False) - def _loop_default(self): - return IOLoop.instance() - - - def __init__(self, **kwargs): - super(SessionFactory, self).__init__(**kwargs) - - # construct the session - self.session = Session(**kwargs) - class RegistrationFactory(SessionFactory): """The Base Configurable for objects that involve registration.""" diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index f777c43..42578f9 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -452,6 +452,7 @@ def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): handler.setLevel(loglevel) logger.addHandler(handler) logger.setLevel(loglevel) + return logger def local_logger(logname, loglevel=logging.DEBUG): loglevel = integer_loglevel(loglevel) @@ -463,4 +464,5 @@ def local_logger(logname, loglevel=logging.DEBUG): handler.setLevel(loglevel) logger.addHandler(handler) logger.setLevel(loglevel) + return logger diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index e3bf3bc..40397f1 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -13,6 +13,7 @@ #----------------------------------------------------------------------------- import hmac +import logging import os import pprint import uuid @@ -27,6 +28,7 @@ except: import zmq from zmq.utils import jsonapi +from zmq.eventloop.ioloop import IOLoop from zmq.eventloop.zmqstream import ZMQStream from IPython.config.configurable import Configurable @@ -73,6 +75,36 @@ DELIM="" # Classes #----------------------------------------------------------------------------- +class SessionFactory(Configurable): + """The Base class for configurables that have a Session, Context, logger, + and IOLoop. + """ + + log = Instance('logging.Logger', ('', logging.WARN)) + + logname = Unicode('') + def _logname_changed(self, name, old, new): + self.log = logging.getLogger(new) + + # not configurable: + context = Instance('zmq.Context') + def _context_default(self): + return zmq.Context.instance() + + session = Instance('IPython.zmq.session.Session') + + loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False) + def _loop_default(self): + return IOLoop.instance() + + def __init__(self, **kwargs): + super(SessionFactory, self).__init__(**kwargs) + + if self.session is None: + # construct the session + self.session = Session(**kwargs) + + class Message(object): """A simple message object that maps dict keys to attributes. @@ -143,7 +175,7 @@ class Session(Configurable): else: self.pack = import_item(new) - unpacker = Unicode('json',config=True, + unpacker = Unicode('json', config=True, help="""The name of the unpacker for unserializing messages. Only used with custom functions for `packer`.""") def _unpacker_changed(self, name, old, new): @@ -156,7 +188,7 @@ class Session(Configurable): else: self.unpack = import_item(new) - session = CStr('',config=True, + session = CStr('', config=True, help="""The UUID identifying this session.""") def _session_default(self): return bytes(uuid.uuid4())