From 09f255371d05e359df4741e0a0d8933a1898941c 2013-06-27 18:10:49 From: Min RK <benjaminrk@gmail.com> Date: 2013-06-27 18:10:49 Subject: [PATCH] Merge pull request #3465 from jabooth/htcondor Add HTCondor bindings to IPython.parallel --- diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index 30a7691..9705e8f 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -59,8 +59,8 @@ default_config_file_name = u'ipcluster_config.py' _description = """Start an IPython cluster for parallel computing. An IPython cluster consists of 1 controller and 1 or more engines. -This command automates the startup of these processes using a wide -range of startup methods (SSH, local processes, PBS, mpiexec, +This command automates the startup of these processes using a wide range of +startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor, Windows HPC Server 2008). To start a cluster with 4 engines on your local host simply do 'ipcluster start --n=4'. For more complex usage you will typically do 'ipython profile create mycluster --parallel', then edit @@ -116,7 +116,7 @@ def find_launcher_class(clsname, kind): ========== clsname : str The full name of the launcher class, either with or without the - module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, + module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor, WindowsHPC). kind : str Either 'EngineSet' or 'Controller'. 
@@ -125,7 +125,7 @@ def find_launcher_class(clsname, kind): # not a module, presume it's the raw name in apps.launcher if kind and kind not in clsname: # doesn't match necessary full class name, assume it's - # just 'PBS' or 'MPI' prefix: + # just 'PBS' or 'MPI' etc prefix: clsname = clsname + kind + 'Launcher' clsname = 'IPython.parallel.apps.launcher.'+clsname klass = import_item(clsname) @@ -287,6 +287,7 @@ class IPClusterEngines(BaseParallelApplication): Note that SSH does *not* move the connection files around, so you will likely have to do this manually unless the machines are on a shared file system. + HTCondor : use HTCondor to submit engines to a batch queue WindowsHPC : use Windows HPC If you are using one of IPython's builtin launchers, you can specify just the @@ -488,6 +489,7 @@ class IPClusterStart(IPClusterEngines): PBS : use PBS (qsub) to submit the controller to a batch queue SGE : use SGE (qsub) to submit the controller to a batch queue LSF : use LSF (bsub) to submit the controller to a batch queue + HTCondor : use HTCondor to submit the controller to a batch queue SSH : use SSH to start the controller WindowsHPC : use Windows HPC diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index e939596..b0f00d7 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -1019,6 +1019,8 @@ class BatchSystemLauncher(BaseLauncher): job_id_regexp = CRegExp('', config=True, help="""A regular expression used to get the job id from the output of the submit_command.""") + job_id_regexp_group = Integer(0, config=True, + help="""The group we wish to match in job_id_regexp (0 to match all)""") batch_template = Unicode('', config=True, help="The string that is the batch script template itself.") batch_template_file = Unicode(u'', config=True, @@ -1047,6 +1049,7 @@ class BatchSystemLauncher(BaseLauncher): batch_file = Unicode(u'') # the format dict used with batch_template: context = Dict() + def 
_context_default(self): """load the default context with the default values for the basic keys @@ -1058,7 +1061,6 @@ class BatchSystemLauncher(BaseLauncher): # the Formatter instance for rendering the templates: formatter = Instance(EvalFormatter, (), {}) - def find_args(self): return self.submit_command + [self.batch_file] @@ -1072,7 +1074,7 @@ class BatchSystemLauncher(BaseLauncher): """Take the output of the submit command and return the job id.""" m = self.job_id_regexp.search(output) if m is not None: - job_id = m.group() + job_id = m.group(self.job_id_regexp_group) else: raise LauncherError("Job id couldn't be determined: %s" % output) self.job_id = job_id @@ -1090,28 +1092,32 @@ class BatchSystemLauncher(BaseLauncher): if not self.batch_template: # third (last) priority is default_template self.batch_template = self.default_template - # add jobarray or queue lines to user-specified template # note that this is *only* when user did not specify a template. - # print self.job_array_regexp.search(self.batch_template) - if not self.job_array_regexp.search(self.batch_template): - self.log.debug("adding job array settings to batch script") - firstline, rest = self.batch_template.split('\n',1) - self.batch_template = u'\n'.join([firstline, self.job_array_template, rest]) - - # print self.queue_regexp.search(self.batch_template) - if self.queue and not self.queue_regexp.search(self.batch_template): - self.log.debug("adding PBS queue settings to batch script") - firstline, rest = self.batch_template.split('\n',1) - self.batch_template = u'\n'.join([firstline, self.queue_template, rest]) - + self._insert_queue_in_script() + self._insert_job_array_in_script() script_as_string = self.formatter.format(self.batch_template, **self.context) self.log.debug('Writing batch script: %s', self.batch_file) - with open(self.batch_file, 'w') as f: f.write(script_as_string) os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR) + def _insert_queue_in_script(self): + 
"""Inserts a queue if required into the batch script. + """ + if self.queue and not self.queue_regexp.search(self.batch_template): + self.log.debug("adding PBS queue settings to batch script") + firstline, rest = self.batch_template.split('\n',1) + self.batch_template = u'\n'.join([firstline, self.queue_template, rest]) + + def _insert_job_array_in_script(self): + """Inserts a job array if required into the batch script. + """ + if not self.job_array_regexp.search(self.batch_template): + self.log.debug("adding job array settings to batch script") + firstline, rest = self.batch_template.split('\n',1) + self.batch_template = u'\n'.join([firstline, self.job_array_template, rest]) + def start(self, n): """Start n copies of the process using a batch system.""" self.log.debug("Starting %s: %r", self.__class__.__name__, self.args) @@ -1160,7 +1166,6 @@ class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin): %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv)))) - def start(self): """Start the controller by profile or profile_dir.""" return super(PBSControllerLauncher, self).start(1) @@ -1176,9 +1181,6 @@ class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin): %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote,ipengine_cmd_argv)))) - def start(self, n): - """Start n engines by profile or profile_dir.""" - return super(PBSEngineSetLauncher, self).start(n) #SGE is very similar to PBS @@ -1189,6 +1191,7 @@ class SGELauncher(PBSLauncher): queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+') queue_template = Unicode('#$ -q {queue}') + class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin): """Launch a controller using SGE.""" @@ -1204,6 +1207,7 @@ class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin): """Start the controller by profile or profile_dir.""" return super(SGEControllerLauncher, self).start(1) + class 
SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin): """Launch Engines with SGE""" batch_file_name = Unicode(u'sge_engines', config=True, @@ -1214,10 +1218,6 @@ class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin): %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipengine_cmd_argv)))) - def start(self, n): - """Start n engines by profile or profile_dir.""" - return super(SGEEngineSetLauncher, self).start(n) - # LSF launchers @@ -1283,9 +1283,87 @@ class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin): %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}" """%(' '.join(map(pipes.quote, ipengine_cmd_argv)))) - def start(self, n): - """Start n engines by profile or profile_dir.""" - return super(LSFEngineSetLauncher, self).start(n) + + +class HTCondorLauncher(BatchSystemLauncher): + """A BatchSystemLauncher subclass for HTCondor. + + HTCondor requires that we launch the ipengine/ipcontroller scripts rather + than the python instance but otherwise is very similar to PBS. This is because + HTCondor destroys sys.executable when launching remote processes - a launched + python process depends on sys.executable to effectively evaluate its + module search paths. Without it, regardless of which python interpreter you launch + you will get the built-in module search paths. + + We use the ip{cluster, engine, controller} scripts as our executable to circumvent + this - the mechanism of shebanged scripts means that the python binary will be + launched with argv[0] set to the *location of the ip{cluster, engine, controller} + scripts on the remote node*. This means you need to take care that: + a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller + of the python environment you wish to execute code in having top precedence. + b. This functionality is untested on Windows. + + If you need different behavior, consider making your own template. 
+ """ + + submit_command = List(['condor_submit'], config=True, + help="The HTCondor submit command ['condor_submit']") + delete_command = List(['condor_rm'], config=True, + help="The HTCondor delete command ['condor_rm']") + job_id_regexp = CRegExp(r'(\d+)\.$', config=True, + help="Regular expression for identifying the job ID [r'(\d+)\.$']") + job_id_regexp_group = Integer(1, config=True, + help="""The group we wish to match in job_id_regexp [1]""") + + job_array_regexp = CRegExp('queue\W+\$') + job_array_template = Unicode('queue {n}') + + + def _insert_job_array_in_script(self): + """Inserts a job array if required into the batch script. + """ + if not self.job_array_regexp.search(self.batch_template): + self.log.debug("adding job array settings to batch script") + #HTCondor requires that the job array goes at the bottom of the script + self.batch_template = '\n'.join([self.batch_template, + self.job_array_template]) + + def _insert_queue_in_script(self): + """AFAIK, HTCondor doesn't have a concept of multiple queues that can be + specified in the script. 
+ """ + pass + + +class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin): + """Launch a controller using HTCondor.""" + + batch_file_name = Unicode(u'htcondor_controller', config=True, + help="batch file name for the controller job.") + default_template = Unicode(r""" +universe = vanilla +executable = ipcontroller +# by default we expect a shared file system +transfer_executable = False +arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}' +""") + + def start(self): + """Start the controller by profile or profile_dir.""" + return super(HTCondorControllerLauncher, self).start(1) + + +class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin): + """Launch Engines using HTCondor""" + batch_file_name = Unicode(u'htcondor_engines', config=True, + help="batch file name for the engine(s) job.") + default_template = Unicode(""" +universe = vanilla +executable = ipengine +# by default we expect a shared file system +transfer_executable = False +arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'" +""") #----------------------------------------------------------------------------- @@ -1354,6 +1432,10 @@ lsf_launchers = [ LSFControllerLauncher, LSFEngineSetLauncher, ] +htcondor_launchers = [ + HTCondorLauncher, + HTCondorControllerLauncher, + HTCondorEngineSetLauncher, +] all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\ - + pbs_launchers + sge_launchers + lsf_launchers - + + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers diff --git a/IPython/parallel/tests/test_launcher.py b/IPython/parallel/tests/test_launcher.py index 6035a8f..30421cc 100644 --- a/IPython/parallel/tests/test_launcher.py +++ b/IPython/parallel/tests/test_launcher.py @@ -129,6 +129,9 @@ class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase): class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase): launcher_class = 
launcher.LSFControllerLauncher +class TestHTCondorControllerLauncher(BatchTest, ControllerLauncherTest, TestCase): + launcher_class = launcher.HTCondorControllerLauncher + class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase): launcher_class = launcher.SSHControllerLauncher @@ -155,6 +158,9 @@ class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase): class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase): launcher_class = launcher.LSFEngineSetLauncher +class TestHTCondorEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase): + launcher_class = launcher.HTCondorEngineSetLauncher + class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase): launcher_class = launcher.SSHEngineSetLauncher