##// END OF EJS Templates
Merge pull request #3465 from jabooth/htcondor...
Min RK -
r11007:09f25537 merge
parent child Browse files
Show More
@@ -59,8 +59,8 b" default_config_file_name = u'ipcluster_config.py'"
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
62 This command automates the startup of these processes using a wide range of
63 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 66 you will typically do 'ipython profile create mycluster --parallel', then edit
@@ -116,7 +116,7 b' def find_launcher_class(clsname, kind):'
116 116 ==========
117 117 clsname : str
118 118 The full name of the launcher class, either with or without the
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor,
120 120 WindowsHPC).
121 121 kind : str
122 122 Either 'EngineSet' or 'Controller'.
@@ -125,7 +125,7 b' def find_launcher_class(clsname, kind):'
125 125 # not a module, presume it's the raw name in apps.launcher
126 126 if kind and kind not in clsname:
127 127 # doesn't match necessary full class name, assume it's
128 # just 'PBS' or 'MPI' prefix:
128 # just 'PBS' or 'MPI' etc prefix:
129 129 clsname = clsname + kind + 'Launcher'
130 130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 131 klass = import_item(clsname)
@@ -287,6 +287,7 b' class IPClusterEngines(BaseParallelApplication):'
287 287 Note that SSH does *not* move the connection files
288 288 around, so you will likely have to do this manually
289 289 unless the machines are on a shared file system.
290 HTCondor : use HTCondor to submit engines to a batch queue
290 291 WindowsHPC : use Windows HPC
291 292
292 293 If you are using one of IPython's builtin launchers, you can specify just the
@@ -488,6 +489,7 b' class IPClusterStart(IPClusterEngines):'
488 489 PBS : use PBS (qsub) to submit the controller to a batch queue
489 490 SGE : use SGE (qsub) to submit the controller to a batch queue
490 491 LSF : use LSF (bsub) to submit the controller to a batch queue
492 HTCondor : use HTCondor to submit the controller to a batch queue
491 493 SSH : use SSH to start the controller
492 494 WindowsHPC : use Windows HPC
493 495
@@ -1019,6 +1019,8 b' class BatchSystemLauncher(BaseLauncher):'
1019 1019 job_id_regexp = CRegExp('', config=True,
1020 1020 help="""A regular expression used to get the job id from the output of the
1021 1021 submit_command.""")
1022 job_id_regexp_group = Integer(0, config=True,
1023 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1022 1024 batch_template = Unicode('', config=True,
1023 1025 help="The string that is the batch script template itself.")
1024 1026 batch_template_file = Unicode(u'', config=True,
@@ -1047,6 +1049,7 b' class BatchSystemLauncher(BaseLauncher):'
1047 1049 batch_file = Unicode(u'')
1048 1050 # the format dict used with batch_template:
1049 1051 context = Dict()
1052
1050 1053 def _context_default(self):
1051 1054 """load the default context with the default values for the basic keys
1052 1055
@@ -1058,7 +1061,6 b' class BatchSystemLauncher(BaseLauncher):'
1058 1061 # the Formatter instance for rendering the templates:
1059 1062 formatter = Instance(EvalFormatter, (), {})
1060 1063
1061
1062 1064 def find_args(self):
1063 1065 return self.submit_command + [self.batch_file]
1064 1066
@@ -1072,7 +1074,7 b' class BatchSystemLauncher(BaseLauncher):'
1072 1074 """Take the output of the submit command and return the job id."""
1073 1075 m = self.job_id_regexp.search(output)
1074 1076 if m is not None:
1075 job_id = m.group()
1077 job_id = m.group(self.job_id_regexp_group)
1076 1078 else:
1077 1079 raise LauncherError("Job id couldn't be determined: %s" % output)
1078 1080 self.job_id = job_id
@@ -1090,28 +1092,32 b' class BatchSystemLauncher(BaseLauncher):'
1090 1092 if not self.batch_template:
1091 1093 # third (last) priority is default_template
1092 1094 self.batch_template = self.default_template
1093
1094 1095 # add jobarray or queue lines to user-specified template
1095 1096 # note that this is *only* when user did not specify a template.
1096 # print self.job_array_regexp.search(self.batch_template)
1097 if not self.job_array_regexp.search(self.batch_template):
1098 self.log.debug("adding job array settings to batch script")
1099 firstline, rest = self.batch_template.split('\n',1)
1100 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1101
1102 # print self.queue_regexp.search(self.batch_template)
1103 if self.queue and not self.queue_regexp.search(self.batch_template):
1104 self.log.debug("adding PBS queue settings to batch script")
1105 firstline, rest = self.batch_template.split('\n',1)
1106 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1107
1097 self._insert_queue_in_script()
1098 self._insert_job_array_in_script()
1108 1099 script_as_string = self.formatter.format(self.batch_template, **self.context)
1109 1100 self.log.debug('Writing batch script: %s', self.batch_file)
1110
1111 1101 with open(self.batch_file, 'w') as f:
1112 1102 f.write(script_as_string)
1113 1103 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1114 1104
1105 def _insert_queue_in_script(self):
1106 """Inserts a queue if required into the batch script.
1107 """
1108 if self.queue and not self.queue_regexp.search(self.batch_template):
1109 self.log.debug("adding PBS queue settings to batch script")
1110 firstline, rest = self.batch_template.split('\n',1)
1111 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1112
1113 def _insert_job_array_in_script(self):
1114 """Inserts a job array if required into the batch script.
1115 """
1116 if not self.job_array_regexp.search(self.batch_template):
1117 self.log.debug("adding job array settings to batch script")
1118 firstline, rest = self.batch_template.split('\n',1)
1119 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1120
1115 1121 def start(self, n):
1116 1122 """Start n copies of the process using a batch system."""
1117 1123 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
@@ -1160,7 +1166,6 b' class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):'
1160 1166 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1161 1167 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1162 1168
1163
1164 1169 def start(self):
1165 1170 """Start the controller by profile or profile_dir."""
1166 1171 return super(PBSControllerLauncher, self).start(1)
@@ -1176,9 +1181,6 b' class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):'
1176 1181 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1177 1182 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1178 1183
1179 def start(self, n):
1180 """Start n engines by profile or profile_dir."""
1181 return super(PBSEngineSetLauncher, self).start(n)
1182 1184
1183 1185 #SGE is very similar to PBS
1184 1186
@@ -1189,6 +1191,7 b' class SGELauncher(PBSLauncher):'
1189 1191 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1190 1192 queue_template = Unicode('#$ -q {queue}')
1191 1193
1194
1192 1195 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1193 1196 """Launch a controller using SGE."""
1194 1197
@@ -1204,6 +1207,7 b' class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):'
1204 1207 """Start the controller by profile or profile_dir."""
1205 1208 return super(SGEControllerLauncher, self).start(1)
1206 1209
1210
1207 1211 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1208 1212 """Launch Engines with SGE"""
1209 1213 batch_file_name = Unicode(u'sge_engines', config=True,
@@ -1214,10 +1218,6 b' class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):'
1214 1218 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1215 1219 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1216 1220
1217 def start(self, n):
1218 """Start n engines by profile or profile_dir."""
1219 return super(SGEEngineSetLauncher, self).start(n)
1220
1221 1221
1222 1222 # LSF launchers
1223 1223
@@ -1283,9 +1283,87 b' class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):'
1283 1283 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1284 1284 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1285 1285
1286 def start(self, n):
1287 """Start n engines by profile or profile_dir."""
1288 return super(LSFEngineSetLauncher, self).start(n)
1286
1287
1288 class HTCondorLauncher(BatchSystemLauncher):
1289 """A BatchSystemLauncher subclass for HTCondor.
1290
1291 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1292 than the python instance but otherwise is very similar to PBS. This is because
1293 HTCondor destroys sys.executable when launching remote processes - a launched
1294 python process depends on sys.executable to effectively evaluate its
1295 module search paths. Without it, regardless of which python interpreter you launch
1296 you will get the to built in module search paths.
1297
1298 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1299 this - the mechanism of shebanged scripts means that the python binary will be
1300 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1301 scripts on the remote node*. This means you need to take care that:
1302 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1303 of the python environment you wish to execute code in having top precedence.
1304 b. This functionality is untested on Windows.
1305
1306 If you need different behavior, consider making your own template.
1307 """
1308
1309 submit_command = List(['condor_submit'], config=True,
1310 help="The HTCondor submit command ['condor_submit']")
1311 delete_command = List(['condor_rm'], config=True,
1312 help="The HTCondor delete command ['condor_rm']")
1313 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1314 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1315 job_id_regexp_group = Integer(1, config=True,
1316 help="""The group we wish to match in job_id_regexp [1]""")
1317
1318 job_array_regexp = CRegExp('queue\W+\$')
1319 job_array_template = Unicode('queue {n}')
1320
1321
1322 def _insert_job_array_in_script(self):
1323 """Inserts a job array if required into the batch script.
1324 """
1325 if not self.job_array_regexp.search(self.batch_template):
1326 self.log.debug("adding job array settings to batch script")
1327 #HTCondor requires that the job array goes at the bottom of the script
1328 self.batch_template = '\n'.join([self.batch_template,
1329 self.job_array_template])
1330
1331 def _insert_queue_in_script(self):
1332 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1333 specified in the script.
1334 """
1335 pass
1336
1337
1338 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1339 """Launch a controller using HTCondor."""
1340
1341 batch_file_name = Unicode(u'htcondor_controller', config=True,
1342 help="batch file name for the controller job.")
1343 default_template = Unicode(r"""
1344 universe = vanilla
1345 executable = ipcontroller
1346 # by default we expect a shared file system
1347 transfer_executable = False
1348 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1349 """)
1350
1351 def start(self):
1352 """Start the controller by profile or profile_dir."""
1353 return super(HTCondorControllerLauncher, self).start(1)
1354
1355
1356 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1357 """Launch Engines using HTCondor"""
1358 batch_file_name = Unicode(u'htcondor_engines', config=True,
1359 help="batch file name for the engine(s) job.")
1360 default_template = Unicode("""
1361 universe = vanilla
1362 executable = ipengine
1363 # by default we expect a shared file system
1364 transfer_executable = False
1365 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1366 """)
1289 1367
1290 1368
1291 1369 #-----------------------------------------------------------------------------
@@ -1354,6 +1432,10 b' lsf_launchers = ['
1354 1432 LSFControllerLauncher,
1355 1433 LSFEngineSetLauncher,
1356 1434 ]
1435 htcondor_launchers = [
1436 HTCondorLauncher,
1437 HTCondorControllerLauncher,
1438 HTCondorEngineSetLauncher,
1439 ]
1357 1440 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1358 + pbs_launchers + sge_launchers + lsf_launchers
1359
1441 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
@@ -129,6 +129,9 b' class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):'
129 129 class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
130 130 launcher_class = launcher.LSFControllerLauncher
131 131
132 class TestHTCondorControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
133 launcher_class = launcher.HTCondorControllerLauncher
134
132 135 class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase):
133 136 launcher_class = launcher.SSHControllerLauncher
134 137
@@ -155,6 +158,9 b' class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):'
155 158 class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
156 159 launcher_class = launcher.LSFEngineSetLauncher
157 160
161 class TestHTCondorEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
162 launcher_class = launcher.HTCondorEngineSetLauncher
163
158 164 class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase):
159 165 launcher_class = launcher.SSHEngineSetLauncher
160 166
General Comments 0
You need to be logged in to leave comments. Login now