From 09f255371d05e359df4741e0a0d8933a1898941c 2013-06-27 18:10:49
From: Min RK <>
Date: 2013-06-27 18:10:49
Subject: [PATCH] Merge pull request #3465 from jabooth/htcondor

Add HTCondor bindings to IPython.parallel

diff --git a/IPython/parallel/apps/ b/IPython/parallel/apps/
index 30a7691..9705e8f 100755
--- a/IPython/parallel/apps/
+++ b/IPython/parallel/apps/
@@ -59,8 +59,8 @@ default_config_file_name = u''
 _description = """Start an IPython cluster for parallel computing.
 An IPython cluster consists of 1 controller and 1 or more engines.
-This command automates the startup of these processes using a wide
-range of startup methods (SSH, local processes, PBS, mpiexec,
+This command automates the startup of these processes using a wide range of
+startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
 Windows HPC Server 2008). To start a cluster with 4 engines on your
 local host simply do 'ipcluster start --n=4'. For more complex usage
 you will typically do 'ipython profile create mycluster --parallel', then edit
@@ -116,7 +116,7 @@ def find_launcher_class(clsname, kind):
     clsname : str
         The full name of the launcher class, either with or without the
-        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
+        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor,
     kind : str
         Either 'EngineSet' or 'Controller'.
@@ -125,7 +125,7 @@ def find_launcher_class(clsname, kind):
         # not a module, presume it's the raw name in apps.launcher
         if kind and kind not in clsname:
             # doesn't match necessary full class name, assume it's
-            # just 'PBS' or 'MPI' prefix:
+            # just 'PBS' or 'MPI' etc. prefix:
             clsname = clsname + kind + 'Launcher'
         clsname = 'IPython.parallel.apps.launcher.'+clsname
     klass = import_item(clsname)
@@ -287,6 +287,7 @@ class IPClusterEngines(BaseParallelApplication):
                         Note that SSH does *not* move the connection files
                         around, so you will likely have to do this manually
                         unless the machines are on a shared file system.
+            HTCondor : use HTCondor to submit engines to a batch queue
             WindowsHPC : use Windows HPC
         If you are using one of IPython's builtin launchers, you can specify just the
@@ -488,6 +489,7 @@ class IPClusterStart(IPClusterEngines):
             PBS : use PBS (qsub) to submit the controller to a batch queue
             SGE : use SGE (qsub) to submit the controller to a batch queue
             LSF : use LSF (bsub) to submit the controller to a batch queue
+            HTCondor : use HTCondor to submit the controller to a batch queue
             SSH : use SSH to start the controller
             WindowsHPC : use Windows HPC
diff --git a/IPython/parallel/apps/ b/IPython/parallel/apps/
index e939596..b0f00d7 100644
--- a/IPython/parallel/apps/
+++ b/IPython/parallel/apps/
@@ -1019,6 +1019,8 @@ class BatchSystemLauncher(BaseLauncher):
     job_id_regexp = CRegExp('', config=True,
         help="""A regular expression used to get the job id from the output of the
