diff --git a/IPython/parallel/apps/clusterdir.py b/IPython/parallel/apps/clusterdir.py index d7da231..8f520d7 100755 --- a/IPython/parallel/apps/clusterdir.py +++ b/IPython/parallel/apps/clusterdir.py @@ -82,6 +82,11 @@ class ClusterDir(Configurable): 'default'. The cluster directory is resolve this way if the `cluster_dir` option is not used.""", config=True ) + auto_create = Bool(False, + help="""Whether to automatically create the ClusterDirectory if it does + not exist""") + overwrite = Bool(False, + help="""Whether to overwrite existing config files""") _location_isset = Bool(False) # flag for detecting multiply set location _new_dir = Bool(False) # flag for whether a new dir was created @@ -96,10 +101,14 @@ class ClusterDir(Configurable): raise RuntimeError("Cannot set ClusterDir more than once.") self._location_isset = True if not os.path.isdir(new): - os.makedirs(new) - self._new_dir = True + if self.auto_create: + os.makedirs(new) + self._new_dir = True + else: + raise ClusterDirError('Directory not found: %s' % new) + # ensure config files exist: - self.copy_all_config_files(overwrite=False) + self.copy_all_config_files(overwrite=self.overwrite) self.security_dir = os.path.join(new, self.security_dir_name) self.log_dir = os.path.join(new, self.log_dir_name) self.pid_dir = os.path.join(new, self.pid_dir_name) @@ -289,22 +298,23 @@ class ClusterDirCrashHandler(CrashHandler): base_aliases = { 'profile' : "ClusterDir.profile", 'cluster_dir' : 'ClusterDir.location', - 'log_level' : 'Application.log_level', - 'work_dir' : 'ClusterDirApplicaiton.work_dir', - 'log_to_file' : 'ClusterDirApplicaiton.log_to_file', - 'clean_logs' : 'ClusterDirApplicaiton.clean_logs', - 'log_url' : 'ClusterDirApplicaiton.log_url', + 'auto_create' : 'ClusterDirApplication.auto_create', + 'log_level' : 'ClusterApplication.log_level', + 'work_dir' : 'ClusterApplication.work_dir', + 'log_to_file' : 'ClusterApplication.log_to_file', + 'clean_logs' : 'ClusterApplication.clean_logs', + 'log_url' : 'ClusterApplication.log_url', } base_flags = { - 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"), - 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"), - 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file") + 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"), + 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"), + 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"), } for k,v in base_flags.iteritems(): base_flags[k] = (Config(v[0]),v[1]) -class ClusterDirApplication(BaseIPythonApplication): +class ClusterApplication(BaseIPythonApplication): """An application that puts everything into a cluster directory. Instead of looking for things in the ipython_dir, this type of application @@ -326,9 +336,12 @@ class ClusterDirApplication(BaseIPythonApplication): crash_handler_class = ClusterDirCrashHandler auto_create_cluster_dir = Bool(True, config=True, help="whether to create the cluster_dir if it doesn't exist") - # temporarily override default_log_level to INFO - default_log_level = logging.INFO cluster_dir = Instance(ClusterDir) + classes = [ClusterDir] + + def _log_level_default(self): + # temporarily override default_log_level to INFO + return logging.INFO work_dir = Unicode(os.getcwdu(), config=True, help='Set the working dir for the process.' @@ -339,7 +352,7 @@ class ClusterDirApplication(BaseIPythonApplication): log_to_file = Bool(config=True, help="whether to log to a file") - clean_logs = Bool(True, shortname='--clean-logs', config=True, + clean_logs = Bool(False, shortname='--clean-logs', config=True, help="whether to cleanup old logfiles before starting") log_url = CStr('', shortname='--log-url', config=True, @@ -349,6 +362,11 @@ class ClusterDirApplication(BaseIPythonApplication): help="""Path to ipcontroller configuration file. The default is to use _config.py, as found by cluster-dir.""" ) + + loop = Instance('zmq.eventloop.ioloop.IOLoop') + def _loop_default(self): + from zmq.eventloop.ioloop import IOLoop + return IOLoop.instance() aliases = Dict(base_aliases) flags = Dict(base_flags) @@ -370,14 +388,30 @@ class ClusterDirApplication(BaseIPythonApplication): ``True``, then create the new cluster dir in the IPython directory. 4. If all fails, then raise :class:`ClusterDirError`. """ - self.cluster_dir = ClusterDir(config=self.config) + self.cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir) if self.cluster_dir._new_dir: self.log.info('Creating new cluster dir: %s' % \ self.cluster_dir.location) else: self.log.info('Using existing cluster dir: %s' % \ self.cluster_dir.location) - + + def initialize(self, argv=None): + """initialize the app""" + self.parse_command_line(argv) + cl_config = self.config + self.init_clusterdir() + if self.config_file: + self.load_config_file(self.config_file) + else: + self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location) + # command-line should *override* config file, but command-line is necessary + # to determine clusterdir, etc. + self.update_config(cl_config) + self.reinit_logging() + + self.to_work_dir() + def to_work_dir(self): wd = self.work_dir if unicode(wd) != os.getcwdu(): @@ -386,6 +420,9 @@ class ClusterDirApplication(BaseIPythonApplication): def load_config_file(self, filename, path=None): """Load a .py based config file by filename and path.""" + # use config.application.Application.load_config + # instead of inflexible + # core.newapplication.BaseIPythonApplication.load_config return Application.load_config_file(self, filename, path=path) # # def load_default_config_file(self): @@ -393,30 +430,26 @@ class ClusterDirApplication(BaseIPythonApplication): # return BaseIPythonApplication.load_config_file(self) # disable URL-logging - # def init_logging(self): - # # Remove old log files - # if self.master_config.Global.clean_logs: - # log_dir = self.master_config.Global.log_dir - # for f in os.listdir(log_dir): - # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): - # # if f.startswith(self.name + u'-') and f.endswith('.log'): - # os.remove(os.path.join(log_dir, f)) - # # Start logging to the new log file - # if self.master_config.Global.log_to_file: - # log_filename = self.name + u'-' + str(os.getpid()) + u'.log' - # logfile = os.path.join(self.log_dir, log_filename) - # open_log_file = open(logfile, 'w') - # elif self.master_config.Global.log_url: - # open_log_file = None - # else: - # open_log_file = sys.stdout - # if open_log_file is not None: - # self.log.removeHandler(self._log_handler) - # self._log_handler = logging.StreamHandler(open_log_file) - # self._log_formatter = logging.Formatter("[%(name)s] %(message)s") - # self._log_handler.setFormatter(self._log_formatter) - # self.log.addHandler(self._log_handler) - # # log.startLogging(open_log_file) + def reinit_logging(self): + # Remove old log files + log_dir = self.cluster_dir.log_dir + if self.clean_logs: + for f in os.listdir(log_dir): + if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): + os.remove(os.path.join(log_dir, f)) + if self.log_to_file: + # Start logging to the new log file + log_filename = self.name + u'-' + str(os.getpid()) + u'.log' + logfile = os.path.join(log_dir, log_filename) + open_log_file = open(logfile, 'w') + else: + open_log_file = None + if open_log_file is not None: + self.log.removeHandler(self._log_handler) + self._log_handler = logging.StreamHandler(open_log_file) + self._log_formatter = logging.Formatter("[%(name)s] %(message)s") + self._log_handler.setFormatter(self._log_formatter) + self.log.addHandler(self._log_handler) def write_pid_file(self, overwrite=False): """Create a .pid file in the pid_dir with my pid. diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index 67ce5f1..a6f3579 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -25,14 +25,16 @@ from subprocess import check_call, CalledProcessError, PIPE import zmq from zmq.eventloop import ioloop +from IPython.config.application import Application, boolean_flag from IPython.config.loader import Config +from IPython.core.newapplication import BaseIPythonApplication from IPython.utils.importstring import import_item from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List from IPython.parallel.apps.clusterdir import ( - ClusterDirApplication, ClusterDirError, + ClusterApplication, ClusterDirError, ClusterDir, PIDFileError, - base_flags, + base_flags, base_aliases ) @@ -52,8 +54,8 @@ This command automates the startup of these processes using a wide range of startup methods (SSH, local processes, PBS, mpiexec, Windows HPC Server 2008). To start a cluster with 4 engines on your local host simply do 'ipcluster start n=4'. For more complex usage -you will typically do 'ipcluster --create profile=mycluster', then edit -configuration files, followed by 'ipcluster --start -p mycluster -n 4'. +you will typically do 'ipcluster create profile=mycluster', then edit +configuration files, followed by 'ipcluster start profile=mycluster n=4'. """ @@ -76,158 +78,65 @@ NO_CLUSTER = 12 #----------------------------------------------------------------------------- # Main application #----------------------------------------------------------------------------- -start_help = """Start an ipython cluster by its profile name or cluster - directory. Cluster directories contain configuration, log and - security related files and are named using the convention - 'cluster_' and should be creating using the 'start' - subcommand of 'ipcluster'. If your cluster directory is in - the cwd or the ipython directory, you can simply refer to it - using its profile name, 'ipcluster start -n 4 -p `, - otherwise use the '--cluster-dir' option. - """ -stop_help = """Stop a running ipython cluster by its profile name or cluster - directory. Cluster directories are named using the convention - 'cluster_'. If your cluster directory is in - the cwd or the ipython directory, you can simply refer to it - using its profile name, 'ipcluster stop -p `, otherwise - use the '--cluster-dir' option. - """ -engines_help = """Start one or more engines to connect to an existing Cluster - by profile name or cluster directory. - Cluster directories contain configuration, log and - security related files and are named using the convention - 'cluster_' and should be creating using the 'start' - subcommand of 'ipcluster'. If your cluster directory is in - the cwd or the ipython directory, you can simply refer to it - using its profile name, 'ipcluster --engines -n 4 -p `, - otherwise use the 'cluster_dir' option. - """ -create_help = """Create an ipython cluster directory by its profile name or - cluster directory path. Cluster directories contain - configuration, log and security related files and are named - using the convention 'cluster_'. By default they are - located in your ipython directory. Once created, you will - probably need to edit the configuration files in the cluster - directory to configure your cluster. Most users will create a - cluster directory by profile name, - 'ipcluster create -p mycluster', which will put the directory - in '/cluster_mycluster'. - """ +start_help = """ +Start an ipython cluster by its profile name or cluster +directory. Cluster directories contain configuration, log and +security related files and are named using the convention +'cluster_' and should be creating using the 'start' +subcommand of 'ipcluster'. If your cluster directory is in +the cwd or the ipython directory, you can simply refer to it +using its profile name, 'ipcluster start n=4 profile=`, +otherwise use the 'cluster_dir' option. +""" +stop_help = """ +Stop a running ipython cluster by its profile name or cluster +directory. Cluster directories are named using the convention +'cluster_'. If your cluster directory is in +the cwd or the ipython directory, you can simply refer to it +using its profile name, 'ipcluster stop profile=`, otherwise +use the 'cluster_dir' option. +""" +engines_help = """ +Start one or more engines to connect to an existing Cluster +by profile name or cluster directory. +Cluster directories contain configuration, log and +security related files and are named using the convention +'cluster_' and should be creating using the 'start' +subcommand of 'ipcluster'. If your cluster directory is in +the cwd or the ipython directory, you can simply refer to it +using its profile name, 'ipcluster engines n=4 profile=`, +otherwise use the 'cluster_dir' option. +""" +create_help = """ +Create an ipython cluster directory by its profile name or +cluster directory path. Cluster directories contain +configuration, log and security related files and are named +using the convention 'cluster_'. By default they are +located in your ipython directory. Once created, you will +probably need to edit the configuration files in the cluster +directory to configure your cluster. Most users will create a +cluster directory by profile name, +`ipcluster create profile=mycluster`, which will put the directory +in `/cluster_mycluster`. +""" list_help = """List all available clusters, by cluster directory, that can - be found in the current working directly or in the ipython - directory. Cluster directories are named using the convention - 'cluster_'.""" - - -flags = {} -flags.update(base_flags) -flags.update({ - 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help), - 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help), - 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help), - 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help), - 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help), - -}) - -class IPClusterApp(ClusterDirApplication): - - name = u'ipcluster' - description = _description - usage = None - default_config_file_name = default_config_file_name - default_log_level = logging.INFO - auto_create_cluster_dir = False - classes = List() - def _classes_default(self,): - from IPython.parallel.apps import launcher - return launcher.all_launchers - - n = Int(0, config=True, - help="The number of engines to start.") - signal = Int(signal.SIGINT, config=True, - help="signal to use for stopping. [default: SIGINT]") - delay = CFloat(1., config=True, - help="delay (in s) between starting the controller and the engines") - - subcommand = Str('', config=True, - help="""ipcluster has a variety of subcommands. The general way of - running ipcluster is 'ipcluster -- [options]'.""" - ) - - controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher', - config=True, - help="The class for launching a Controller." - ) - engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher', - config=True, - help="The class for launching Engines." - ) - reset = Bool(False, config=True, - help="Whether to reset config files as part of '--create'." - ) - daemonize = Bool(False, config=True, - help='Daemonize the ipcluster program. This implies --log-to-file') - - def _daemonize_changed(self, name, old, new): - if new: - self.log_to_file = True - - def _n_changed(self, name, old, new): - # propagate n all over the place... - # TODO make this clean - # ensure all classes are covered. - self.config.LocalEngineSetLauncher.n=new - self.config.MPIExecEngineSetLauncher.n=new - self.config.SSHEngineSetLauncher.n=new - self.config.PBSEngineSetLauncher.n=new - self.config.SGEEngineSetLauncher.n=new - self.config.WinHPEngineSetLauncher.n=new - - aliases = Dict(dict( - n='IPClusterApp.n', - signal = 'IPClusterApp.signal', - delay = 'IPClusterApp.delay', - clauncher = 'IPClusterApp.controller_launcher_class', - elauncher = 'IPClusterApp.engine_launcher_class', - )) - flags = Dict(flags) +be found in the current working directly or in the ipython +directory. Cluster directories are named using the convention +'cluster_'. +""" - def init_clusterdir(self): - subcommand = self.subcommand - if subcommand=='list': - self.list_cluster_dirs() - self.exit(0) - if subcommand=='create': - reset = self.reset_config - self.auto_create_cluster_dir = True - super(IPClusterApp, self).init_clusterdir() - self.log.info('Copying default config files to cluster directory ' - '[overwrite=%r]' % (reset,)) - self.cluster_dir.copy_all_config_files(overwrite=reset) - elif subcommand=='start' or subcommand=='stop': - self.auto_create_cluster_dir = True - try: - super(IPClusterApp, self).init_clusterdir() - except ClusterDirError: - raise ClusterDirError( - "Could not find a cluster directory. A cluster dir must " - "be created before running 'ipcluster start'. Do " - "'ipcluster create -h' or 'ipcluster list -h' for more " - "information about creating and listing cluster dirs." - ) - elif subcommand=='engines': - self.auto_create_cluster_dir = False - try: - super(IPClusterApp, self).init_clusterdir() - except ClusterDirError: - raise ClusterDirError( - "Could not find a cluster directory. A cluster dir must " - "be created before running 'ipcluster start'. Do " - "'ipcluster create -h' or 'ipcluster list -h' for more " - "information about creating and listing cluster dirs." - ) +class IPClusterList(BaseIPythonApplication): + name = u'ipcluster-list' + description = list_help + + # empty aliases + aliases=Dict() + flags = Dict(base_flags) + + def _log_level_default(self): + return 20 + def list_cluster_dirs(self): # Find the search paths cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','') @@ -235,12 +144,10 @@ class IPClusterApp(ClusterDirApplication): cluster_dir_paths = cluster_dir_paths.split(':') else: cluster_dir_paths = [] - try: - ipython_dir = self.ipython_dir - except AttributeError: - ipython_dir = self.ipython_dir - paths = [os.getcwd(), ipython_dir] + \ - cluster_dir_paths + + ipython_dir = self.ipython_dir + + paths = [os.getcwd(), ipython_dir] + cluster_dir_paths paths = list(set(paths)) self.log.info('Searching for cluster dirs in paths: %r' % paths) @@ -250,135 +157,195 @@ class IPClusterApp(ClusterDirApplication): full_path = os.path.join(path, f) if os.path.isdir(full_path) and f.startswith('cluster_'): profile = full_path.split('_')[-1] - start_cmd = 'ipcluster --start profile=%s n=4' % profile + start_cmd = 'ipcluster start profile=%s n=4' % profile print start_cmd + " ==> " + full_path + + def start(self): + self.list_cluster_dirs() - def init_launchers(self): - config = self.config - subcmd = self.subcommand - if subcmd =='start': - self.start_logging() - self.loop = ioloop.IOLoop.instance() - # reactor.callWhenRunning(self.start_launchers) - dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop) - dc.start() - if subcmd == 'engines': - self.start_logging() - self.loop = ioloop.IOLoop.instance() - # reactor.callWhenRunning(self.start_launchers) - engine_only = lambda : self.start_launchers(controller=False) - dc = ioloop.DelayedCallback(engine_only, 0, self.loop) - dc.start() +create_flags = {} +create_flags.update(base_flags) +create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset', + "reset config files to defaults", "leave existing config files")) + +class IPClusterCreate(ClusterApplication): + name = u'ipcluster' + description = create_help + auto_create_cluster_dir = Bool(True, + help="whether to create the cluster_dir if it doesn't exist") + default_config_file_name = default_config_file_name + + reset = Bool(False, config=True, + help="Whether to reset config files as part of 'create'." + ) + + flags = Dict(create_flags) + + aliases = Dict(dict(profile='ClusterDir.profile')) + + classes = [ClusterDir] + + def init_clusterdir(self): + super(IPClusterCreate, self).init_clusterdir() + self.log.info('Copying default config files to cluster directory ' + '[overwrite=%r]' % (self.reset,)) + self.cluster_dir.copy_all_config_files(overwrite=self.reset) + + def initialize(self, argv=None): + self.parse_command_line(argv) + self.init_clusterdir() + +stop_aliases = dict( + signal='IPClusterStop.signal', + profile='ClusterDir.profile', + cluster_dir='ClusterDir.location', +) + +class IPClusterStop(ClusterApplication): + name = u'ipcluster' + description = stop_help + auto_create_cluster_dir = Bool(False, + help="whether to create the cluster_dir if it doesn't exist") + default_config_file_name = default_config_file_name + + signal = Int(signal.SIGINT, config=True, + help="signal to use for stopping processes.") + + aliases = Dict(stop_aliases) - def start_launchers(self, controller=True): - config = self.config - - # Create the launchers. In both bases, we set the work_dir of - # the launcher to the cluster_dir. This is where the launcher's - # subprocesses will be launched. It is not where the controller - # and engine will be launched. - if controller: - clsname = self.controller_launcher_class - if '.' not in clsname: - clsname = 'IPython.parallel.apps.launcher.'+clsname - cl_class = import_item(clsname) - self.controller_launcher = cl_class( - work_dir=self.cluster_dir.location, config=config, - logname=self.log.name + def start(self): + """Start the app for the stop subcommand.""" + try: + pid = self.get_pid_from_file() + except PIDFileError: + self.log.critical( + 'Could not read pid file, cluster is probably not running.' ) - # Setup the observing of stopping. If the controller dies, shut - # everything down as that will be completely fatal for the engines. - self.controller_launcher.on_stop(self.stop_launchers) - # But, we don't monitor the stopping of engines. An engine dying - # is just fine and in principle a user could start a new engine. - # Also, if we did monitor engine stopping, it is difficult to - # know what to do when only some engines die. Currently, the - # observing of engine stopping is inconsistent. Some launchers - # might trigger on a single engine stopping, other wait until - # all stop. TODO: think more about how to handle this. - else: - self.controller_launcher = None + # Here I exit with a unusual exit status that other processes + # can watch for to learn how I existed. + self.remove_pid_file() + self.exit(ALREADY_STOPPED) - clsname = self.engine_launcher_class - if '.' not in clsname: - # not a module, presume it's the raw name in apps.launcher - clsname = 'IPython.parallel.apps.launcher.'+clsname - print repr(clsname) - el_class = import_item(clsname) + if not self.check_pid(pid): + self.log.critical( + 'Cluster [pid=%r] is not running.' % pid + ) + self.remove_pid_file() + # Here I exit with a unusual exit status that other processes + # can watch for to learn how I existed. + self.exit(ALREADY_STOPPED) + + elif os.name=='posix': + sig = self.signal + self.log.info( + "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig) + ) + try: + os.kill(pid, sig) + except OSError: + self.log.error("Stopping cluster failed, assuming already dead.", + exc_info=True) + self.remove_pid_file() + elif os.name=='nt': + try: + # kill the whole tree + p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE) + except (CalledProcessError, OSError): + self.log.error("Stopping cluster failed, assuming already dead.", + exc_info=True) + self.remove_pid_file() + +engine_aliases = {} +engine_aliases.update(base_aliases) +engine_aliases.update(dict( + n='IPClusterEngines.n', + elauncher = 'IPClusterEngines.engine_launcher_class', +)) +class IPClusterEngines(ClusterApplication): + + name = u'ipcluster' + description = engines_help + usage = None + default_config_file_name = default_config_file_name + default_log_level = logging.INFO + auto_create_cluster_dir = Bool(False) + classes = List() + def _classes_default(self): + from IPython.parallel.apps import launcher + launchers = launcher.all_launchers + eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__] + return [ClusterDir]+eslaunchers + + n = Int(2, config=True, + help="The number of engines to start.") - self.engine_launcher = el_class( - work_dir=self.cluster_dir.location, config=config, logname=self.log.name + engine_launcher_class = Str('LocalEngineSetLauncher', + config=True, + help="The class for launching a set of Engines." ) + daemonize = Bool(False, config=True, + help='Daemonize the ipcluster program. This implies --log-to-file') + def _daemonize_changed(self, name, old, new): + if new: + self.log_to_file = True + + aliases = Dict(engine_aliases) + # flags = Dict(flags) + _stopping = False + + def initialize(self, argv=None): + super(IPClusterEngines, self).initialize(argv) + self.init_signal() + self.init_launchers() + + def init_launchers(self): + self.engine_launcher = self.build_launcher(self.engine_launcher_class) + self.engine_launcher.on_stop(lambda r: self.loop.stop()) + + def init_signal(self): # Setup signals signal.signal(signal.SIGINT, self.sigint_handler) + + def build_launcher(self, clsname): + """import and instantiate a Launcher based on importstring""" + if '.' not in clsname: + # not a module, presume it's the raw name in apps.launcher + clsname = 'IPython.parallel.apps.launcher.'+clsname + # print repr(clsname) + klass = import_item(clsname) - # Start the controller and engines - self._stopping = False # Make sure stop_launchers is not called 2x. - if controller: - self.start_controller() - dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop) - dc.start() - self.startup_message() - - def startup_message(self, r=None): - self.log.info("IPython cluster: started") - return r - - def start_controller(self, r=None): - # self.log.info("In start_controller") - config = self.config - d = self.controller_launcher.start( - cluster_dir=self.cluster_dir.location + launcher = klass( + work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name ) - return d - - def start_engines(self, r=None): - # self.log.info("In start_engines") - config = self.config - - d = self.engine_launcher.start( + return launcher + + def start_engines(self): + self.log.info("Starting %i engines"%self.n) + self.engine_launcher.start( self.n, cluster_dir=self.cluster_dir.location ) - return d - def stop_controller(self, r=None): - # self.log.info("In stop_controller") - if self.controller_launcher and self.controller_launcher.running: - return self.controller_launcher.stop() - - def stop_engines(self, r=None): - # self.log.info("In stop_engines") + def stop_engines(self): + self.log.info("Stopping Engines...") if self.engine_launcher.running: d = self.engine_launcher.stop() - # d.addErrback(self.log_err) return d else: return None - def log_err(self, f): - self.log.error(f.getTraceback()) - return None - def stop_launchers(self, r=None): if not self._stopping: self._stopping = True - # if isinstance(r, failure.Failure): - # self.log.error('Unexpected error in ipcluster:') - # self.log.info(r.getTraceback()) self.log.error("IPython cluster: stopping") - # These return deferreds. We are not doing anything with them - # but we are holding refs to them as a reminder that they - # do return deferreds. - d1 = self.stop_engines() - d2 = self.stop_controller() + self.stop_engines() # Wait a few seconds to let things shut down. dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop) dc.start() - # reactor.callLater(4.0, reactor.stop) def sigint_handler(self, signum, frame): + self.log.debug("SIGINT received, stopping launchers...") self.stop_launchers() def start_logging(self): @@ -392,25 +359,105 @@ class IPClusterApp(ClusterDirApplication): # super(IPClusterApp, self).start_logging() def start(self): - """Start the application, depending on what subcommand is used.""" - subcmd = self.subcommand - if subcmd=='create': - # init_clusterdir step completed create action - return - elif subcmd=='start': - self.start_app_start() - elif subcmd=='stop': - self.start_app_stop() - elif subcmd=='engines': - self.start_app_engines() - else: - self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'" - " must be specified") - self.exit(-1) + """Start the app for the engines subcommand.""" + self.log.info("IPython cluster: started") + # First see if the cluster is already running + + # Now log and daemonize + self.log.info( + 'Starting engines with [daemon=%r]' % self.daemonize + ) + # TODO: Get daemonize working on Windows or as a Windows Server. + if self.daemonize: + if os.name=='posix': + from twisted.scripts._twistd_unix import daemonize + daemonize() + + dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop) + dc.start() + # Now write the new pid file AFTER our new forked pid is active. + # self.write_pid_file() + try: + self.loop.start() + except KeyboardInterrupt: + pass + except zmq.ZMQError as e: + if e.errno == errno.EINTR: + pass + else: + raise + +start_aliases = {} +start_aliases.update(engine_aliases) +start_aliases.update(dict( + delay='IPClusterStart.delay', + clean_logs='IPClusterStart.clean_logs', +)) + +class IPClusterStart(IPClusterEngines): + + name = u'ipcluster' + description = start_help + usage = None + default_config_file_name = default_config_file_name + default_log_level = logging.INFO + auto_create_cluster_dir = Bool(True, config=True, + help="whether to create the cluster_dir if it doesn't exist") + classes = List() + def _classes_default(self,): + from IPython.parallel.apps import launcher + return [ClusterDir]+launcher.all_launchers + + clean_logs = Bool(True, config=True, + help="whether to cleanup old logs before starting") + + delay = CFloat(1., config=True, + help="delay (in s) between starting the controller and the engines") + + controller_launcher_class = Str('LocalControllerLauncher', + config=True, + help="The class for launching a Controller." + ) + reset = Bool(False, config=True, + help="Whether to reset config files as part of '--create'." + ) + + # flags = Dict(flags) + aliases = Dict(start_aliases) + + def init_clusterdir(self): + try: + super(IPClusterStart, self).init_clusterdir() + except ClusterDirError: + raise ClusterDirError( + "Could not find a cluster directory. A cluster dir must " + "be created before running 'ipcluster start'. Do " + "'ipcluster create -h' or 'ipcluster list -h' for more " + "information about creating and listing cluster dirs." + ) + + def init_launchers(self): + self.controller_launcher = self.build_launcher(self.controller_launcher_class) + self.engine_launcher = self.build_launcher(self.engine_launcher_class) + self.controller_launcher.on_stop(self.stop_launchers) + + def start_controller(self): + self.controller_launcher.start( + cluster_dir=self.cluster_dir.location + ) + + def stop_controller(self): + # self.log.info("In stop_controller") + if self.controller_launcher and self.controller_launcher.running: + return self.controller_launcher.stop() + + def stop_launchers(self, r=None): + if not self._stopping: + self.stop_controller() + super(IPClusterStart, self).stop_launchers() - def start_app_start(self): + def start(self): """Start the app for the start subcommand.""" - config = self.config # First see if the cluster is already running try: pid = self.get_pid_from_file() @@ -439,6 +486,10 @@ class IPClusterApp(ClusterDirApplication): from twisted.scripts._twistd_unix import daemonize daemonize() + dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop) + dc.start() + dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop) + dc.start() # Now write the new pid file AFTER our new forked pid is active. self.write_pid_file() try: @@ -453,95 +504,36 @@ class IPClusterApp(ClusterDirApplication): finally: self.remove_pid_file() - def start_app_engines(self): - """Start the app for the start subcommand.""" - config = self.config - # First see if the cluster is already running - - # Now log and daemonize - self.log.info( - 'Starting engines with [daemon=%r]' % self.daemonize - ) - # TODO: Get daemonize working on Windows or as a Windows Server. - if self.daemonize: - if os.name=='posix': - from twisted.scripts._twistd_unix import daemonize - daemonize() +base='IPython.parallel.apps.ipclusterapp.IPCluster' - # Now write the new pid file AFTER our new forked pid is active. - # self.write_pid_file() - try: - self.loop.start() - except KeyboardInterrupt: - pass - except zmq.ZMQError as e: - if e.errno == errno.EINTR: - pass - else: - raise - # self.remove_pid_file() +class IPClusterApp(Application): + name = u'ipcluster' + description = _description - def start_app_stop(self): - """Start the app for the stop subcommand.""" - config = self.config - try: - pid = self.get_pid_from_file() - except PIDFileError: - self.log.critical( - 'Could not read pid file, cluster is probably not running.' - ) - # Here I exit with a unusual exit status that other processes - # can watch for to learn how I existed. - self.remove_pid_file() - self.exit(ALREADY_STOPPED) - - if not self.check_pid(pid): - self.log.critical( - 'Cluster [pid=%r] is not running.' % pid - ) - self.remove_pid_file() - # Here I exit with a unusual exit status that other processes - # can watch for to learn how I existed. - self.exit(ALREADY_STOPPED) - - elif os.name=='posix': - sig = self.signal - self.log.info( - "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig) - ) - try: - os.kill(pid, sig) - except OSError: - self.log.error("Stopping cluster failed, assuming already dead.", - exc_info=True) - self.remove_pid_file() - elif os.name=='nt': - try: - # kill the whole tree - p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE) - except (CalledProcessError, OSError): - self.log.error("Stopping cluster failed, assuming already dead.", - exc_info=True) - self.remove_pid_file() - + subcommands = {'create' : (base+'Create', create_help), + 'list' : (base+'List', list_help), + 'start' : (base+'Start', start_help), + 'stop' : (base+'Stop', stop_help), + 'engines' : (base+'Engines', engines_help), + } + + # no aliases or flags for parent App + aliases = Dict() + flags = Dict() + + def start(self): + if self.subapp is None: + print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys()) + print + self.print_subcommands() + self.exit(1) + else: + return self.subapp.start() def launch_new_instance(): """Create and run the IPython cluster.""" app = IPClusterApp() - app.parse_command_line() - cl_config = app.config - app.init_clusterdir() - if app.config_file: - app.load_config_file(app.config_file) - else: - app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) - # command-line should *override* config file, but command-line is necessary - # to determine clusterdir, etc. - app.update_config(cl_config) - - app.to_work_dir() - app.init_launchers() - + app.initialize() app.start() diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index c00e8f9..6cc0044 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -38,7 +38,7 @@ from IPython.parallel import factory from IPython.parallel.apps.clusterdir import ( ClusterDir, - ClusterDirApplication, + ClusterApplication, base_flags # ClusterDirConfigLoader ) @@ -104,7 +104,7 @@ flags.update({ flags.update() -class IPControllerApp(ClusterDirApplication): +class IPControllerApp(ClusterApplication): name = u'ipcontroller' description = _description @@ -361,6 +361,12 @@ class IPControllerApp(ClusterDirApplication): # handler.setLevel(self.log_level) # self.log.addHandler(handler) # # + + def initialize(self, argv=None): + super(IPControllerApp, self).initialize(argv) + self.init_hub() + self.init_schedulers() + def start(self): # Start the subprocesses: self.factory.start() @@ -380,27 +386,13 @@ class IPControllerApp(ClusterDirApplication): self.factory.loop.start() except KeyboardInterrupt: self.log.critical("Interrupted, Exiting...\n") + def launch_new_instance(): """Create and run the IPython controller""" app = IPControllerApp() - app.parse_command_line() - cl_config = app.config - # app.load_config_file() - app.init_clusterdir() - if app.config_file: - app.load_config_file(app.config_file) - else: - app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) - # command-line should *override* config file, but command-line is necessary - # to determine clusterdir, etc. - app.update_config(cl_config) - - app.to_work_dir() - app.init_hub() - app.init_schedulers() - + app.initialize() app.start() diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index e2261e0..bfd74c9 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -23,7 +23,7 @@ import zmq from zmq.eventloop import ioloop from IPython.parallel.apps.clusterdir import ( - ClusterDirApplication, + ClusterApplication, ClusterDir, base_aliases, # ClusterDirConfigLoader @@ -99,13 +99,16 @@ class MPI(Configurable): #----------------------------------------------------------------------------- -class IPEngineApp(ClusterDirApplication): +class IPEngineApp(ClusterApplication): app_name = Unicode(u'ipengine') description = Unicode(_description) default_config_file_name = default_config_file_name classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI]) + auto_create_cluster_dir = Bool(False, config=True, + help="whether to create the cluster_dir if it doesn't exist") + startup_script = Unicode(u'', config=True, help='specify a script to be run at startup') startup_command = Str('', config=True, @@ -262,7 +265,11 @@ class IPEngineApp(ClusterDirApplication): else: mpi = None - + def initialize(self, argv=None): + super(IPEngineApp, self).initialize(argv) + self.init_mpi() + self.init_engine() + def start(self): self.engine.start() try: @@ -274,25 +281,7 @@ class IPEngineApp(ClusterDirApplication): def launch_new_instance(): """Create and run the IPython engine""" app = IPEngineApp() - app.parse_command_line() - cl_config = app.config - app.init_clusterdir() - # app.load_config_file() - # print app.config - if app.config_file: - app.load_config_file(app.config_file) - else: - app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) - - # command-line should *override* config file, but command-line is necessary - # to determine clusterdir, etc. - app.update_config(cl_config) - - # print app.config - app.to_work_dir() - app.init_mpi() - app.init_engine() - print app.config + app.initialize() app.start() diff --git a/IPython/parallel/apps/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py index 76792fb..6be8729 100755 --- a/IPython/parallel/apps/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -21,7 +21,7 @@ import sys import zmq from IPython.parallel.apps.clusterdir import ( - ClusterDirApplication, + ClusterApplication, ClusterDirConfigLoader ) from IPython.parallel.apps.logwatcher import LogWatcher @@ -74,7 +74,7 @@ class IPLoggerAppConfigLoader(ClusterDirConfigLoader): #----------------------------------------------------------------------------- -class IPLoggerApp(ClusterDirApplication): +class IPLoggerApp(ClusterApplication): name = u'iploggerz' description = _description