diff --git a/IPython/parallel/apps/clusterdir.py b/IPython/parallel/apps/clusterdir.py index 5c5a8ec..ddfc121 100755 --- a/IPython/parallel/apps/clusterdir.py +++ b/IPython/parallel/apps/clusterdir.py @@ -20,243 +20,31 @@ from __future__ import with_statement import os import logging import re -import shutil import sys from subprocess import Popen, PIPE -from IPython.config.loader import PyFileConfigLoader, Config -from IPython.config.configurable import Configurable -from IPython.config.application import Application -from IPython.core.crashhandler import CrashHandler -from IPython.core.newapplication import BaseIPythonApplication +from IPython.config.loader import Config from IPython.core import release -from IPython.utils.path import ( - get_ipython_package_dir, - get_ipython_dir, - expand_path +from IPython.core.crashhandler import CrashHandler +from IPython.core.newapplication import ( + BaseIPythonApplication, + base_aliases as base_ip_aliases, + base_flags as base_ip_flags ) +from IPython.utils.path import expand_path + from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List #----------------------------------------------------------------------------- # Module errors #----------------------------------------------------------------------------- -class ClusterDirError(Exception): - pass - - class PIDFileError(Exception): pass #----------------------------------------------------------------------------- -# Class for managing cluster directories -#----------------------------------------------------------------------------- - -class ClusterDir(Configurable): - """An object to manage the cluster directory and its resources. - - The cluster directory is used by :command:`ipengine`, - :command:`ipcontroller` and :command:`ipclsuter` to manage the - configuration, logging and security of these applications. - - This object knows how to find, create and manage these directories. 
This - should be used by any code that want's to handle cluster directories. - """ - - security_dir_name = Unicode('security') - log_dir_name = Unicode('log') - pid_dir_name = Unicode('pid') - security_dir = Unicode(u'') - log_dir = Unicode(u'') - pid_dir = Unicode(u'') - - auto_create = Bool(False, - help="""Whether to automatically create the ClusterDirectory if it does - not exist""") - overwrite = Bool(False, - help="""Whether to overwrite existing config files""") - location = Unicode(u'', config=True, - help="""Set the cluster dir. This overrides the logic used by the - `profile` option.""", - ) - profile = Unicode(u'default', config=True, - help="""The string name of the profile to be used. This determines the name - of the cluster dir as: cluster_. The default profile is named - 'default'. The cluster directory is resolve this way if the - `cluster_dir` option is not used.""" - ) - - _location_isset = Bool(False) # flag for detecting multiply set location - _new_dir = Bool(False) # flag for whether a new dir was created - - def __init__(self, **kwargs): - # make sure auto_create,overwrite are set *before* location - for name in ('auto_create', 'overwrite'): - v = kwargs.pop(name, None) - if v is not None: - setattr(self, name, v) - super(ClusterDir, self).__init__(**kwargs) - if not self.location: - self._profile_changed('profile', 'default', self.profile) - - def _location_changed(self, name, old, new): - if self._location_isset: - raise RuntimeError("Cannot set ClusterDir more than once.") - self._location_isset = True - if not os.path.isdir(new): - if self.auto_create:# or self.config.ClusterDir.auto_create: - os.makedirs(new) - self._new_dir = True - else: - raise ClusterDirError('Directory not found: %s' % new) - - # ensure config files exist: - self.copy_all_config_files(overwrite=self.overwrite) - self.security_dir = os.path.join(new, self.security_dir_name) - self.log_dir = os.path.join(new, self.log_dir_name) - self.pid_dir = os.path.join(new, 
self.pid_dir_name) - self.check_dirs() - - def _profile_changed(self, name, old, new): - if self._location_isset: - raise RuntimeError("ClusterDir already set. Cannot set by profile.") - self.location = os.path.join(get_ipython_dir(), 'cluster_'+new) - - def _log_dir_changed(self, name, old, new): - self.check_log_dir() - - def check_log_dir(self): - if not os.path.isdir(self.log_dir): - os.mkdir(self.log_dir) - - def _security_dir_changed(self, name, old, new): - self.check_security_dir() - - def check_security_dir(self): - if not os.path.isdir(self.security_dir): - os.mkdir(self.security_dir, 0700) - os.chmod(self.security_dir, 0700) - - def _pid_dir_changed(self, name, old, new): - self.check_pid_dir() - - def check_pid_dir(self): - if not os.path.isdir(self.pid_dir): - os.mkdir(self.pid_dir, 0700) - os.chmod(self.pid_dir, 0700) - - def check_dirs(self): - self.check_security_dir() - self.check_log_dir() - self.check_pid_dir() - - def copy_config_file(self, config_file, path=None, overwrite=False): - """Copy a default config file into the active cluster directory. - - Default configuration files are kept in :mod:`IPython.config.default`. - This function moves these from that location to the working cluster - directory. - """ - if path is None: - import IPython.config.default - path = IPython.config.default.__file__.split(os.path.sep)[:-1] - path = os.path.sep.join(path) - src = os.path.join(path, config_file) - dst = os.path.join(self.location, config_file) - if not os.path.isfile(dst) or overwrite: - shutil.copy(src, dst) - - def copy_all_config_files(self, path=None, overwrite=False): - """Copy all config files into the active cluster directory.""" - for f in [u'ipcontroller_config.py', u'ipengine_config.py', - u'ipcluster_config.py']: - self.copy_config_file(f, path=path, overwrite=overwrite) - - @classmethod - def create_cluster_dir(csl, cluster_dir): - """Create a new cluster directory given a full path. 
- - Parameters - ---------- - cluster_dir : str - The full path to the cluster directory. If it does exist, it will - be used. If not, it will be created. - """ - return ClusterDir(location=cluster_dir) - - @classmethod - def create_cluster_dir_by_profile(cls, path, profile=u'default'): - """Create a cluster dir by profile name and path. - - Parameters - ---------- - path : str - The path (directory) to put the cluster directory in. - profile : str - The name of the profile. The name of the cluster directory will - be "cluster_". - """ - if not os.path.isdir(path): - raise ClusterDirError('Directory not found: %s' % path) - cluster_dir = os.path.join(path, u'cluster_' + profile) - return ClusterDir(location=cluster_dir) - - @classmethod - def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'): - """Find an existing cluster dir by profile name, return its ClusterDir. - - This searches through a sequence of paths for a cluster dir. If it - is not found, a :class:`ClusterDirError` exception will be raised. - - The search path algorithm is: - 1. ``os.getcwd()`` - 2. ``ipython_dir`` - 3. The directories found in the ":" separated - :env:`IPCLUSTER_DIR_PATH` environment variable. - - Parameters - ---------- - ipython_dir : unicode or str - The IPython directory to use. - profile : unicode or str - The name of the profile. The name of the cluster directory - will be "cluster_". 
- """ - dirname = u'cluster_' + profile - cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','') - if cluster_dir_paths: - cluster_dir_paths = cluster_dir_paths.split(':') - else: - cluster_dir_paths = [] - paths = [os.getcwd(), ipython_dir] + cluster_dir_paths - for p in paths: - cluster_dir = os.path.join(p, dirname) - if os.path.isdir(cluster_dir): - return ClusterDir(location=cluster_dir) - else: - raise ClusterDirError('Cluster directory not found in paths: %s' % dirname) - - @classmethod - def find_cluster_dir(cls, cluster_dir): - """Find/create a cluster dir and return its ClusterDir. - - This will create the cluster directory if it doesn't exist. - - Parameters - ---------- - cluster_dir : unicode or str - The path of the cluster directory. This is expanded using - :func:`IPython.utils.genutils.expand_path`. - """ - cluster_dir = expand_path(cluster_dir) - if not os.path.isdir(cluster_dir): - raise ClusterDirError('Cluster directory not found: %s' % cluster_dir) - return ClusterDir(location=cluster_dir) - - -#----------------------------------------------------------------------------- # Crash handler for this application #----------------------------------------------------------------------------- @@ -283,7 +71,7 @@ To ensure accurate tracking of this issue, please file a report about it at: $self.bug_tracker """ -class ClusterDirCrashHandler(CrashHandler): +class ParallelCrashHandler(CrashHandler): """sys.excepthook for IPython itself, leaves a detailed report on disk.""" message_template = _message_template @@ -292,7 +80,7 @@ class ClusterDirCrashHandler(CrashHandler): contact_name = release.authors['Min'][0] contact_email = release.authors['Min'][1] bug_tracker = 'http://github.com/ipython/ipython/issues' - super(ClusterDirCrashHandler,self).__init__( + super(ParallelCrashHandler,self).__init__( app, contact_name, contact_email, bug_tracker ) @@ -300,49 +88,35 @@ class ClusterDirCrashHandler(CrashHandler): 
#----------------------------------------------------------------------------- # Main application #----------------------------------------------------------------------------- -base_aliases = { - 'profile' : "ClusterDir.profile", - 'cluster_dir' : 'ClusterDir.location', - 'log_level' : 'ClusterApplication.log_level', - 'work_dir' : 'ClusterApplication.work_dir', - 'log_to_file' : 'ClusterApplication.log_to_file', - 'clean_logs' : 'ClusterApplication.clean_logs', - 'log_url' : 'ClusterApplication.log_url', - 'config' : 'ClusterApplication.config_file', -} +base_aliases = {} +base_aliases.update(base_ip_aliases) +base_aliases.update({ + 'profile_dir' : 'ProfileDir.location', + 'log_level' : 'BaseParallelApplication.log_level', + 'work_dir' : 'BaseParallelApplication.work_dir', + 'log_to_file' : 'BaseParallelApplication.log_to_file', + 'clean_logs' : 'BaseParallelApplication.clean_logs', + 'log_url' : 'BaseParallelApplication.log_url', +}) base_flags = { - 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"), - 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"), - 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"), + 'log-to-file' : ({'BaseParallelApplication' : Config({ + 'log_to_file' : True}), + }, "send log output to a file") } -for k,v in base_flags.iteritems(): - base_flags[k] = (Config(v[0]),v[1]) - -class ClusterApplication(BaseIPythonApplication): - """An application that puts everything into a cluster directory. - - Instead of looking for things in the ipython_dir, this type of application - will use its own private directory called the "cluster directory" - for things like config files, log files, etc. - - The cluster directory is resolved as follows: +base_flags.update(base_ip_flags) - * If the ``cluster_dir`` option is given, it is used. 
- * If ``cluster_dir`` is not given, the application directory is - resolve using the profile name as ``cluster_``. The search - path for this directory is then i) cwd if it is found there - and ii) in ipython_dir otherwise. - - The config file for the application is to be put in the cluster - dir and named the value of the ``config_file_name`` class attribute. +class BaseParallelApplication(BaseIPythonApplication): + """The base Application for IPython.parallel apps + + Principle extensions to BaseIPyythonApplication: + + * work_dir + * remote logging via pyzmq + * IOLoop instance """ - crash_handler_class = ClusterDirCrashHandler - auto_create_cluster_dir = Bool(True, config=True, - help="whether to create the cluster_dir if it doesn't exist") - cluster_dir = Instance(ClusterDir) - classes = [ClusterDir] + crash_handler_class = ParallelCrashHandler def _log_level_default(self): # temporarily override default_log_level to INFO @@ -363,21 +137,8 @@ class ClusterApplication(BaseIPythonApplication): log_url = Unicode('', config=True, help="The ZMQ URL of the iplogger to aggregate logging.") - config_file = Unicode(u'', config=True, - help="""Path to ip configuration file. The default is to use - _config.py, as found by cluster-dir.""" - ) - def _config_file_paths_default(self): - # don't include profile dir - return [ os.getcwdu(), self.ipython_dir ] - - def _config_file_changed(self, name, old, new): - if os.pathsep in new: - path, new = new.rsplit(os.pathsep) - self.config_file_paths.insert(0, path) - self.config_file_name = new - - config_file_name = Unicode('') + def _config_files_default(self): + return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py'] loop = Instance('zmq.eventloop.ioloop.IOLoop') def _loop_default(self): @@ -386,54 +147,10 @@ class ClusterApplication(BaseIPythonApplication): aliases = Dict(base_aliases) flags = Dict(base_flags) - - def init_clusterdir(self): - """This resolves the cluster directory. 
- - This tries to find the cluster directory and if successful, it will - have done: - * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for - the application. - * Sets ``self.cluster_dir`` attribute of the application and config - objects. - - The algorithm used for this is as follows: - 1. Try ``Global.cluster_dir``. - 2. Try using ``Global.profile``. - 3. If both of these fail and ``self.auto_create_cluster_dir`` is - ``True``, then create the new cluster dir in the IPython directory. - 4. If all fails, then raise :class:`ClusterDirError`. - """ - try: - self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config) - except ClusterDirError as e: - self.log.fatal("Error initializing cluster dir: %s"%e) - self.log.fatal("A cluster dir must be created before running this command.") - self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more " - "information about creating and listing cluster dirs." - ) - self.exit(1) - - if self.cluster_dir._new_dir: - self.log.info('Creating new cluster dir: %s' % \ - self.cluster_dir.location) - else: - self.log.info('Using existing cluster dir: %s' % \ - self.cluster_dir.location) - - # insert after cwd: - self.config_file_paths.insert(1, self.cluster_dir.location) def initialize(self, argv=None): """initialize the app""" - self.init_crash_handler() - self.parse_command_line(argv) - cl_config = self.config - self.init_clusterdir() - self.load_config_file() - # command-line should *override* config file, but command-line is necessary - # to determine clusterdir, etc. 
- self.update_config(cl_config) + super(BaseParallelApplication, self).initialize(argv) self.to_work_dir() self.reinit_logging() @@ -447,7 +164,7 @@ class ClusterApplication(BaseIPythonApplication): def reinit_logging(self): # Remove old log files - log_dir = self.cluster_dir.log_dir + log_dir = self.profile_dir.log_dir if self.clean_logs: for f in os.listdir(log_dir): if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): @@ -472,7 +189,7 @@ class ClusterApplication(BaseIPythonApplication): This must be called after pre_construct, which sets `self.pid_dir`. This raises :exc:`PIDFileError` if the pid file exists already. """ - pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): pid = self.get_pid_from_file() if not overwrite: @@ -491,7 +208,7 @@ class ClusterApplication(BaseIPythonApplication): :func:`reactor.addSystemEventTrigger`. This needs to return ``None``. """ - pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): try: self.log.info("Removing pid file: %s" % pid_file) @@ -504,7 +221,7 @@ class ClusterApplication(BaseIPythonApplication): If the pid file doesn't exist a :exc:`PIDFileError` is raised. 
""" - pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): with open(pid_file, 'r') as f: pid = int(f.read().strip()) diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index 8ef7221..668ea4d 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -27,12 +27,12 @@ from zmq.eventloop import ioloop from IPython.config.application import Application, boolean_flag from IPython.config.loader import Config -from IPython.core.newapplication import BaseIPythonApplication +from IPython.core.newapplication import BaseIPythonApplication, ProfileDir from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List from IPython.parallel.apps.clusterdir import ( - ClusterApplication, ClusterDirError, ClusterDir, + BaseParallelApplication, PIDFileError, base_flags, base_aliases ) @@ -86,7 +86,7 @@ security related files and are named using the convention subcommand of 'ipcluster'. If your cluster directory is in the cwd or the ipython directory, you can simply refer to it using its profile name, 'ipcluster start n=4 profile=`, -otherwise use the 'cluster_dir' option. +otherwise use the 'profile_dir' option. """ stop_help = """Stop a running IPython cluster @@ -95,7 +95,7 @@ directory. Cluster directories are named using the convention 'cluster_'. If your cluster directory is in the cwd or the ipython directory, you can simply refer to it using its profile name, 'ipcluster stop profile=`, otherwise -use the 'cluster_dir' option. +use the 'profile_dir' option. """ engines_help = """Start engines connected to an existing IPython cluster @@ -107,7 +107,7 @@ security related files and are named using the convention subcommand of 'ipcluster'. 
If your cluster directory is in the cwd or the ipython directory, you can simply refer to it using its profile name, 'ipcluster engines n=4 profile=`, -otherwise use the 'cluster_dir' option. +otherwise use the 'profile_dir' option. """ create_help = """Create an ipcluster profile by name @@ -142,74 +142,63 @@ class IPClusterList(BaseIPythonApplication): def _log_level_default(self): return 20 - def list_cluster_dirs(self): + def list_profile_dirs(self): # Find the search paths - cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','') - if cluster_dir_paths: - cluster_dir_paths = cluster_dir_paths.split(':') + profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','') + if profile_dir_paths: + profile_dir_paths = profile_dir_paths.split(':') else: - cluster_dir_paths = [] + profile_dir_paths = [] ipython_dir = self.ipython_dir - paths = [os.getcwd(), ipython_dir] + cluster_dir_paths + paths = [os.getcwd(), ipython_dir] + profile_dir_paths paths = list(set(paths)) - self.log.info('Searching for cluster dirs in paths: %r' % paths) + self.log.info('Searching for cluster profiles in paths: %r' % paths) for path in paths: files = os.listdir(path) for f in files: full_path = os.path.join(path, f) - if os.path.isdir(full_path) and f.startswith('cluster_'): - profile = full_path.split('_')[-1] + if os.path.isdir(full_path) and f.startswith('profile_') and \ + os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')): + profile = f.split('_')[-1] start_cmd = 'ipcluster start profile=%s n=4' % profile print start_cmd + " ==> " + full_path def start(self): - self.list_cluster_dirs() + self.list_profile_dirs() + + +# `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists create_flags = {} create_flags.update(base_flags) -create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset', +create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite', "reset config files to defaults", "leave existing config files")) -class 
IPClusterCreate(ClusterApplication): - name = u'ipcluster' +class IPClusterCreate(BaseParallelApplication): + name = u'ipcluster-create' description = create_help - auto_create_cluster_dir = Bool(True, - help="whether to create the cluster_dir if it doesn't exist") + auto_create = Bool(True) config_file_name = Unicode(default_config_file_name) - reset = Bool(False, config=True, - help="Whether to reset config files as part of 'create'." - ) - flags = Dict(create_flags) - aliases = Dict(dict(profile='ClusterDir.profile')) + aliases = Dict(dict(profile='BaseIPythonApplication.profile')) - classes = [ClusterDir] + classes = [ProfileDir] - def init_clusterdir(self): - super(IPClusterCreate, self).init_clusterdir() - self.log.info('Copying default config files to cluster directory ' - '[overwrite=%r]' % (self.reset,)) - self.cluster_dir.copy_all_config_files(overwrite=self.reset) - - def initialize(self, argv=None): - self.parse_command_line(argv) - self.init_clusterdir() stop_aliases = dict( signal='IPClusterStop.signal', - profile='ClusterDir.profile', - cluster_dir='ClusterDir.location', + profile='BaseIPythonApplication.profile', + profile_dir='ProfileDir.location', ) -class IPClusterStop(ClusterApplication): +class IPClusterStop(BaseParallelApplication): name = u'ipcluster' description = stop_help - auto_create_cluster_dir = Bool(False) config_file_name = Unicode(default_config_file_name) signal = Int(signal.SIGINT, config=True, @@ -217,13 +206,6 @@ class IPClusterStop(ClusterApplication): aliases = Dict(stop_aliases) - def init_clusterdir(self): - try: - super(IPClusterStop, self).init_clusterdir() - except ClusterDirError as e: - self.log.fatal("Failed ClusterDir init: %s"%e) - self.exit(1) - def start(self): """Start the app for the stop subcommand.""" try: @@ -272,20 +254,19 @@ engine_aliases.update(dict( n='IPClusterEngines.n', elauncher = 'IPClusterEngines.engine_launcher_class', )) -class IPClusterEngines(ClusterApplication): +class 
IPClusterEngines(BaseParallelApplication): name = u'ipcluster' description = engines_help usage = None config_file_name = Unicode(default_config_file_name) default_log_level = logging.INFO - auto_create_cluster_dir = Bool(False) classes = List() def _classes_default(self): from IPython.parallel.apps import launcher launchers = launcher.all_launchers eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__] - return [ClusterDir]+eslaunchers + return [ProfileDir]+eslaunchers n = Int(2, config=True, help="The number of engines to start.") @@ -327,7 +308,7 @@ class IPClusterEngines(ClusterApplication): klass = import_item(clsname) launcher = klass( - work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name + work_dir=self.profile_dir.location, config=self.config, logname=self.log.name ) return launcher @@ -335,7 +316,7 @@ class IPClusterEngines(ClusterApplication): self.log.info("Starting %i engines"%self.n) self.engine_launcher.start( self.n, - cluster_dir=self.cluster_dir.location + profile_dir=self.profile_dir.location ) def stop_engines(self): @@ -362,12 +343,12 @@ class IPClusterEngines(ClusterApplication): def start_logging(self): # Remove old log files of the controller and engine if self.clean_logs: - log_dir = self.cluster_dir.log_dir + log_dir = self.profile_dir.log_dir for f in os.listdir(log_dir): if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f): os.remove(os.path.join(log_dir, f)) # This will remove old log files for ipcluster itself - # super(IPClusterApp, self).start_logging() + # super(IPBaseParallelApplication, self).start_logging() def start(self): """Start the app for the engines subcommand.""" @@ -410,12 +391,12 @@ class IPClusterStart(IPClusterEngines): name = u'ipcluster' description = start_help default_log_level = logging.INFO - auto_create_cluster_dir = Bool(True, config=True, - help="whether to create the cluster_dir if it doesn't exist") + auto_create = Bool(True, config=True, + help="whether to create 
the profile_dir if it doesn't exist") classes = List() def _classes_default(self,): from IPython.parallel.apps import launcher - return [ClusterDir]+launcher.all_launchers + return [ProfileDir]+launcher.all_launchers clean_logs = Bool(True, config=True, help="whether to cleanup old logs before starting") @@ -441,7 +422,7 @@ class IPClusterStart(IPClusterEngines): def start_controller(self): self.controller_launcher.start( - cluster_dir=self.cluster_dir.location + profile_dir=self.profile_dir.location ) def stop_controller(self): @@ -504,7 +485,7 @@ class IPClusterStart(IPClusterEngines): base='IPython.parallel.apps.ipclusterapp.IPCluster' -class IPClusterApp(Application): +class IPBaseParallelApplication(Application): name = u'ipcluster' description = _description @@ -530,7 +511,7 @@ class IPClusterApp(Application): def launch_new_instance(): """Create and run the IPython cluster.""" - app = IPClusterApp() + app = IPBaseParallelApplication() app.initialize() app.start() diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 40bb50d..00c260f 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -17,9 +17,7 @@ The IPython controller application. 
from __future__ import with_statement -import copy import os -import logging import socket import stat import sys @@ -33,14 +31,11 @@ from zmq.log.handlers import PUBHandler from zmq.utils import jsonapi as json from IPython.config.loader import Config - -from IPython.parallel import factory +from IPython.core.newapplication import ProfileDir from IPython.parallel.apps.clusterdir import ( - ClusterDir, - ClusterApplication, + BaseParallelApplication, base_flags - # ClusterDirConfigLoader ) from IPython.utils.importstring import import_item from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict @@ -48,11 +43,11 @@ from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict # from IPython.parallel.controller.controller import ControllerFactory from IPython.parallel.streamsession import StreamSession from IPython.parallel.controller.heartmonitor import HeartMonitor -from IPython.parallel.controller.hub import Hub, HubFactory +from IPython.parallel.controller.hub import HubFactory from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler from IPython.parallel.controller.sqlitedb import SQLiteDB -from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url +from IPython.parallel.util import signal_children, split_url # conditional import of MongoDB backend class @@ -80,7 +75,7 @@ clients. The controller needs to be started before the engines and can be configured using command line options or using a cluster directory. Cluster directories contain config, log and security files and are usually located in your ipython directory and named as "cluster_". See the `profile` -and `cluster_dir` options for details. +and `profile_dir` options for details. 
""" @@ -106,15 +101,17 @@ flags.update({ flags.update() -class IPControllerApp(ClusterApplication): +class IPControllerApp(BaseParallelApplication): name = u'ipcontroller' description = _description config_file_name = Unicode(default_config_file_name) - classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo + classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo + + # change default to True + auto_create = Bool(True, config=True, + help="""Whether to create profile dir if it doesn't exist""") - auto_create_cluster_dir = Bool(True, config=True, - help="Whether to create cluster_dir if it exists.") reuse_files = Bool(False, config=True, help='Whether to reuse existing json connection files [default: False]' ) @@ -146,8 +143,6 @@ class IPControllerApp(ClusterApplication): self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') aliases = Dict(dict( - config = 'IPControllerApp.config_file', - # file = 'IPControllerApp.url_file', log_level = 'IPControllerApp.log_level', log_url = 'IPControllerApp.log_url', reuse_files = 'IPControllerApp.reuse_files', @@ -172,8 +167,8 @@ class IPControllerApp(ClusterApplication): hwm = 'TaskScheduler.hwm', - profile = "ClusterDir.profile", - cluster_dir = 'ClusterDir.location', + profile = "BaseIPythonApplication.profile", + profile_dir = 'ProfileDir.location', )) flags = Dict(flags) @@ -192,7 +187,7 @@ class IPControllerApp(ClusterApplication): else: location = socket.gethostbyname_ex(socket.gethostname())[2][-1] cdict['location'] = location - fname = os.path.join(self.cluster_dir.security_dir, fname) + fname = os.path.join(self.profile_dir.security_dir, fname) with open(fname, 'w') as f: f.write(json.dumps(cdict, indent=2)) os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR) @@ -201,7 +196,7 @@ class IPControllerApp(ClusterApplication): """load config from existing json connector files.""" c = self.config # load from engine 
config - with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f: + with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f: cfg = json.loads(f.read()) key = c.StreamSession.key = cfg['exec_key'] xport,addr = cfg['url'].split('://') @@ -212,7 +207,7 @@ class IPControllerApp(ClusterApplication): self.location = cfg['location'] # load client config - with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f: + with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f: cfg = json.loads(f.read()) assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" xport,addr = cfg['url'].split('://') @@ -237,7 +232,7 @@ class IPControllerApp(ClusterApplication): pass elif self.secure: key = str(uuid.uuid4()) - # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key) + # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key) # with open(keyfile, 'w') as f: # f.write(key) # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) @@ -332,7 +327,7 @@ class IPControllerApp(ClusterApplication): """save the registration urls to files.""" c = self.config - sec_dir = self.cluster_dir.security_dir + sec_dir = self.profile_dir.security_dir cf = self.factory with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index fdd4b68..ce202f1 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -22,10 +22,9 @@ import sys import zmq from zmq.eventloop import ioloop +from IPython.core.newapplication import ProfileDir from IPython.parallel.apps.clusterdir import ( - ClusterApplication, - ClusterDir, - # ClusterDirConfigLoader + BaseParallelApplication, ) from IPython.zmq.log import EnginePUBHandler @@ -53,7 +52,7 @@ and controller. A controller needs to be started before the engines. 
The engine can be configured using command line options or using a cluster directory. Cluster directories contain config, log and security files and are usually located in your ipython directory and named as "cluster_". -See the `profile` and `cluster_dir` options for details. +See the `profile` and `profile_dir` options for details. """ @@ -98,16 +97,13 @@ class MPI(Configurable): #----------------------------------------------------------------------------- -class IPEngineApp(ClusterApplication): +class IPEngineApp(BaseParallelApplication): app_name = Unicode(u'ipengine') description = Unicode(_description) config_file_name = Unicode(default_config_file_name) - classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI]) + classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI]) - auto_create_cluster_dir = Bool(False, - help="whether to create the cluster_dir if it doesn't exist") - startup_script = Unicode(u'', config=True, help='specify a script to be run at startup') startup_command = Unicode('', config=True, @@ -117,7 +113,7 @@ class IPEngineApp(ClusterApplication): help="""The full location of the file containing the connection information for the controller. If this is not given, the file must be in the security directory of the cluster directory. 
This location is - resolved using the `profile` or `cluster_dir` options.""", + resolved using the `profile` or `profile_dir` options.""", ) url_file_name = Unicode(u'ipcontroller-engine.json') @@ -126,7 +122,6 @@ class IPEngineApp(ClusterApplication): logging to a central location.""") aliases = Dict(dict( - config = 'IPEngineApp.config_file', file = 'IPEngineApp.url_file', c = 'IPEngineApp.startup_command', s = 'IPEngineApp.startup_script', @@ -143,8 +138,8 @@ class IPEngineApp(ClusterApplication): timeout = 'EngineFactory.timeout', - profile = "ClusterDir.profile", - cluster_dir = 'ClusterDir.location', + profile = "IPEngineApp.profile", + profile_dir = 'ProfileDir.location', mpi = 'MPI.use', @@ -162,7 +157,7 @@ class IPEngineApp(ClusterApplication): # # Find the actual controller key file # if not config.Global.key_file: # try_this = os.path.join( - # config.Global.cluster_dir, + # config.Global.profile_dir, # config.Global.security_dir, # config.Global.key_file_name # ) @@ -178,7 +173,7 @@ class IPEngineApp(ClusterApplication): # Find the actual controller key file if not self.url_file: self.url_file = os.path.join( - self.cluster_dir.security_dir, + self.profile_dir.security_dir, self.url_file_name ) def init_engine(self): diff --git a/IPython/parallel/apps/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py index 5b1cf73..6f330d4 100755 --- a/IPython/parallel/apps/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -20,11 +20,11 @@ import sys import zmq +from IPython.core.newapplication import ProfileDir from IPython.utils.traitlets import Bool, Dict, Unicode from IPython.parallel.apps.clusterdir import ( - ClusterApplication, - ClusterDir, + BaseParallelApplication, base_aliases ) from IPython.parallel.apps.logwatcher import LogWatcher @@ -43,7 +43,7 @@ by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The logger can be configured using command line options or using a cluster directory. 
Cluster directories contain config, log and security files and are usually located in your ipython directory and named as "cluster_". -See the `profile` and `cluster_dir` options for details. +See the `profile` and `profile_dir` options for details. """ @@ -54,14 +54,13 @@ aliases = {} aliases.update(base_aliases) aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics')) -class IPLoggerApp(ClusterApplication): +class IPLoggerApp(BaseParallelApplication): name = u'iploggerz' description = _description config_file_name = Unicode(default_config_file_name) - auto_create_cluster_dir = Bool(False) - classes = [LogWatcher, ClusterDir] + classes = [LogWatcher, ProfileDir] aliases = Dict(aliases) def initialize(self, argv=None): diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 42ad434..cc7a8d7 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -101,7 +101,7 @@ class BaseLauncher(LoggingFactory): """An asbtraction for starting, stopping and signaling a process.""" # In all of the launchers, the work_dir is where child processes will be - # run. This will usually be the cluster_dir, but may not be. any work_dir + # run. This will usually be the profile_dir, but may not be. any work_dir # passed into the __init__ method will override the config value. # This should not be used to set the work_dir for the actual engine # and controller. 
Instead, use their own config files or the @@ -337,10 +337,10 @@ class LocalControllerLauncher(LocalProcessLauncher): def find_args(self): return self.controller_cmd + self.controller_args - def start(self, cluster_dir): - """Start the controller by cluster_dir.""" - self.controller_args.extend(['cluster_dir=%s'%cluster_dir]) - self.cluster_dir = unicode(cluster_dir) + def start(self, profile_dir): + """Start the controller by profile_dir.""" + self.controller_args.extend(['profile_dir=%s'%profile_dir]) + self.profile_dir = unicode(profile_dir) self.log.info("Starting LocalControllerLauncher: %r" % self.args) return super(LocalControllerLauncher, self).start() @@ -358,10 +358,10 @@ class LocalEngineLauncher(LocalProcessLauncher): def find_args(self): return self.engine_cmd + self.engine_args - def start(self, cluster_dir): - """Start the engine by cluster_dir.""" - self.engine_args.extend(['cluster_dir=%s'%cluster_dir]) - self.cluster_dir = unicode(cluster_dir) + def start(self, profile_dir): + """Start the engine by profile_dir.""" + self.engine_args.extend(['profile_dir=%s'%profile_dir]) + self.profile_dir = unicode(profile_dir) return super(LocalEngineLauncher, self).start() @@ -385,16 +385,16 @@ class LocalEngineSetLauncher(BaseLauncher): ) self.stop_data = {} - def start(self, n, cluster_dir): - """Start n engines by profile or cluster_dir.""" - self.cluster_dir = unicode(cluster_dir) + def start(self, n, profile_dir): + """Start n engines by profile or profile_dir.""" + self.profile_dir = unicode(profile_dir) dlist = [] for i in range(n): el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name) # Copy the engine args over to each engine launcher. 
el.engine_args = copy.deepcopy(self.engine_args) el.on_stop(self._notice_engine_stopped) - d = el.start(cluster_dir) + d = el.start(profile_dir) if i==0: self.log.info("Starting LocalEngineSetLauncher: %r" % el.args) self.launchers[i] = el @@ -481,10 +481,10 @@ class MPIExecControllerLauncher(MPIExecLauncher): ) n = Int(1) - def start(self, cluster_dir): - """Start the controller by cluster_dir.""" - self.controller_args.extend(['cluster_dir=%s'%cluster_dir]) - self.cluster_dir = unicode(cluster_dir) + def start(self, profile_dir): + """Start the controller by profile_dir.""" + self.controller_args.extend(['profile_dir=%s'%profile_dir]) + self.profile_dir = unicode(profile_dir) self.log.info("Starting MPIExecControllerLauncher: %r" % self.args) return super(MPIExecControllerLauncher, self).start(1) @@ -504,10 +504,10 @@ class MPIExecEngineSetLauncher(MPIExecLauncher): ) n = Int(1) - def start(self, n, cluster_dir): - """Start n engines by profile or cluster_dir.""" - self.program_args.extend(['cluster_dir=%s'%cluster_dir]) - self.cluster_dir = unicode(cluster_dir) + def start(self, n, profile_dir): + """Start n engines by profile or profile_dir.""" + self.program_args.extend(['profile_dir=%s'%profile_dir]) + self.profile_dir = unicode(profile_dir) self.n = n self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args) return super(MPIExecEngineSetLauncher, self).start(n) @@ -554,8 +554,8 @@ class SSHLauncher(LocalProcessLauncher): return self.ssh_cmd + self.ssh_args + [self.location] + \ self.program + self.program_args - def start(self, cluster_dir, hostname=None, user=None): - self.cluster_dir = unicode(cluster_dir) + def start(self, profile_dir, hostname=None, user=None): + self.profile_dir = unicode(profile_dir) if hostname is not None: self.hostname = hostname if user is not None: @@ -594,12 +594,12 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): help="""dict of engines to launch. 
This is a dict by hostname of ints, corresponding to the number of engines to start on that host.""") - def start(self, n, cluster_dir): - """Start engines by profile or cluster_dir. + def start(self, n, profile_dir): + """Start engines by profile or profile_dir. `n` is ignored, and the `engines` config property is used instead. """ - self.cluster_dir = unicode(cluster_dir) + self.profile_dir = unicode(profile_dir) dlist = [] for host, n in self.engines.iteritems(): if isinstance(n, (tuple, list)): @@ -618,7 +618,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): i el.program_args = args el.on_stop(self._notice_engine_stopped) - d = el.start(cluster_dir, user=user, hostname=host) + d = el.start(profile_dir, user=user, hostname=host) if i==0: self.log.info("Starting SSHEngineSetLauncher: %r" % el.args) self.launchers[host+str(i)] = el @@ -739,8 +739,8 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): # The tasks work directory is *not* the actual work directory of # the controller. It is used as the base path for the stdout/stderr # files that the scheduler redirects to. - t.work_directory = self.cluster_dir - # Add the cluster_dir and from self.start(). + t.work_directory = self.profile_dir + # Add the profile_dir argument from self.start(). 
t.controller_args.extend(self.extra_args) job.add_task(t) @@ -749,12 +749,12 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): @property def job_file(self): - return os.path.join(self.cluster_dir, self.job_file_name) + return os.path.join(self.profile_dir, self.job_file_name) - def start(self, cluster_dir): - """Start the controller by cluster_dir.""" - self.extra_args = ['cluster_dir=%s'%cluster_dir] - self.cluster_dir = unicode(cluster_dir) + def start(self, profile_dir): + """Start the controller by profile_dir.""" + self.extra_args = ['profile_dir=%s'%profile_dir] + self.profile_dir = unicode(profile_dir) return super(WindowsHPCControllerLauncher, self).start(1) @@ -773,8 +773,8 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # The tasks work directory is *not* the actual work directory of # the engine. It is used as the base path for the stdout/stderr # files that the scheduler redirects to. - t.work_directory = self.cluster_dir - # Add the cluster_dir and from self.start(). + t.work_directory = self.profile_dir + # Add the profile_dir argument from self.start(). 
t.engine_args.extend(self.extra_args) job.add_task(t) @@ -783,12 +783,12 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): @property def job_file(self): - return os.path.join(self.cluster_dir, self.job_file_name) + return os.path.join(self.profile_dir, self.job_file_name) - def start(self, n, cluster_dir): - """Start the controller by cluster_dir.""" - self.extra_args = ['cluster_dir=%s'%cluster_dir] - self.cluster_dir = unicode(cluster_dir) + def start(self, n, profile_dir): + """Start n engines by profile_dir.""" + self.extra_args = ['profile_dir=%s'%profile_dir] + self.profile_dir = unicode(profile_dir) return super(WindowsHPCEngineSetLauncher, self).start(n) @@ -897,13 +897,13 @@ class BatchSystemLauncher(BaseLauncher): f.write(script_as_string) os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) - def start(self, n, cluster_dir): + def start(self, n, profile_dir): """Start n copies of the process using a batch system.""" - # Here we save profile and cluster_dir in the context so they + # Here we save profile and profile_dir in the context so they # can be used in the batch script template as ${profile} and - # ${cluster_dir} - self.context['cluster_dir'] = cluster_dir - self.cluster_dir = unicode(cluster_dir) + # ${profile_dir} + self.context['profile_dir'] = profile_dir + self.profile_dir = unicode(profile_dir) self.write_batch_script(n) output = check_output(self.args, env=os.environ) @@ -942,13 +942,13 @@ class PBSControllerLauncher(PBSLauncher): default_template= Unicode("""#!/bin/sh #PBS -V #PBS -N ipcontroller -%s --log-to-file cluster_dir $cluster_dir +%s --log-to-file profile_dir $profile_dir """%(' '.join(ipcontroller_cmd_argv))) - def start(self, cluster_dir): - """Start the controller by profile or cluster_dir.""" + def start(self, profile_dir): + """Start the controller by profile or profile_dir.""" self.log.info("Starting PBSControllerLauncher: %r" % self.args) - return super(PBSControllerLauncher, self).start(1, 
cluster_dir) + return super(PBSControllerLauncher, self).start(1, profile_dir) class PBSEngineSetLauncher(PBSLauncher): @@ -958,13 +958,13 @@ class PBSEngineSetLauncher(PBSLauncher): default_template= Unicode(u"""#!/bin/sh #PBS -V #PBS -N ipengine -%s cluster_dir $cluster_dir +%s profile_dir $profile_dir """%(' '.join(ipengine_cmd_argv))) - def start(self, n, cluster_dir): - """Start n engines by profile or cluster_dir.""" + def start(self, n, profile_dir): + """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) - return super(PBSEngineSetLauncher, self).start(n, cluster_dir) + return super(PBSEngineSetLauncher, self).start(n, profile_dir) #SGE is very similar to PBS @@ -983,13 +983,13 @@ class SGEControllerLauncher(SGELauncher): default_template= Unicode(u"""#$$ -V #$$ -S /bin/sh #$$ -N ipcontroller -%s --log-to-file cluster_dir=$cluster_dir +%s --log-to-file profile_dir=$profile_dir """%(' '.join(ipcontroller_cmd_argv))) - def start(self, cluster_dir): - """Start the controller by profile or cluster_dir.""" + def start(self, profile_dir): + """Start the controller by profile or profile_dir.""" self.log.info("Starting PBSControllerLauncher: %r" % self.args) - return super(PBSControllerLauncher, self).start(1, cluster_dir) + return super(PBSControllerLauncher, self).start(1, profile_dir) class SGEEngineSetLauncher(SGELauncher): """Launch Engines with SGE""" @@ -998,13 +998,13 @@ class SGEEngineSetLauncher(SGELauncher): default_template = Unicode("""#$$ -V #$$ -S /bin/sh #$$ -N ipengine -%s cluster_dir=$cluster_dir +%s profile_dir=$profile_dir """%(' '.join(ipengine_cmd_argv))) - def start(self, n, cluster_dir): - """Start n engines by profile or cluster_dir.""" + def start(self, n, profile_dir): + """Start n engines by profile or profile_dir.""" self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) - return super(SGEEngineSetLauncher, self).start(n, 
cluster_dir) + return super(SGEEngineSetLauncher, self).start(n, profile_dir) #----------------------------------------------------------------------------- diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 1984ba6..ee875d6 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -34,7 +34,7 @@ from IPython.parallel import streamsession as ss from IPython.parallel import util from .asyncresult import AsyncResult, AsyncHubResult -from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError +from IPython.core.newapplication import ProfileDir, ProfileDirError from .view import DirectView, LoadBalancedView #-------------------------------------------------------------------------- @@ -234,7 +234,7 @@ class Client(HasTraits): _ignored_control_replies=Int(0) _ignored_hub_replies=Int(0) - def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None, + def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None, context=None, username=None, debug=False, exec_key=None, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10 @@ -245,7 +245,7 @@ class Client(HasTraits): self._context = context - self._setup_cluster_dir(profile, cluster_dir, ipython_dir) + self._setup_profile_dir(profile, profile_dir, ipython_dir) if self._cd is not None: if url_or_file is None: url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') @@ -318,21 +318,21 @@ class Client(HasTraits): """cleanup sockets, but _not_ context.""" self.close() - def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir): + def _setup_profile_dir(self, profile, profile_dir, ipython_dir): if ipython_dir is None: ipython_dir = get_ipython_dir() - if cluster_dir is not None: + if profile_dir is not None: try: - self._cd = ClusterDir.find_cluster_dir(cluster_dir) + self._cd = ProfileDir.find_profile_dir(profile_dir) return - except ClusterDirError: 
+ except ProfileDirError: pass elif profile is not None: try: - self._cd = ClusterDir.find_cluster_dir_by_profile( + self._cd = ProfileDir.find_profile_dir_by_name( ipython_dir, profile) return - except ClusterDirError: + except ProfileDirError: pass self._cd = None diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 737cf2d..c6f90bd 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -122,10 +122,16 @@ class SQLiteDB(BaseDB): # use session, and prefix _, since starting with # is illegal self.table = '_'+self.session.replace('-','_') if not self.location: - if hasattr(self.config.Global, 'cluster_dir'): - self.location = self.config.Global.cluster_dir + # get current profile + from IPython.core.newapplication import BaseIPythonApplication + if BaseIPythonApplication.initialized(): + app = BaseIPythonApplication.instance() + if app.profile_dir is not None: + self.location = app.profile_dir.location + else: + self.location = u'.' else: - self.location = '.' + self.location = u'.' self._init_db() # register db commit as 2s periodic callback