+    job_id_regexp_group = Integer(0, config=True,
+        help="""The group we wish to match in job_id_regexp (0 to match all)""")
     batch_template = Unicode('', config=True,
         help="The string that is the batch script template itself.")
     batch_template_file = Unicode(u'', config=True,
@@ -1047,6 +1049,7 @@ class BatchSystemLauncher(BaseLauncher):
     batch_file = Unicode(u'')
     # the format dict used with batch_template:
     context = Dict()
     def _context_default(self):
         """load the default context with the default values for the basic keys
@@ -1058,7 +1061,6 @@ class BatchSystemLauncher(BaseLauncher):
     # the Formatter instance for rendering the templates:
     formatter = Instance(EvalFormatter, (), {})
     def find_args(self):
         return self.submit_command + [self.batch_file]
@@ -1072,7 +1074,7 @@ class BatchSystemLauncher(BaseLauncher):
         """Take the output of the submit command and return the job id."""
         m =
         if m is not None:
-            job_id =
+            job_id =
             raise LauncherError("Job id couldn't be determined: %s" % output)
         self.job_id = job_id
@@ -1090,28 +1092,32 @@ class BatchSystemLauncher(BaseLauncher):
         if not self.batch_template:
             # third (last) priority is default_template
             self.batch_template = self.default_template
             # add jobarray or queue lines to user-specified template
             # note that this is *only* when user did not specify a template.
-            # print
-            if not
-                self.log.debug("adding job array settings to batch script")
-                firstline, rest = self.batch_template.split('\n',1)
-                self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
-            # print
-            if self.queue and not
-                self.log.debug("adding PBS queue settings to batch script")
-                firstline, rest = self.batch_template.split('\n',1)
-                self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
+            self._insert_queue_in_script()
+            self._insert_job_array_in_script()
         script_as_string = self.formatter.format(self.batch_template, **self.context)
         self.log.debug('Writing batch script: %s', self.batch_file)
         with open(self.batch_file, 'w') as f:
         os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+    def _insert_queue_in_script(self):
+        """Inserts a queue if required into the batch script.
+        """
+        if self.queue and not
+            self.log.debug("adding PBS queue settings to batch script")
+            firstline, rest = self.batch_template.split('\n',1)
+            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
+    def _insert_job_array_in_script(self):
+        """Inserts a job array if required into the batch script.
+        """
+        if not
+            self.log.debug("adding job array settings to batch script")
+            firstline, rest = self.batch_template.split('\n',1)
+            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
     def start(self, n):
         """Start n copies of the process using a batch system."""
         self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
@@ -1160,7 +1166,6 @@ class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
     def start(self):
         """Start the controller by profile or profile_dir."""
         return super(PBSControllerLauncher, self).start(1)
@@ -1176,9 +1181,6 @@ class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
-    def start(self, n):
-        """Start n engines by profile or profile_dir."""
-        return super(PBSEngineSetLauncher, self).start(n)
 #SGE is very similar to PBS
@@ -1189,6 +1191,7 @@ class SGELauncher(PBSLauncher):
     queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
     queue_template = Unicode('#$ -q {queue}')
 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
     """Launch a controller using SGE."""
@@ -1204,6 +1207,7 @@ class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
         """Start the controller by profile or profile_dir."""
         return super(SGEControllerLauncher, self).start(1)
 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
     """Launch Engines with SGE"""
     batch_file_name = Unicode(u'sge_engines', config=True,
@@ -1214,10 +1218,6 @@ class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
-    def start(self, n):
-        """Start n engines by profile or profile_dir."""
-        return super(SGEEngineSetLauncher, self).start(n)
 # LSF launchers
@@ -1283,9 +1283,87 @@ class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
     %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
     """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
-    def start(self, n):
-        """Start n engines by profile or profile_dir."""
-        return super(LSFEngineSetLauncher, self).start(n)
+class HTCondorLauncher(BatchSystemLauncher):
+    """A BatchSystemLauncher subclass for HTCondor.
+    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
+    than the python instance but otherwise is very similar to PBS.  This is because
+    HTCondor destroys sys.executable when launching remote processes - a launched
+    python process depends on sys.executable to effectively evaluate its
+    module search paths. Without it, regardless of which python interpreter you launch
+    you will get the built-in module search paths.
+    We use the ip{cluster, engine, controller} scripts as our executable to circumvent 
+    this - the mechanism of shebanged scripts means that the python binary will be 
+    launched with argv[0] set to the *location of the ip{cluster, engine, controller} 
+    scripts on the remote node*. This means you need to take care that:
+      a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
+         of the python environment you wish to execute code in having top precedence.
+      b. This functionality is untested on Windows.
+    If you need different behavior, consider making your own template.
+    """
+    submit_command = List(['condor_submit'], config=True,
+        help="The HTCondor submit command ['condor_submit']")
+    delete_command = List(['condor_rm'], config=True,
+        help="The HTCondor delete command ['condor_rm']")
+    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
+        help="Regular expression for identifying the job ID [r'(\d+)\.$']")
+    job_id_regexp_group = Integer(1, config=True,
+        help="""The group we wish to match in job_id_regexp [1]""")
+    job_array_regexp = CRegExp('queue\W+\$')
+    job_array_template = Unicode('queue {n}')
+    def _insert_job_array_in_script(self):
+        """Inserts a job array if required into the batch script.
+        """
+        if not
+            self.log.debug("adding job array settings to batch script")
+            #HTCondor requires that the job array goes at the bottom of the script
+            self.batch_template = '\n'.join([self.batch_template,
+                self.job_array_template])
+    def _insert_queue_in_script(self):
+        """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
+        specified in the script.
+        """
+        pass
+class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
+    """Launch a controller using HTCondor."""
+    batch_file_name = Unicode(u'htcondor_controller', config=True,
+                              help="batch file name for the controller job.")
+    default_template = Unicode(r"""
+universe        = vanilla
+executable      = ipcontroller
+# by default we expect a shared file system
+transfer_executable = False
+arguments       = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
+    def start(self):
+        """Start the controller by profile or profile_dir."""
+        return super(HTCondorControllerLauncher, self).start(1)
+class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
+    """Launch Engines using HTCondor"""
+    batch_file_name = Unicode(u'htcondor_engines', config=True,
+                              help="batch file name for the engine(s) job.")
+    default_template = Unicode("""
+universe        = vanilla
+executable      = ipengine
+# by default we expect a shared file system
+transfer_executable = False
+arguments       = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
@@ -1354,6 +1432,10 @@ lsf_launchers = [
+htcondor_launchers = [
+    HTCondorLauncher,
+    HTCondorControllerLauncher,
+    HTCondorEngineSetLauncher,
 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
-                + pbs_launchers + sge_launchers + lsf_launchers
+                + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
diff --git a/IPython/parallel/tests/ b/IPython/parallel/tests/
index 6035a8f..30421cc 100644
--- a/IPython/parallel/tests/
+++ b/IPython/parallel/tests/
@@ -129,6 +129,9 @@ class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
 class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
     launcher_class = launcher.LSFControllerLauncher
+class TestHTCondorControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
+    launcher_class = launcher.HTCondorControllerLauncher
 class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase):
     launcher_class = launcher.SSHControllerLauncher
@@ -155,6 +158,9 @@ class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
 class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
     launcher_class = launcher.LSFEngineSetLauncher
+class TestHTCondorEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
+    launcher_class = launcher.HTCondorEngineSetLauncher
 class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase):
     launcher_class = launcher.SSHEngineSetLauncher