rename Condor -> HTCondor in all instances
James Booth
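For context, a brief illustrative sketch (not part of the changeset) of how the renamed launcher prefix would be selected after this commit; the 'mycluster' profile name and the exact values below are assumptions for illustration only:

# ipcluster_config.py in a hypothetical 'mycluster' profile
c = get_config()
c.IPClusterStart.controller_launcher_class = 'HTCondor'   # formerly 'Condor'
c.IPClusterEngines.engine_launcher_class = 'HTCondor'     # formerly 'Condor'
# or equivalently on the command line:
#   ipcluster start --profile=mycluster --n=4 --engines=HTCondor --controller=HTCondor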
@@ -1,620 +1,620
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag, catch_config_error
34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide range of
62 This command automates the startup of these processes using a wide range of
63 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, Condor,
63 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Utilities
109 # Utilities
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111
111
112 def find_launcher_class(clsname, kind):
112 def find_launcher_class(clsname, kind):
113 """Return a launcher for a given clsname and kind.
113 """Return a launcher for a given clsname and kind.
114
114
115 Parameters
115 Parameters
116 ==========
116 ==========
117 clsname : str
117 clsname : str
118 The full name of the launcher class, either with or without the
118 The full name of the launcher class, either with or without the
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, Condor
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor
120 WindowsHPC).
120 WindowsHPC).
121 kind : str
121 kind : str
122 Either 'EngineSet' or 'Controller'.
122 Either 'EngineSet' or 'Controller'.
123 """
123 """
124 if '.' not in clsname:
124 if '.' not in clsname:
125 # not a module, presume it's the raw name in apps.launcher
125 # not a module, presume it's the raw name in apps.launcher
126 if kind and kind not in clsname:
126 if kind and kind not in clsname:
127 # doesn't match necessary full class name, assume it's
127 # doesn't match necessary full class name, assume it's
128 # just 'PBS' or 'MPI' etc prefix:
128 # just 'PBS' or 'MPI' etc prefix:
129 clsname = clsname + kind + 'Launcher'
129 clsname = clsname + kind + 'Launcher'
130 clsname = 'IPython.parallel.apps.launcher.'+clsname
130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 klass = import_item(clsname)
131 klass = import_item(clsname)
132 return klass
132 return klass
133
133
134 #-----------------------------------------------------------------------------
134 #-----------------------------------------------------------------------------
135 # Main application
135 # Main application
136 #-----------------------------------------------------------------------------
136 #-----------------------------------------------------------------------------
137
137
138 start_help = """Start an IPython cluster for parallel computing
138 start_help = """Start an IPython cluster for parallel computing
139
139
140 Start an ipython cluster by its profile name or cluster
140 Start an ipython cluster by its profile name or cluster
141 directory. Cluster directories contain configuration, log and
141 directory. Cluster directories contain configuration, log and
142 security related files and are named using the convention
142 security related files and are named using the convention
143 'profile_<name>' and should be created using the 'start'
143 'profile_<name>' and should be created using the 'start'
144 subcommand of 'ipcluster'. If your cluster directory is in
144 subcommand of 'ipcluster'. If your cluster directory is in
145 the cwd or the ipython directory, you can simply refer to it
145 the cwd or the ipython directory, you can simply refer to it
146 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
146 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
147 otherwise use the 'profile-dir' option.
147 otherwise use the 'profile-dir' option.
148 """
148 """
149 stop_help = """Stop a running IPython cluster
149 stop_help = """Stop a running IPython cluster
150
150
151 Stop a running ipython cluster by its profile name or cluster
151 Stop a running ipython cluster by its profile name or cluster
152 directory. Cluster directories are named using the convention
152 directory. Cluster directories are named using the convention
153 'profile_<name>'. If your cluster directory is in
153 'profile_<name>'. If your cluster directory is in
154 the cwd or the ipython directory, you can simply refer to it
154 the cwd or the ipython directory, you can simply refer to it
155 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
155 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
156 use the '--profile-dir' option.
156 use the '--profile-dir' option.
157 """
157 """
158 engines_help = """Start engines connected to an existing IPython cluster
158 engines_help = """Start engines connected to an existing IPython cluster
159
159
160 Start one or more engines to connect to an existing Cluster
160 Start one or more engines to connect to an existing Cluster
161 by profile name or cluster directory.
161 by profile name or cluster directory.
162 Cluster directories contain configuration, log and
162 Cluster directories contain configuration, log and
163 security related files and are named using the convention
163 security related files and are named using the convention
164 'profile_<name>' and should be created using the 'start'
164 'profile_<name>' and should be created using the 'start'
165 subcommand of 'ipcluster'. If your cluster directory is in
165 subcommand of 'ipcluster'. If your cluster directory is in
166 the cwd or the ipython directory, you can simply refer to it
166 the cwd or the ipython directory, you can simply refer to it
167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
168 otherwise use the 'profile-dir' option.
168 otherwise use the 'profile-dir' option.
169 """
169 """
170 stop_aliases = dict(
170 stop_aliases = dict(
171 signal='IPClusterStop.signal',
171 signal='IPClusterStop.signal',
172 )
172 )
173 stop_aliases.update(base_aliases)
173 stop_aliases.update(base_aliases)
174
174
175 class IPClusterStop(BaseParallelApplication):
175 class IPClusterStop(BaseParallelApplication):
176 name = u'ipcluster'
176 name = u'ipcluster'
177 description = stop_help
177 description = stop_help
178 examples = _stop_examples
178 examples = _stop_examples
179 config_file_name = Unicode(default_config_file_name)
179 config_file_name = Unicode(default_config_file_name)
180
180
181 signal = Integer(signal.SIGINT, config=True,
181 signal = Integer(signal.SIGINT, config=True,
182 help="signal to use for stopping processes.")
182 help="signal to use for stopping processes.")
183
183
184 aliases = Dict(stop_aliases)
184 aliases = Dict(stop_aliases)
185
185
186 def start(self):
186 def start(self):
187 """Start the app for the stop subcommand."""
187 """Start the app for the stop subcommand."""
188 try:
188 try:
189 pid = self.get_pid_from_file()
189 pid = self.get_pid_from_file()
190 except PIDFileError:
190 except PIDFileError:
191 self.log.critical(
191 self.log.critical(
192 'Could not read pid file, cluster is probably not running.'
192 'Could not read pid file, cluster is probably not running.'
193 )
193 )
194 # Here I exit with an unusual exit status that other processes
194 # Here I exit with an unusual exit status that other processes
195 # can watch for to learn how I exited.
195 # can watch for to learn how I exited.
196 self.remove_pid_file()
196 self.remove_pid_file()
197 self.exit(ALREADY_STOPPED)
197 self.exit(ALREADY_STOPPED)
198
198
199 if not self.check_pid(pid):
199 if not self.check_pid(pid):
200 self.log.critical(
200 self.log.critical(
201 'Cluster [pid=%r] is not running.' % pid
201 'Cluster [pid=%r] is not running.' % pid
202 )
202 )
203 self.remove_pid_file()
203 self.remove_pid_file()
204 # Here I exit with an unusual exit status that other processes
204 # Here I exit with an unusual exit status that other processes
205 # can watch for to learn how I exited.
205 # can watch for to learn how I exited.
206 self.exit(ALREADY_STOPPED)
206 self.exit(ALREADY_STOPPED)
207
207
208 elif os.name=='posix':
208 elif os.name=='posix':
209 sig = self.signal
209 sig = self.signal
210 self.log.info(
210 self.log.info(
211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
212 )
212 )
213 try:
213 try:
214 os.kill(pid, sig)
214 os.kill(pid, sig)
215 except OSError:
215 except OSError:
216 self.log.error("Stopping cluster failed, assuming already dead.",
216 self.log.error("Stopping cluster failed, assuming already dead.",
217 exc_info=True)
217 exc_info=True)
218 self.remove_pid_file()
218 self.remove_pid_file()
219 elif os.name=='nt':
219 elif os.name=='nt':
220 try:
220 try:
221 # kill the whole tree
221 # kill the whole tree
222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
223 except (CalledProcessError, OSError):
223 except (CalledProcessError, OSError):
224 self.log.error("Stopping cluster failed, assuming already dead.",
224 self.log.error("Stopping cluster failed, assuming already dead.",
225 exc_info=True)
225 exc_info=True)
226 self.remove_pid_file()
226 self.remove_pid_file()
227
227
228 engine_aliases = {}
228 engine_aliases = {}
229 engine_aliases.update(base_aliases)
229 engine_aliases.update(base_aliases)
230 engine_aliases.update(dict(
230 engine_aliases.update(dict(
231 n='IPClusterEngines.n',
231 n='IPClusterEngines.n',
232 engines = 'IPClusterEngines.engine_launcher_class',
232 engines = 'IPClusterEngines.engine_launcher_class',
233 daemonize = 'IPClusterEngines.daemonize',
233 daemonize = 'IPClusterEngines.daemonize',
234 ))
234 ))
235 engine_flags = {}
235 engine_flags = {}
236 engine_flags.update(base_flags)
236 engine_flags.update(base_flags)
237
237
238 engine_flags.update(dict(
238 engine_flags.update(dict(
239 daemonize=(
239 daemonize=(
240 {'IPClusterEngines' : {'daemonize' : True}},
240 {'IPClusterEngines' : {'daemonize' : True}},
241 """run the cluster into the background (not available on Windows)""",
241 """run the cluster into the background (not available on Windows)""",
242 )
242 )
243 ))
243 ))
244 class IPClusterEngines(BaseParallelApplication):
244 class IPClusterEngines(BaseParallelApplication):
245
245
246 name = u'ipcluster'
246 name = u'ipcluster'
247 description = engines_help
247 description = engines_help
248 examples = _engines_examples
248 examples = _engines_examples
249 usage = None
249 usage = None
250 config_file_name = Unicode(default_config_file_name)
250 config_file_name = Unicode(default_config_file_name)
251 default_log_level = logging.INFO
251 default_log_level = logging.INFO
252 classes = List()
252 classes = List()
253 def _classes_default(self):
253 def _classes_default(self):
254 from IPython.parallel.apps import launcher
254 from IPython.parallel.apps import launcher
255 launchers = launcher.all_launchers
255 launchers = launcher.all_launchers
256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
257 return [ProfileDir]+eslaunchers
257 return [ProfileDir]+eslaunchers
258
258
259 n = Integer(num_cpus(), config=True,
259 n = Integer(num_cpus(), config=True,
260 help="""The number of engines to start. The default is to use one for each
260 help="""The number of engines to start. The default is to use one for each
261 CPU on your machine""")
261 CPU on your machine""")
262
262
263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
264 def _engine_launcher_changed(self, name, old, new):
264 def _engine_launcher_changed(self, name, old, new):
265 if isinstance(new, basestring):
265 if isinstance(new, basestring):
266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
267 " use engine_launcher_class" % self.__class__.__name__)
267 " use engine_launcher_class" % self.__class__.__name__)
268 self.engine_launcher_class = new
268 self.engine_launcher_class = new
269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
270 config=True,
270 config=True,
271 help="""The class for launching a set of Engines. Change this value
271 help="""The class for launching a set of Engines. Change this value
272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
273 Each launcher class has its own set of configuration options, for making sure
273 Each launcher class has its own set of configuration options, for making sure
274 it will work in your environment.
274 it will work in your environment.
275
275
276 You can also write your own launcher, and specify its absolute import path,
276 You can also write your own launcher, and specify its absolute import path,
277 as in 'mymodule.launcher.FTLEnginesLauncher'.
277 as in 'mymodule.launcher.FTLEnginesLauncher'.
278
278
279 IPython's bundled examples include:
279 IPython's bundled examples include:
280
280
281 Local : start engines locally as subprocesses [default]
281 Local : start engines locally as subprocesses [default]
282 MPI : use mpiexec to launch engines in an MPI environment
282 MPI : use mpiexec to launch engines in an MPI environment
283 PBS : use PBS (qsub) to submit engines to a batch queue
283 PBS : use PBS (qsub) to submit engines to a batch queue
284 SGE : use SGE (qsub) to submit engines to a batch queue
284 SGE : use SGE (qsub) to submit engines to a batch queue
285 LSF : use LSF (bsub) to submit engines to a batch queue
285 LSF : use LSF (bsub) to submit engines to a batch queue
286 SSH : use SSH to start the engines
286 SSH : use SSH to start the engines
287 Note that SSH does *not* move the connection files
287 Note that SSH does *not* move the connection files
288 around, so you will likely have to do this manually
288 around, so you will likely have to do this manually
289 unless the machines are on a shared file system.
289 unless the machines are on a shared file system.
290 Condor : use HTCondor to submit engines to a batch queue
290 HTCondor : use HTCondor to submit engines to a batch queue
291 WindowsHPC : use Windows HPC
291 WindowsHPC : use Windows HPC
292
292
293 If you are using one of IPython's builtin launchers, you can specify just the
293 If you are using one of IPython's builtin launchers, you can specify just the
294 prefix, e.g:
294 prefix, e.g:
295
295
296 c.IPClusterEngines.engine_launcher_class = 'SSH'
296 c.IPClusterEngines.engine_launcher_class = 'SSH'
297
297
298 or:
298 or:
299
299
300 ipcluster start --engines=MPI
300 ipcluster start --engines=MPI
301
301
302 """
302 """
303 )
303 )
304 daemonize = Bool(False, config=True,
304 daemonize = Bool(False, config=True,
305 help="""Daemonize the ipcluster program. This implies --log-to-file.
305 help="""Daemonize the ipcluster program. This implies --log-to-file.
306 Not available on Windows.
306 Not available on Windows.
307 """)
307 """)
308
308
309 def _daemonize_changed(self, name, old, new):
309 def _daemonize_changed(self, name, old, new):
310 if new:
310 if new:
311 self.log_to_file = True
311 self.log_to_file = True
312
312
313 early_shutdown = Integer(30, config=True, help="The timeout (in seconds) within which engine shutdown is treated as a startup failure.")
313 early_shutdown = Integer(30, config=True, help="The timeout (in seconds) within which engine shutdown is treated as a startup failure.")
314 _stopping = False
314 _stopping = False
315
315
316 aliases = Dict(engine_aliases)
316 aliases = Dict(engine_aliases)
317 flags = Dict(engine_flags)
317 flags = Dict(engine_flags)
318
318
319 @catch_config_error
319 @catch_config_error
320 def initialize(self, argv=None):
320 def initialize(self, argv=None):
321 super(IPClusterEngines, self).initialize(argv)
321 super(IPClusterEngines, self).initialize(argv)
322 self.init_signal()
322 self.init_signal()
323 self.init_launchers()
323 self.init_launchers()
324
324
325 def init_launchers(self):
325 def init_launchers(self):
326 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
326 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
327
327
328 def init_signal(self):
328 def init_signal(self):
329 # Setup signals
329 # Setup signals
330 signal.signal(signal.SIGINT, self.sigint_handler)
330 signal.signal(signal.SIGINT, self.sigint_handler)
331
331
332 def build_launcher(self, clsname, kind=None):
332 def build_launcher(self, clsname, kind=None):
333 """import and instantiate a Launcher based on importstring"""
333 """import and instantiate a Launcher based on importstring"""
334 try:
334 try:
335 klass = find_launcher_class(clsname, kind)
335 klass = find_launcher_class(clsname, kind)
336 except (ImportError, KeyError):
336 except (ImportError, KeyError):
337 self.log.fatal("Could not import launcher class: %r"%clsname)
337 self.log.fatal("Could not import launcher class: %r"%clsname)
338 self.exit(1)
338 self.exit(1)
339
339
340 launcher = klass(
340 launcher = klass(
341 work_dir=u'.', config=self.config, log=self.log,
341 work_dir=u'.', config=self.config, log=self.log,
342 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
342 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
343 )
343 )
344 return launcher
344 return launcher
345
345
346 def engines_started_ok(self):
346 def engines_started_ok(self):
347 self.log.info("Engines appear to have started successfully")
347 self.log.info("Engines appear to have started successfully")
348 self.early_shutdown = 0
348 self.early_shutdown = 0
349
349
350 def start_engines(self):
350 def start_engines(self):
351 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
351 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
352 n = getattr(self.engine_launcher, 'engine_count', self.n)
352 n = getattr(self.engine_launcher, 'engine_count', self.n)
353 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
353 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
354 self.engine_launcher.start(self.n)
354 self.engine_launcher.start(self.n)
355 self.engine_launcher.on_stop(self.engines_stopped_early)
355 self.engine_launcher.on_stop(self.engines_stopped_early)
356 if self.early_shutdown:
356 if self.early_shutdown:
357 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
357 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
358
358
359 def engines_stopped_early(self, r):
359 def engines_stopped_early(self, r):
360 if self.early_shutdown and not self._stopping:
360 if self.early_shutdown and not self._stopping:
361 self.log.error("""
361 self.log.error("""
362 Engines shut down early, they probably failed to connect.
362 Engines shut down early, they probably failed to connect.
363
363
364 Check the engine log files for output.
364 Check the engine log files for output.
365
365
366 If your controller and engines are not on the same machine, you probably
366 If your controller and engines are not on the same machine, you probably
367 have to instruct the controller to listen on an interface other than localhost.
367 have to instruct the controller to listen on an interface other than localhost.
368
368
369 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
369 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
370
370
371 Be sure to read our security docs before instructing your controller to listen on
371 Be sure to read our security docs before instructing your controller to listen on
372 a public interface.
372 a public interface.
373 """)
373 """)
374 self.stop_launchers()
374 self.stop_launchers()
375
375
376 return self.engines_stopped(r)
376 return self.engines_stopped(r)
377
377
378 def engines_stopped(self, r):
378 def engines_stopped(self, r):
379 return self.loop.stop()
379 return self.loop.stop()
380
380
381 def stop_engines(self):
381 def stop_engines(self):
382 if self.engine_launcher.running:
382 if self.engine_launcher.running:
383 self.log.info("Stopping Engines...")
383 self.log.info("Stopping Engines...")
384 d = self.engine_launcher.stop()
384 d = self.engine_launcher.stop()
385 return d
385 return d
386 else:
386 else:
387 return None
387 return None
388
388
389 def stop_launchers(self, r=None):
389 def stop_launchers(self, r=None):
390 if not self._stopping:
390 if not self._stopping:
391 self._stopping = True
391 self._stopping = True
392 self.log.error("IPython cluster: stopping")
392 self.log.error("IPython cluster: stopping")
393 self.stop_engines()
393 self.stop_engines()
394 # Wait a few seconds to let things shut down.
394 # Wait a few seconds to let things shut down.
395 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
395 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
396 dc.start()
396 dc.start()
397
397
398 def sigint_handler(self, signum, frame):
398 def sigint_handler(self, signum, frame):
399 self.log.debug("SIGINT received, stopping launchers...")
399 self.log.debug("SIGINT received, stopping launchers...")
400 self.stop_launchers()
400 self.stop_launchers()
401
401
402 def start_logging(self):
402 def start_logging(self):
403 # Remove old log files of the controller and engine
403 # Remove old log files of the controller and engine
404 if self.clean_logs:
404 if self.clean_logs:
405 log_dir = self.profile_dir.log_dir
405 log_dir = self.profile_dir.log_dir
406 for f in os.listdir(log_dir):
406 for f in os.listdir(log_dir):
407 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
407 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
408 os.remove(os.path.join(log_dir, f))
408 os.remove(os.path.join(log_dir, f))
409 # This will remove old log files for ipcluster itself
409 # This will remove old log files for ipcluster itself
410 # super(IPBaseParallelApplication, self).start_logging()
410 # super(IPBaseParallelApplication, self).start_logging()
411
411
412 def start(self):
412 def start(self):
413 """Start the app for the engines subcommand."""
413 """Start the app for the engines subcommand."""
414 self.log.info("IPython cluster: started")
414 self.log.info("IPython cluster: started")
415 # First see if the cluster is already running
415 # First see if the cluster is already running
416
416
417 # Now log and daemonize
417 # Now log and daemonize
418 self.log.info(
418 self.log.info(
419 'Starting engines with [daemon=%r]' % self.daemonize
419 'Starting engines with [daemon=%r]' % self.daemonize
420 )
420 )
421 # TODO: Get daemonize working on Windows or as a Windows Server.
421 # TODO: Get daemonize working on Windows or as a Windows Server.
422 if self.daemonize:
422 if self.daemonize:
423 if os.name=='posix':
423 if os.name=='posix':
424 daemonize()
424 daemonize()
425
425
426 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
427 dc.start()
427 dc.start()
428 # Now write the new pid file AFTER our new forked pid is active.
428 # Now write the new pid file AFTER our new forked pid is active.
429 # self.write_pid_file()
429 # self.write_pid_file()
430 try:
430 try:
431 self.loop.start()
431 self.loop.start()
432 except KeyboardInterrupt:
432 except KeyboardInterrupt:
433 pass
433 pass
434 except zmq.ZMQError as e:
434 except zmq.ZMQError as e:
435 if e.errno == errno.EINTR:
435 if e.errno == errno.EINTR:
436 pass
436 pass
437 else:
437 else:
438 raise
438 raise
439
439
440 start_aliases = {}
440 start_aliases = {}
441 start_aliases.update(engine_aliases)
441 start_aliases.update(engine_aliases)
442 start_aliases.update(dict(
442 start_aliases.update(dict(
443 delay='IPClusterStart.delay',
443 delay='IPClusterStart.delay',
444 controller = 'IPClusterStart.controller_launcher_class',
444 controller = 'IPClusterStart.controller_launcher_class',
445 ))
445 ))
446 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
447
447
448 class IPClusterStart(IPClusterEngines):
448 class IPClusterStart(IPClusterEngines):
449
449
450 name = u'ipcluster'
450 name = u'ipcluster'
451 description = start_help
451 description = start_help
452 examples = _start_examples
452 examples = _start_examples
453 default_log_level = logging.INFO
453 default_log_level = logging.INFO
454 auto_create = Bool(True, config=True,
454 auto_create = Bool(True, config=True,
455 help="whether to create the profile_dir if it doesn't exist")
455 help="whether to create the profile_dir if it doesn't exist")
456 classes = List()
456 classes = List()
457 def _classes_default(self,):
457 def _classes_default(self,):
458 from IPython.parallel.apps import launcher
458 from IPython.parallel.apps import launcher
459 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
459 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
460
460
461 clean_logs = Bool(True, config=True,
461 clean_logs = Bool(True, config=True,
462 help="whether to cleanup old logs before starting")
462 help="whether to cleanup old logs before starting")
463
463
464 delay = CFloat(1., config=True,
464 delay = CFloat(1., config=True,
465 help="delay (in s) between starting the controller and the engines")
465 help="delay (in s) between starting the controller and the engines")
466
466
467 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
467 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
468 def _controller_launcher_changed(self, name, old, new):
468 def _controller_launcher_changed(self, name, old, new):
469 if isinstance(new, basestring):
469 if isinstance(new, basestring):
470 # old 0.11-style config
470 # old 0.11-style config
471 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
471 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
472 " use controller_launcher_class" % self.__class__.__name__)
472 " use controller_launcher_class" % self.__class__.__name__)
473 self.controller_launcher_class = new
473 self.controller_launcher_class = new
474 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
474 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
475 config=True,
475 config=True,
476 help="""The class for launching a Controller. Change this value if you want
476 help="""The class for launching a Controller. Change this value if you want
477 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
477 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
478
478
479 Each launcher class has its own set of configuration options, for making sure
479 Each launcher class has its own set of configuration options, for making sure
480 it will work in your environment.
480 it will work in your environment.
481
481
482 Note that using a batch launcher for the controller *does not* put it
482 Note that using a batch launcher for the controller *does not* put it
483 in the same batch job as the engines, so they will still start separately.
483 in the same batch job as the engines, so they will still start separately.
484
484
485 IPython's bundled examples include:
485 IPython's bundled examples include:
486
486
487 Local : start the controller locally as a subprocess
487 Local : start the controller locally as a subprocess
488 MPI : use mpiexec to launch the controller in an MPI universe
488 MPI : use mpiexec to launch the controller in an MPI universe
489 PBS : use PBS (qsub) to submit the controller to a batch queue
489 PBS : use PBS (qsub) to submit the controller to a batch queue
490 SGE : use SGE (qsub) to submit the controller to a batch queue
490 SGE : use SGE (qsub) to submit the controller to a batch queue
491 LSF : use LSF (bsub) to submit the controller to a batch queue
491 LSF : use LSF (bsub) to submit the controller to a batch queue
492 Condor: use HTCondor to submit the controller to a batch queue
492 HTCondor : use HTCondor to submit the controller to a batch queue
493 SSH : use SSH to start the controller
493 SSH : use SSH to start the controller
494 WindowsHPC : use Windows HPC
494 WindowsHPC : use Windows HPC
495
495
496 If you are using one of IPython's builtin launchers, you can specify just the
496 If you are using one of IPython's builtin launchers, you can specify just the
497 prefix, e.g:
497 prefix, e.g:
498
498
499 c.IPClusterStart.controller_launcher_class = 'SSH'
499 c.IPClusterStart.controller_launcher_class = 'SSH'
500
500
501 or:
501 or:
502
502
503 ipcluster start --controller=MPI
503 ipcluster start --controller=MPI
504
504
505 """
505 """
506 )
506 )
507 reset = Bool(False, config=True,
507 reset = Bool(False, config=True,
508 help="Whether to reset config files as part of '--create'."
508 help="Whether to reset config files as part of '--create'."
509 )
509 )
510
510
511 # flags = Dict(flags)
511 # flags = Dict(flags)
512 aliases = Dict(start_aliases)
512 aliases = Dict(start_aliases)
513
513
514 def init_launchers(self):
514 def init_launchers(self):
515 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
515 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
516 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
516 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
517
517
518 def engines_stopped(self, r):
518 def engines_stopped(self, r):
519 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
519 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
520 pass
520 pass
521
521
522 def start_controller(self):
522 def start_controller(self):
523 self.log.info("Starting Controller with %s", self.controller_launcher_class)
523 self.log.info("Starting Controller with %s", self.controller_launcher_class)
524 self.controller_launcher.on_stop(self.stop_launchers)
524 self.controller_launcher.on_stop(self.stop_launchers)
525 self.controller_launcher.start()
525 self.controller_launcher.start()
526
526
527 def stop_controller(self):
527 def stop_controller(self):
528 # self.log.info("In stop_controller")
528 # self.log.info("In stop_controller")
529 if self.controller_launcher and self.controller_launcher.running:
529 if self.controller_launcher and self.controller_launcher.running:
530 return self.controller_launcher.stop()
530 return self.controller_launcher.stop()
531
531
532 def stop_launchers(self, r=None):
532 def stop_launchers(self, r=None):
533 if not self._stopping:
533 if not self._stopping:
534 self.stop_controller()
534 self.stop_controller()
535 super(IPClusterStart, self).stop_launchers()
535 super(IPClusterStart, self).stop_launchers()
536
536
537 def start(self):
537 def start(self):
538 """Start the app for the start subcommand."""
538 """Start the app for the start subcommand."""
539 # First see if the cluster is already running
539 # First see if the cluster is already running
540 try:
540 try:
541 pid = self.get_pid_from_file()
541 pid = self.get_pid_from_file()
542 except PIDFileError:
542 except PIDFileError:
543 pass
543 pass
544 else:
544 else:
545 if self.check_pid(pid):
545 if self.check_pid(pid):
546 self.log.critical(
546 self.log.critical(
547 'Cluster is already running with [pid=%s]. '
547 'Cluster is already running with [pid=%s]. '
548 'use "ipcluster stop" to stop the cluster.' % pid
548 'use "ipcluster stop" to stop the cluster.' % pid
549 )
549 )
550 # Here I exit with an unusual exit status that other processes
550 # Here I exit with an unusual exit status that other processes
551 # can watch for to learn how I exited.
551 # can watch for to learn how I exited.
552 self.exit(ALREADY_STARTED)
552 self.exit(ALREADY_STARTED)
553 else:
553 else:
554 self.remove_pid_file()
554 self.remove_pid_file()
555
555
556
556
557 # Now log and daemonize
557 # Now log and daemonize
558 self.log.info(
558 self.log.info(
559 'Starting ipcluster with [daemon=%r]' % self.daemonize
559 'Starting ipcluster with [daemon=%r]' % self.daemonize
560 )
560 )
561 # TODO: Get daemonize working on Windows or as a Windows Server.
561 # TODO: Get daemonize working on Windows or as a Windows Server.
562 if self.daemonize:
562 if self.daemonize:
563 if os.name=='posix':
563 if os.name=='posix':
564 daemonize()
564 daemonize()
565
565
566 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
566 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
567 dc.start()
567 dc.start()
568 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
568 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
569 dc.start()
569 dc.start()
570 # Now write the new pid file AFTER our new forked pid is active.
570 # Now write the new pid file AFTER our new forked pid is active.
571 self.write_pid_file()
571 self.write_pid_file()
572 try:
572 try:
573 self.loop.start()
573 self.loop.start()
574 except KeyboardInterrupt:
574 except KeyboardInterrupt:
575 pass
575 pass
576 except zmq.ZMQError as e:
576 except zmq.ZMQError as e:
577 if e.errno == errno.EINTR:
577 if e.errno == errno.EINTR:
578 pass
578 pass
579 else:
579 else:
580 raise
580 raise
581 finally:
581 finally:
582 self.remove_pid_file()
582 self.remove_pid_file()
583
583
584 base='IPython.parallel.apps.ipclusterapp.IPCluster'
584 base='IPython.parallel.apps.ipclusterapp.IPCluster'
585
585
586 class IPClusterApp(BaseIPythonApplication):
586 class IPClusterApp(BaseIPythonApplication):
587 name = u'ipcluster'
587 name = u'ipcluster'
588 description = _description
588 description = _description
589 examples = _main_examples
589 examples = _main_examples
590
590
591 subcommands = {
591 subcommands = {
592 'start' : (base+'Start', start_help),
592 'start' : (base+'Start', start_help),
593 'stop' : (base+'Stop', stop_help),
593 'stop' : (base+'Stop', stop_help),
594 'engines' : (base+'Engines', engines_help),
594 'engines' : (base+'Engines', engines_help),
595 }
595 }
596
596
597 # no aliases or flags for parent App
597 # no aliases or flags for parent App
598 aliases = Dict()
598 aliases = Dict()
599 flags = Dict()
599 flags = Dict()
600
600
601 def start(self):
601 def start(self):
602 if self.subapp is None:
602 if self.subapp is None:
603 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
603 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
604 print
604 print
605 self.print_description()
605 self.print_description()
606 self.print_subcommands()
606 self.print_subcommands()
607 self.exit(1)
607 self.exit(1)
608 else:
608 else:
609 return self.subapp.start()
609 return self.subapp.start()
610
610
611 def launch_new_instance():
611 def launch_new_instance():
612 """Create and run the IPython cluster."""
612 """Create and run the IPython cluster."""
613 app = IPClusterApp.instance()
613 app = IPClusterApp.instance()
614 app.initialize()
614 app.initialize()
615 app.start()
615 app.start()
616
616
617
617
618 if __name__ == '__main__':
618 if __name__ == '__main__':
619 launch_new_instance()
619 launch_new_instance()
620
620
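The find_launcher_class helper shown above expands a short launcher name by plain string concatenation, which is why the prefix spelling matters after this rename. A minimal illustrative sketch of that lookup (the values below are assumed for illustration):

# Illustrative expansion of an abbreviation, mirroring find_launcher_class above
clsname, kind = 'HTCondor', 'EngineSet'
if '.' not in clsname:
    if kind and kind not in clsname:
        clsname = clsname + kind + 'Launcher'            # 'HTCondorEngineSetLauncher'
    clsname = 'IPython.parallel.apps.launcher.' + clsname
# import_item(clsname) would then import
# IPython.parallel.apps.launcher.HTCondorEngineSetLauncher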
@@ -1,1441 +1,1441
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows: CTRL_C_EVENT is available on Python >= 2.7, 3.2
40 # Windows: CTRL_C_EVENT is available on Python >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
46 try:
46 try:
47 from subprocess import check_output
47 from subprocess import check_output
48 except ImportError:
48 except ImportError:
49 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
50 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
51 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
52 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
53 out,err = p.communicate()
53 out,err = p.communicate()
54 return out
54 return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67
67
68 from .win32support import forward_read_events
68 from .win32support import forward_read_events
69
69
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71
71
72 WINDOWS = os.name == 'nt'
72 WINDOWS = os.name == 'nt'
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Paths to the kernel apps
75 # Paths to the kernel apps
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
79
79
80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
81
81
82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
83
83
84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Base launchers and errors
87 # Base launchers and errors
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90 class LauncherError(Exception):
90 class LauncherError(Exception):
91 pass
91 pass
92
92
93
93
94 class ProcessStateError(LauncherError):
94 class ProcessStateError(LauncherError):
95 pass
95 pass
96
96
97
97
98 class UnknownStatus(LauncherError):
98 class UnknownStatus(LauncherError):
99 pass
99 pass
100
100
101
101
102 class BaseLauncher(LoggingConfigurable):
102 class BaseLauncher(LoggingConfigurable):
103 """An abstraction for starting, stopping and signaling a process."""
103 """An abstraction for starting, stopping and signaling a process."""
104
104
105 # In all of the launchers, the work_dir is where child processes will be
105 # In all of the launchers, the work_dir is where child processes will be
106 # run. This will usually be the profile_dir, but may not be. Any work_dir
106 # run. This will usually be the profile_dir, but may not be. Any work_dir
107 # passed into the __init__ method will override the config value.
107 # passed into the __init__ method will override the config value.
108 # This should not be used to set the work_dir for the actual engine
108 # This should not be used to set the work_dir for the actual engine
109 # and controller. Instead, use their own config files or the
109 # and controller. Instead, use their own config files or the
110 # controller_args, engine_args attributes of the launchers to add
110 # controller_args, engine_args attributes of the launchers to add
111 # the work_dir option.
111 # the work_dir option.
112 work_dir = Unicode(u'.')
112 work_dir = Unicode(u'.')
113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
114
114
115 start_data = Any()
115 start_data = Any()
116 stop_data = Any()
116 stop_data = Any()
117
117
118 def _loop_default(self):
118 def _loop_default(self):
119 return ioloop.IOLoop.instance()
119 return ioloop.IOLoop.instance()
120
120
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
123 self.state = 'before' # can be before, running, after
123 self.state = 'before' # can be before, running, after
124 self.stop_callbacks = []
124 self.stop_callbacks = []
125 self.start_data = None
125 self.start_data = None
126 self.stop_data = None
126 self.stop_data = None
127
127
128 @property
128 @property
129 def args(self):
129 def args(self):
130 """A list of cmd and args that will be used to start the process.
130 """A list of cmd and args that will be used to start the process.
131
131
132 This is what is passed to :func:`spawnProcess` and the first element
132 This is what is passed to :func:`spawnProcess` and the first element
133 will be the process name.
133 will be the process name.
134 """
134 """
135 return self.find_args()
135 return self.find_args()
136
136
137 def find_args(self):
137 def find_args(self):
138 """The ``.args`` property calls this to find the args list.
138 """The ``.args`` property calls this to find the args list.
139
139
140 Subclasses should implement this to construct the cmd and args.
140 Subclasses should implement this to construct the cmd and args.
141 """
141 """
142 raise NotImplementedError('find_args must be implemented in a subclass')
142 raise NotImplementedError('find_args must be implemented in a subclass')
143
143
144 @property
144 @property
145 def arg_str(self):
145 def arg_str(self):
146 """The string form of the program arguments."""
146 """The string form of the program arguments."""
147 return ' '.join(self.args)
147 return ' '.join(self.args)
148
148
149 @property
149 @property
150 def running(self):
150 def running(self):
151 """Am I running."""
151 """Am I running."""
152 if self.state == 'running':
152 if self.state == 'running':
153 return True
153 return True
154 else:
154 else:
155 return False
155 return False
156
156
157 def start(self):
157 def start(self):
158 """Start the process."""
158 """Start the process."""
159 raise NotImplementedError('start must be implemented in a subclass')
159 raise NotImplementedError('start must be implemented in a subclass')
160
160
161 def stop(self):
161 def stop(self):
162 """Stop the process and notify observers of stopping.
162 """Stop the process and notify observers of stopping.
163
163
164 This method will return None immediately.
164 This method will return None immediately.
165 To observe the actual process stopping, see :meth:`on_stop`.
165 To observe the actual process stopping, see :meth:`on_stop`.
166 """
166 """
167 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
168
168
169 def on_stop(self, f):
169 def on_stop(self, f):
170 """Register a callback to be called with this Launcher's stop_data
170 """Register a callback to be called with this Launcher's stop_data
171 when the process actually finishes.
171 when the process actually finishes.
172 """
172 """
173 if self.state=='after':
173 if self.state=='after':
174 return f(self.stop_data)
174 return f(self.stop_data)
175 else:
175 else:
176 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
177
177
178 def notify_start(self, data):
178 def notify_start(self, data):
179 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
180
180
181 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
182 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
183 """
183 """
184
184
185 self.log.debug('Process %r started: %r', self.args[0], data)
185 self.log.debug('Process %r started: %r', self.args[0], data)
186 self.start_data = data
186 self.start_data = data
187 self.state = 'running'
187 self.state = 'running'
188 return data
188 return data
189
189
190 def notify_stop(self, data):
190 def notify_stop(self, data):
191 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
192
192
193 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
194 this to trigger callbacks registered via :meth:`on_stop`."""
194 this to trigger callbacks registered via :meth:`on_stop`."""
195
195
196 self.log.debug('Process %r stopped: %r', self.args[0], data)
196 self.log.debug('Process %r stopped: %r', self.args[0], data)
197 self.stop_data = data
197 self.stop_data = data
198 self.state = 'after'
198 self.state = 'after'
199 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
200 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
201 d(data)
201 d(data)
202 return data
202 return data
203
203
204 def signal(self, sig):
204 def signal(self, sig):
205 """Signal the process.
205 """Signal the process.
206
206
207 Parameters
207 Parameters
208 ----------
208 ----------
209 sig : str or int
209 sig : str or int
210 'KILL', 'INT', etc., or any signal number
210 'KILL', 'INT', etc., or any signal number
211 """
211 """
212 raise NotImplementedError('signal must be implemented in a subclass')
212 raise NotImplementedError('signal must be implemented in a subclass')
213
213
214 class ClusterAppMixin(HasTraits):
214 class ClusterAppMixin(HasTraits):
215 """MixIn for cluster args as traits"""
215 """MixIn for cluster args as traits"""
216 profile_dir=Unicode('')
216 profile_dir=Unicode('')
217 cluster_id=Unicode('')
217 cluster_id=Unicode('')
218
218
219 @property
219 @property
220 def cluster_args(self):
220 def cluster_args(self):
221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
222
222
223 class ControllerMixin(ClusterAppMixin):
223 class ControllerMixin(ClusterAppMixin):
224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
225 help="""Popen command to launch ipcontroller.""")
225 help="""Popen command to launch ipcontroller.""")
226 # Command line arguments to ipcontroller.
226 # Command line arguments to ipcontroller.
227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
228 help="""command-line args to pass to ipcontroller""")
228 help="""command-line args to pass to ipcontroller""")
229
229
230 class EngineMixin(ClusterAppMixin):
230 class EngineMixin(ClusterAppMixin):
231 engine_cmd = List(ipengine_cmd_argv, config=True,
231 engine_cmd = List(ipengine_cmd_argv, config=True,
232 help="""command to launch the Engine.""")
232 help="""command to launch the Engine.""")
233 # Command line arguments for ipengine.
233 # Command line arguments for ipengine.
234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
235 help="command-line arguments to pass to ipengine"
235 help="command-line arguments to pass to ipengine"
236 )
236 )
237
237
238
238
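# Added sketch: because controller_cmd/controller_args and engine_cmd/engine_args
# are config=True traits, they are normally set in ipcluster_config.py, e.g.:
#
#   c = get_config()
#   c.LocalControllerLauncher.controller_args = ['--log-to-file', '--log-level=10']
#   c.LocalEngineSetLauncher.engine_args = ['--log-to-file', '--log-level=10']
#
# (values illustrative; the defaults shown above use logging.INFO)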
239 #-----------------------------------------------------------------------------
239 #-----------------------------------------------------------------------------
240 # Local process launchers
240 # Local process launchers
241 #-----------------------------------------------------------------------------
241 #-----------------------------------------------------------------------------
242
242
243
243
244 class LocalProcessLauncher(BaseLauncher):
244 class LocalProcessLauncher(BaseLauncher):
245 """Start and stop an external process in an asynchronous manner.
245 """Start and stop an external process in an asynchronous manner.
246
246
247 This will launch the external process with a working directory of
247 This will launch the external process with a working directory of
248 ``self.work_dir``.
248 ``self.work_dir``.
249 """
249 """
250
250
251 # This is used to construct self.args, which is passed to
251 # This is used to construct self.args, which is passed to
252 # Popen in start().
252 # Popen in start().
253 cmd_and_args = List([])
253 cmd_and_args = List([])
254 poll_frequency = Integer(100) # in ms
254 poll_frequency = Integer(100) # in ms
255
255
256 def __init__(self, work_dir=u'.', config=None, **kwargs):
256 def __init__(self, work_dir=u'.', config=None, **kwargs):
257 super(LocalProcessLauncher, self).__init__(
257 super(LocalProcessLauncher, self).__init__(
258 work_dir=work_dir, config=config, **kwargs
258 work_dir=work_dir, config=config, **kwargs
259 )
259 )
260 self.process = None
260 self.process = None
261 self.poller = None
261 self.poller = None
262
262
263 def find_args(self):
263 def find_args(self):
264 return self.cmd_and_args
264 return self.cmd_and_args
265
265
266 def start(self):
266 def start(self):
267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
268 if self.state == 'before':
268 if self.state == 'before':
269 self.process = Popen(self.args,
269 self.process = Popen(self.args,
270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
271 env=os.environ,
271 env=os.environ,
272 cwd=self.work_dir
272 cwd=self.work_dir
273 )
273 )
274 if WINDOWS:
274 if WINDOWS:
275 self.stdout = forward_read_events(self.process.stdout)
275 self.stdout = forward_read_events(self.process.stdout)
276 self.stderr = forward_read_events(self.process.stderr)
276 self.stderr = forward_read_events(self.process.stderr)
277 else:
277 else:
278 self.stdout = self.process.stdout.fileno()
278 self.stdout = self.process.stdout.fileno()
279 self.stderr = self.process.stderr.fileno()
279 self.stderr = self.process.stderr.fileno()
280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
283 self.poller.start()
283 self.poller.start()
284 self.notify_start(self.process.pid)
284 self.notify_start(self.process.pid)
285 else:
285 else:
286 s = 'The process was already started and has state: %r' % self.state
286 s = 'The process was already started and has state: %r' % self.state
287 raise ProcessStateError(s)
287 raise ProcessStateError(s)
288
288
289 def stop(self):
289 def stop(self):
290 return self.interrupt_then_kill()
290 return self.interrupt_then_kill()
291
291
292 def signal(self, sig):
292 def signal(self, sig):
293 if self.state == 'running':
293 if self.state == 'running':
294 if WINDOWS and sig != SIGINT:
294 if WINDOWS and sig != SIGINT:
295 # use Windows tree-kill for better child cleanup
295 # use Windows tree-kill for better child cleanup
296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
297 else:
297 else:
298 self.process.send_signal(sig)
298 self.process.send_signal(sig)
299
299
300 def interrupt_then_kill(self, delay=2.0):
300 def interrupt_then_kill(self, delay=2.0):
301 """Send INT, wait a delay and then send KILL."""
301 """Send INT, wait a delay and then send KILL."""
302 try:
302 try:
303 self.signal(SIGINT)
303 self.signal(SIGINT)
304 except Exception:
304 except Exception:
305 self.log.debug("interrupt failed")
305 self.log.debug("interrupt failed")
306 pass
306 pass
307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
308 self.killer.start()
308 self.killer.start()
309
309
310 # callbacks, etc:
310 # callbacks, etc:
311
311
312 def handle_stdout(self, fd, events):
312 def handle_stdout(self, fd, events):
313 if WINDOWS:
313 if WINDOWS:
314 line = self.stdout.recv()
314 line = self.stdout.recv()
315 else:
315 else:
316 line = self.process.stdout.readline()
316 line = self.process.stdout.readline()
317 # a stopped process will be readable but return empty strings
317 # a stopped process will be readable but return empty strings
318 if line:
318 if line:
319 self.log.debug(line[:-1])
319 self.log.debug(line[:-1])
320 else:
320 else:
321 self.poll()
321 self.poll()
322
322
323 def handle_stderr(self, fd, events):
323 def handle_stderr(self, fd, events):
324 if WINDOWS:
324 if WINDOWS:
325 line = self.stderr.recv()
325 line = self.stderr.recv()
326 else:
326 else:
327 line = self.process.stderr.readline()
327 line = self.process.stderr.readline()
328 # a stopped process will be readable but return empty strings
328 # a stopped process will be readable but return empty strings
329 if line:
329 if line:
330 self.log.debug(line[:-1])
330 self.log.debug(line[:-1])
331 else:
331 else:
332 self.poll()
332 self.poll()
333
333
334 def poll(self):
334 def poll(self):
335 status = self.process.poll()
335 status = self.process.poll()
336 if status is not None:
336 if status is not None:
337 self.poller.stop()
337 self.poller.stop()
338 self.loop.remove_handler(self.stdout)
338 self.loop.remove_handler(self.stdout)
339 self.loop.remove_handler(self.stderr)
339 self.loop.remove_handler(self.stderr)
340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
341 return status
341 return status
342
342
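# Added sketch of driving a LocalProcessLauncher by hand (assumes the launcher's
# `loop` attribute, set in BaseLauncher, is the global zmq IOLoop instance):
#
#   launcher = LocalProcessLauncher(work_dir=u'.')
#   launcher.cmd_and_args = ['ipcontroller', '--log-to-file']
#   launcher.on_stop(lambda data: ioloop.IOLoop.instance().stop())
#   launcher.start()                      # registers stdout/stderr handlers + poller
#   ioloop.IOLoop.instance().start()      # run until the process exits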
343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
344 """Launch a controller as a regular external process."""
344 """Launch a controller as a regular external process."""
345
345
346 def find_args(self):
346 def find_args(self):
347 return self.controller_cmd + self.cluster_args + self.controller_args
347 return self.controller_cmd + self.cluster_args + self.controller_args
348
348
349 def start(self):
349 def start(self):
350 """Start the controller by profile_dir."""
350 """Start the controller by profile_dir."""
351 return super(LocalControllerLauncher, self).start()
351 return super(LocalControllerLauncher, self).start()
352
352
353
353
354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
355 """Launch a single engine as a regular externall process."""
355 """Launch a single engine as a regular externall process."""
356
356
357 def find_args(self):
357 def find_args(self):
358 return self.engine_cmd + self.cluster_args + self.engine_args
358 return self.engine_cmd + self.cluster_args + self.engine_args
359
359
360
360
361 class LocalEngineSetLauncher(LocalEngineLauncher):
361 class LocalEngineSetLauncher(LocalEngineLauncher):
362 """Launch a set of engines as regular external processes."""
362 """Launch a set of engines as regular external processes."""
363
363
364 delay = CFloat(0.1, config=True,
364 delay = CFloat(0.1, config=True,
365 help="""delay (in seconds) between starting each engine after the first.
365 help="""delay (in seconds) between starting each engine after the first.
366 This can help force the engines to get their ids in order, or limit
366 This can help force the engines to get their ids in order, or limit
367 process flood when starting many engines."""
367 process flood when starting many engines."""
368 )
368 )
369
369
370 # launcher class
370 # launcher class
371 launcher_class = LocalEngineLauncher
371 launcher_class = LocalEngineLauncher
372
372
373 launchers = Dict()
373 launchers = Dict()
374 stop_data = Dict()
374 stop_data = Dict()
375
375
376 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 def __init__(self, work_dir=u'.', config=None, **kwargs):
377 super(LocalEngineSetLauncher, self).__init__(
377 super(LocalEngineSetLauncher, self).__init__(
378 work_dir=work_dir, config=config, **kwargs
378 work_dir=work_dir, config=config, **kwargs
379 )
379 )
380 self.stop_data = {}
380 self.stop_data = {}
381
381
382 def start(self, n):
382 def start(self, n):
383 """Start n engines by profile or profile_dir."""
383 """Start n engines by profile or profile_dir."""
384 dlist = []
384 dlist = []
385 for i in range(n):
385 for i in range(n):
386 if i > 0:
386 if i > 0:
387 time.sleep(self.delay)
387 time.sleep(self.delay)
388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
390 )
390 )
391
391
392 # Copy the engine args over to each engine launcher.
392 # Copy the engine args over to each engine launcher.
393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
394 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
395 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
396 d = el.start()
396 d = el.start()
397 self.launchers[i] = el
397 self.launchers[i] = el
398 dlist.append(d)
398 dlist.append(d)
399 self.notify_start(dlist)
399 self.notify_start(dlist)
400 return dlist
400 return dlist
401
401
402 def find_args(self):
402 def find_args(self):
403 return ['engine set']
403 return ['engine set']
404
404
405 def signal(self, sig):
405 def signal(self, sig):
406 dlist = []
406 dlist = []
407 for el in self.launchers.itervalues():
407 for el in self.launchers.itervalues():
408 d = el.signal(sig)
408 d = el.signal(sig)
409 dlist.append(d)
409 dlist.append(d)
410 return dlist
410 return dlist
411
411
412 def interrupt_then_kill(self, delay=1.0):
412 def interrupt_then_kill(self, delay=1.0):
413 dlist = []
413 dlist = []
414 for el in self.launchers.itervalues():
414 for el in self.launchers.itervalues():
415 d = el.interrupt_then_kill(delay)
415 d = el.interrupt_then_kill(delay)
416 dlist.append(d)
416 dlist.append(d)
417 return dlist
417 return dlist
418
418
419 def stop(self):
419 def stop(self):
420 return self.interrupt_then_kill()
420 return self.interrupt_then_kill()
421
421
422 def _notice_engine_stopped(self, data):
422 def _notice_engine_stopped(self, data):
423 pid = data['pid']
423 pid = data['pid']
424 for idx,el in self.launchers.iteritems():
424 for idx,el in self.launchers.iteritems():
425 if el.process.pid == pid:
425 if el.process.pid == pid:
426 break
426 break
427 self.launchers.pop(idx)
427 self.launchers.pop(idx)
428 self.stop_data[idx] = data
428 self.stop_data[idx] = data
429 if not self.launchers:
429 if not self.launchers:
430 self.notify_stop(self.stop_data)
430 self.notify_stop(self.stop_data)
431
431
432
432
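# Added sketch: the delay between engine starts is configurable, e.g. in
# ipcluster_config.py (value illustrative):
#
#   c = get_config()
#   c.LocalEngineSetLauncher.delay = 0.5    # seconds between starting engines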
433 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
434 # MPI launchers
434 # MPI launchers
435 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
436
436
437
437
438 class MPILauncher(LocalProcessLauncher):
438 class MPILauncher(LocalProcessLauncher):
439 """Launch an external process using mpiexec."""
439 """Launch an external process using mpiexec."""
440
440
441 mpi_cmd = List(['mpiexec'], config=True,
441 mpi_cmd = List(['mpiexec'], config=True,
442 help="The mpiexec command to use in starting the process."
442 help="The mpiexec command to use in starting the process."
443 )
443 )
444 mpi_args = List([], config=True,
444 mpi_args = List([], config=True,
445 help="The command line arguments to pass to mpiexec."
445 help="The command line arguments to pass to mpiexec."
446 )
446 )
447 program = List(['date'],
447 program = List(['date'],
448 help="The program to start via mpiexec.")
448 help="The program to start via mpiexec.")
449 program_args = List([],
449 program_args = List([],
450 help="The command line argument to the program."
450 help="The command line argument to the program."
451 )
451 )
452 n = Integer(1)
452 n = Integer(1)
453
453
454 def __init__(self, *args, **kwargs):
454 def __init__(self, *args, **kwargs):
455 # deprecation for old MPIExec names:
455 # deprecation for old MPIExec names:
456 config = kwargs.get('config', {})
456 config = kwargs.get('config', {})
457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
458 deprecated = config.get(oldname)
458 deprecated = config.get(oldname)
459 if deprecated:
459 if deprecated:
460 newname = oldname.replace('MPIExec', 'MPI')
460 newname = oldname.replace('MPIExec', 'MPI')
461 config[newname].update(deprecated)
461 config[newname].update(deprecated)
462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
463
463
464 super(MPILauncher, self).__init__(*args, **kwargs)
464 super(MPILauncher, self).__init__(*args, **kwargs)
465
465
466 def find_args(self):
466 def find_args(self):
467 """Build self.args using all the fields."""
467 """Build self.args using all the fields."""
468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
469 self.program + self.program_args
469 self.program + self.program_args
470
470
471 def start(self, n):
471 def start(self, n):
472 """Start n instances of the program using mpiexec."""
472 """Start n instances of the program using mpiexec."""
473 self.n = n
473 self.n = n
474 return super(MPILauncher, self).start()
474 return super(MPILauncher, self).start()
475
475
476
476
477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
478 """Launch a controller using mpiexec."""
478 """Launch a controller using mpiexec."""
479
479
480 # alias back to *non-configurable* program[_args] for use in find_args()
480 # alias back to *non-configurable* program[_args] for use in find_args()
481 # this way all Controller/EngineSetLaunchers have the same form, rather
481 # this way all Controller/EngineSetLaunchers have the same form, rather
482 # than *some* having `program_args` and others `controller_args`
482 # than *some* having `program_args` and others `controller_args`
483 @property
483 @property
484 def program(self):
484 def program(self):
485 return self.controller_cmd
485 return self.controller_cmd
486
486
487 @property
487 @property
488 def program_args(self):
488 def program_args(self):
489 return self.cluster_args + self.controller_args
489 return self.cluster_args + self.controller_args
490
490
491 def start(self):
491 def start(self):
492 """Start the controller by profile_dir."""
492 """Start the controller by profile_dir."""
493 return super(MPIControllerLauncher, self).start(1)
493 return super(MPIControllerLauncher, self).start(1)
494
494
495
495
496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
497 """Launch engines using mpiexec"""
497 """Launch engines using mpiexec"""
498
498
499 # alias back to *non-configurable* program[_args] for use in find_args()
499 # alias back to *non-configurable* program[_args] for use in find_args()
500 # this way all Controller/EngineSetLaunchers have the same form, rather
500 # this way all Controller/EngineSetLaunchers have the same form, rather
501 # than *some* having `program_args` and others `controller_args`
501 # than *some* having `program_args` and others `controller_args`
502 @property
502 @property
503 def program(self):
503 def program(self):
504 return self.engine_cmd
504 return self.engine_cmd
505
505
506 @property
506 @property
507 def program_args(self):
507 def program_args(self):
508 return self.cluster_args + self.engine_args
508 return self.cluster_args + self.engine_args
509
509
510 def start(self, n):
510 def start(self, n):
511 """Start n engines by profile or profile_dir."""
511 """Start n engines by profile or profile_dir."""
512 self.n = n
512 self.n = n
513 return super(MPIEngineSetLauncher, self).start(n)
513 return super(MPIEngineSetLauncher, self).start(n)
514
514
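# Added sketch of MPI launcher configuration (values illustrative):
#
#   c = get_config()
#   c.MPILauncher.mpi_cmd = ['mpiexec']
#   c.MPIEngineSetLauncher.mpi_args = ['--hostfile', 'mpi_hosts']
#
# With n=4, find_args() then builds roughly:
#   mpiexec -n 4 --hostfile mpi_hosts ipengine <cluster args> <engine args>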
515 # deprecated MPIExec names
515 # deprecated MPIExec names
516 class DeprecatedMPILauncher(object):
516 class DeprecatedMPILauncher(object):
517 def warn(self):
517 def warn(self):
518 oldname = self.__class__.__name__
518 oldname = self.__class__.__name__
519 newname = oldname.replace('MPIExec', 'MPI')
519 newname = oldname.replace('MPIExec', 'MPI')
520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
521
521
522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
523 """Deprecated, use MPILauncher"""
523 """Deprecated, use MPILauncher"""
524 def __init__(self, *args, **kwargs):
524 def __init__(self, *args, **kwargs):
525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
526 self.warn()
526 self.warn()
527
527
528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
529 """Deprecated, use MPIControllerLauncher"""
529 """Deprecated, use MPIControllerLauncher"""
530 def __init__(self, *args, **kwargs):
530 def __init__(self, *args, **kwargs):
531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
532 self.warn()
532 self.warn()
533
533
534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
535 """Deprecated, use MPIEngineSetLauncher"""
535 """Deprecated, use MPIEngineSetLauncher"""
536 def __init__(self, *args, **kwargs):
536 def __init__(self, *args, **kwargs):
537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
538 self.warn()
538 self.warn()
539
539
540
540
541 #-----------------------------------------------------------------------------
541 #-----------------------------------------------------------------------------
542 # SSH launchers
542 # SSH launchers
543 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
544
544
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
546
546
547 class SSHLauncher(LocalProcessLauncher):
547 class SSHLauncher(LocalProcessLauncher):
548 """A minimal launcher for ssh.
548 """A minimal launcher for ssh.
549
549
550 To be useful this will probably have to be extended to use the ``sshx``
550 To be useful this will probably have to be extended to use the ``sshx``
551 idea for environment variables. There could be other things this needs
551 idea for environment variables. There could be other things this needs
552 as well.
552 as well.
553 """
553 """
554
554
555 ssh_cmd = List(['ssh'], config=True,
555 ssh_cmd = List(['ssh'], config=True,
556 help="command for starting ssh")
556 help="command for starting ssh")
557 ssh_args = List(['-tt'], config=True,
557 ssh_args = List(['-tt'], config=True,
558 help="args to pass to ssh")
558 help="args to pass to ssh")
559 scp_cmd = List(['scp'], config=True,
559 scp_cmd = List(['scp'], config=True,
560 help="command for sending files")
560 help="command for sending files")
561 program = List(['date'],
561 program = List(['date'],
562 help="Program to launch via ssh")
562 help="Program to launch via ssh")
563 program_args = List([],
563 program_args = List([],
564 help="args to pass to remote program")
564 help="args to pass to remote program")
565 hostname = Unicode('', config=True,
565 hostname = Unicode('', config=True,
566 help="hostname on which to launch the program")
566 help="hostname on which to launch the program")
567 user = Unicode('', config=True,
567 user = Unicode('', config=True,
568 help="username for ssh")
568 help="username for ssh")
569 location = Unicode('', config=True,
569 location = Unicode('', config=True,
570 help="user@hostname location for ssh in one setting")
570 help="user@hostname location for ssh in one setting")
571 to_fetch = List([], config=True,
571 to_fetch = List([], config=True,
572 help="List of (remote, local) files to fetch after starting")
572 help="List of (remote, local) files to fetch after starting")
573 to_send = List([], config=True,
573 to_send = List([], config=True,
574 help="List of (local, remote) files to send before starting")
574 help="List of (local, remote) files to send before starting")
575
575
576 def _hostname_changed(self, name, old, new):
576 def _hostname_changed(self, name, old, new):
577 if self.user:
577 if self.user:
578 self.location = u'%s@%s' % (self.user, new)
578 self.location = u'%s@%s' % (self.user, new)
579 else:
579 else:
580 self.location = new
580 self.location = new
581
581
582 def _user_changed(self, name, old, new):
582 def _user_changed(self, name, old, new):
583 self.location = u'%s@%s' % (new, self.hostname)
583 self.location = u'%s@%s' % (new, self.hostname)
584
584
585 def find_args(self):
585 def find_args(self):
586 return self.ssh_cmd + self.ssh_args + [self.location] + \
586 return self.ssh_cmd + self.ssh_args + [self.location] + \
587 list(map(pipes.quote, self.program + self.program_args))
587 list(map(pipes.quote, self.program + self.program_args))
588
588
589 def _send_file(self, local, remote):
589 def _send_file(self, local, remote):
590 """send a single file"""
590 """send a single file"""
591 remote = "%s:%s" % (self.location, remote)
591 remote = "%s:%s" % (self.location, remote)
592 for i in range(10):
592 for i in range(10):
593 if not os.path.exists(local):
593 if not os.path.exists(local):
594 self.log.debug("waiting for %s" % local)
594 self.log.debug("waiting for %s" % local)
595 time.sleep(1)
595 time.sleep(1)
596 else:
596 else:
597 break
597 break
598 self.log.info("sending %s to %s", local, remote)
598 self.log.info("sending %s to %s", local, remote)
599 check_output(self.scp_cmd + [local, remote])
599 check_output(self.scp_cmd + [local, remote])
600
600
601 def send_files(self):
601 def send_files(self):
602 """send our files (called before start)"""
602 """send our files (called before start)"""
603 if not self.to_send:
603 if not self.to_send:
604 return
604 return
605 for local_file, remote_file in self.to_send:
605 for local_file, remote_file in self.to_send:
606 self._send_file(local_file, remote_file)
606 self._send_file(local_file, remote_file)
607
607
608 def _fetch_file(self, remote, local):
608 def _fetch_file(self, remote, local):
609 """fetch a single file"""
609 """fetch a single file"""
610 full_remote = "%s:%s" % (self.location, remote)
610 full_remote = "%s:%s" % (self.location, remote)
611 self.log.info("fetching %s from %s", local, full_remote)
611 self.log.info("fetching %s from %s", local, full_remote)
612 for i in range(10):
612 for i in range(10):
613 # wait up to 10s for remote file to exist
613 # wait up to 10s for remote file to exist
614 check = check_output(self.ssh_cmd + self.ssh_args + \
614 check = check_output(self.ssh_cmd + self.ssh_args + \
615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
616 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
616 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
617 if check == u'no':
617 if check == u'no':
618 time.sleep(1)
618 time.sleep(1)
619 elif check == u'yes':
619 elif check == u'yes':
620 break
620 break
621 check_output(self.scp_cmd + [full_remote, local])
621 check_output(self.scp_cmd + [full_remote, local])
622
622
623 def fetch_files(self):
623 def fetch_files(self):
624 """fetch remote files (called after start)"""
624 """fetch remote files (called after start)"""
625 if not self.to_fetch:
625 if not self.to_fetch:
626 return
626 return
627 for remote_file, local_file in self.to_fetch:
627 for remote_file, local_file in self.to_fetch:
628 self._fetch_file(remote_file, local_file)
628 self._fetch_file(remote_file, local_file)
629
629
630 def start(self, hostname=None, user=None):
630 def start(self, hostname=None, user=None):
631 if hostname is not None:
631 if hostname is not None:
632 self.hostname = hostname
632 self.hostname = hostname
633 if user is not None:
633 if user is not None:
634 self.user = user
634 self.user = user
635
635
636 self.send_files()
636 self.send_files()
637 super(SSHLauncher, self).start()
637 super(SSHLauncher, self).start()
638 self.fetch_files()
638 self.fetch_files()
639
639
640 def signal(self, sig):
640 def signal(self, sig):
641 if self.state == 'running':
641 if self.state == 'running':
642 # send escaped ssh connection-closer
642 # send escaped ssh connection-closer
643 self.process.stdin.write('~.')
643 self.process.stdin.write('~.')
644 self.process.stdin.flush()
644 self.process.stdin.flush()
645
645
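# Added sketch of typical SSH settings in ipcluster_config.py, set on the
# concrete subclasses defined below (hostnames and options are illustrative):
#
#   c = get_config()
#   c.SSHControllerLauncher.hostname = 'head.example.com'
#   c.SSHControllerLauncher.user = 'me'
#   c.SSHControllerLauncher.ssh_args = ['-tt', '-o', 'StrictHostKeyChecking=no']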
646 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
646 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
647
647
648 remote_profile_dir = Unicode('', config=True,
648 remote_profile_dir = Unicode('', config=True,
649 help="""The remote profile_dir to use.
649 help="""The remote profile_dir to use.
650
650
651 If not specified, use calling profile, stripping out possible leading homedir.
651 If not specified, use calling profile, stripping out possible leading homedir.
652 """)
652 """)
653
653
654 def _profile_dir_changed(self, name, old, new):
654 def _profile_dir_changed(self, name, old, new):
655 if not self.remote_profile_dir:
655 if not self.remote_profile_dir:
656 # trigger remote_profile_dir_default logic again,
656 # trigger remote_profile_dir_default logic again,
657 # in case it was already triggered before profile_dir was set
657 # in case it was already triggered before profile_dir was set
658 self.remote_profile_dir = self._strip_home(new)
658 self.remote_profile_dir = self._strip_home(new)
659
659
660 @staticmethod
660 @staticmethod
661 def _strip_home(path):
661 def _strip_home(path):
662 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
662 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
663 home = get_home_dir()
663 home = get_home_dir()
664 if not home.endswith('/'):
664 if not home.endswith('/'):
665 home = home+'/'
665 home = home+'/'
666
666
667 if path.startswith(home):
667 if path.startswith(home):
668 return path[len(home):]
668 return path[len(home):]
669 else:
669 else:
670 return path
670 return path
671
671
672 def _remote_profile_dir_default(self):
672 def _remote_profile_dir_default(self):
673 return self._strip_home(self.profile_dir)
673 return self._strip_home(self.profile_dir)
674
674
675 def _cluster_id_changed(self, name, old, new):
675 def _cluster_id_changed(self, name, old, new):
676 if new:
676 if new:
677 raise ValueError("cluster id not supported by SSH launchers")
677 raise ValueError("cluster id not supported by SSH launchers")
678
678
679 @property
679 @property
680 def cluster_args(self):
680 def cluster_args(self):
681 return ['--profile-dir', self.remote_profile_dir]
681 return ['--profile-dir', self.remote_profile_dir]
682
682
683 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
683 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
684
684
685 # alias back to *non-configurable* program[_args] for use in find_args()
685 # alias back to *non-configurable* program[_args] for use in find_args()
686 # this way all Controller/EngineSetLaunchers have the same form, rather
686 # this way all Controller/EngineSetLaunchers have the same form, rather
687 # than *some* having `program_args` and others `controller_args`
687 # than *some* having `program_args` and others `controller_args`
688
688
689 def _controller_cmd_default(self):
689 def _controller_cmd_default(self):
690 return ['ipcontroller']
690 return ['ipcontroller']
691
691
692 @property
692 @property
693 def program(self):
693 def program(self):
694 return self.controller_cmd
694 return self.controller_cmd
695
695
696 @property
696 @property
697 def program_args(self):
697 def program_args(self):
698 return self.cluster_args + self.controller_args
698 return self.cluster_args + self.controller_args
699
699
700 def _to_fetch_default(self):
700 def _to_fetch_default(self):
701 return [
701 return [
702 (os.path.join(self.remote_profile_dir, 'security', cf),
702 (os.path.join(self.remote_profile_dir, 'security', cf),
703 os.path.join(self.profile_dir, 'security', cf),)
703 os.path.join(self.profile_dir, 'security', cf),)
704 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
704 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
705 ]
705 ]
706
706
707 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
707 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
708
708
709 # alias back to *non-configurable* program[_args] for use in find_args()
709 # alias back to *non-configurable* program[_args] for use in find_args()
710 # this way all Controller/EngineSetLaunchers have the same form, rather
710 # this way all Controller/EngineSetLaunchers have the same form, rather
711 # than *some* having `program_args` and others `controller_args`
711 # than *some* having `program_args` and others `controller_args`
712
712
713 def _engine_cmd_default(self):
713 def _engine_cmd_default(self):
714 return ['ipengine']
714 return ['ipengine']
715
715
716 @property
716 @property
717 def program(self):
717 def program(self):
718 return self.engine_cmd
718 return self.engine_cmd
719
719
720 @property
720 @property
721 def program_args(self):
721 def program_args(self):
722 return self.cluster_args + self.engine_args
722 return self.cluster_args + self.engine_args
723
723
724 def _to_send_default(self):
724 def _to_send_default(self):
725 return [
725 return [
726 (os.path.join(self.profile_dir, 'security', cf),
726 (os.path.join(self.profile_dir, 'security', cf),
727 os.path.join(self.remote_profile_dir, 'security', cf))
727 os.path.join(self.remote_profile_dir, 'security', cf))
728 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
728 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
729 ]
729 ]
730
730
731
731
732 class SSHEngineSetLauncher(LocalEngineSetLauncher):
732 class SSHEngineSetLauncher(LocalEngineSetLauncher):
733 launcher_class = SSHEngineLauncher
733 launcher_class = SSHEngineLauncher
734 engines = Dict(config=True,
734 engines = Dict(config=True,
735 help="""dict of engines to launch. This is a dict by hostname of ints,
735 help="""dict of engines to launch. This is a dict by hostname of ints,
736 corresponding to the number of engines to start on that host.""")
736 corresponding to the number of engines to start on that host.""")
737
737
738 def _engine_cmd_default(self):
738 def _engine_cmd_default(self):
739 return ['ipengine']
739 return ['ipengine']
740
740
741 @property
741 @property
742 def engine_count(self):
742 def engine_count(self):
743 """determine engine count from `engines` dict"""
743 """determine engine count from `engines` dict"""
744 count = 0
744 count = 0
745 for n in self.engines.itervalues():
745 for n in self.engines.itervalues():
746 if isinstance(n, (tuple,list)):
746 if isinstance(n, (tuple,list)):
747 n,args = n
747 n,args = n
748 count += n
748 count += n
749 return count
749 return count
750
750
751 def start(self, n):
751 def start(self, n):
752 """Start engines by profile or profile_dir.
752 """Start engines by profile or profile_dir.
753 `n` is ignored, and the `engines` config property is used instead.
753 `n` is ignored, and the `engines` config property is used instead.
754 """
754 """
755
755
756 dlist = []
756 dlist = []
757 for host, n in self.engines.iteritems():
757 for host, n in self.engines.iteritems():
758 if isinstance(n, (tuple, list)):
758 if isinstance(n, (tuple, list)):
759 n, args = n
759 n, args = n
760 else:
760 else:
761 args = copy.deepcopy(self.engine_args)
761 args = copy.deepcopy(self.engine_args)
762
762
763 if '@' in host:
763 if '@' in host:
764 user,host = host.split('@',1)
764 user,host = host.split('@',1)
765 else:
765 else:
766 user=None
766 user=None
767 for i in range(n):
767 for i in range(n):
768 if i > 0:
768 if i > 0:
769 time.sleep(self.delay)
769 time.sleep(self.delay)
770 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
770 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
771 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
771 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
772 )
772 )
773 if i > 0:
773 if i > 0:
774 # only send files for the first engine on each host
774 # only send files for the first engine on each host
775 el.to_send = []
775 el.to_send = []
776
776
777 # Copy the engine args over to each engine launcher.
777 # Copy the engine args over to each engine launcher.
778 el.engine_cmd = self.engine_cmd
778 el.engine_cmd = self.engine_cmd
779 el.engine_args = args
779 el.engine_args = args
780 el.on_stop(self._notice_engine_stopped)
780 el.on_stop(self._notice_engine_stopped)
781 d = el.start(user=user, hostname=host)
781 d = el.start(user=user, hostname=host)
782 self.launchers[ "%s/%i" % (host,i) ] = el
782 self.launchers[ "%s/%i" % (host,i) ] = el
783 dlist.append(d)
783 dlist.append(d)
784 self.notify_start(dlist)
784 self.notify_start(dlist)
785 return dlist
785 return dlist
786
786
787
787
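# Added sketch: the `engines` dict accepts a plain int per host, or an
# (n, engine_args) tuple, with an optional 'user@' prefix on the host, exactly
# as handled in start() above. For example, in ipcluster_config.py:
#
#   c = get_config()
#   c.SSHEngineSetLauncher.engines = {
#       'node1.example.com': 2,
#       'me@node2.example.com': (4, ['--log-level=10']),
#   }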
788 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
788 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
789 """Launcher for calling
789 """Launcher for calling
790 `ipcluster engines` on a remote machine.
790 `ipcluster engines` on a remote machine.
791
791
792 Requires that remote profile is already configured.
792 Requires that remote profile is already configured.
793 """
793 """
794
794
795 n = Integer()
795 n = Integer()
796 ipcluster_cmd = List(['ipcluster'], config=True)
796 ipcluster_cmd = List(['ipcluster'], config=True)
797
797
798 @property
798 @property
799 def program(self):
799 def program(self):
800 return self.ipcluster_cmd + ['engines']
800 return self.ipcluster_cmd + ['engines']
801
801
802 @property
802 @property
803 def program_args(self):
803 def program_args(self):
804 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
804 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
805
805
806 def _to_send_default(self):
806 def _to_send_default(self):
807 return [
807 return [
808 (os.path.join(self.profile_dir, 'security', cf),
808 (os.path.join(self.profile_dir, 'security', cf),
809 os.path.join(self.remote_profile_dir, 'security', cf))
809 os.path.join(self.remote_profile_dir, 'security', cf))
810 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
810 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
811 ]
811 ]
812
812
813 def start(self, n):
813 def start(self, n):
814 self.n = n
814 self.n = n
815 super(SSHProxyEngineSetLauncher, self).start()
815 super(SSHProxyEngineSetLauncher, self).start()
816
816
817
817
818 #-----------------------------------------------------------------------------
818 #-----------------------------------------------------------------------------
819 # Windows HPC Server 2008 scheduler launchers
819 # Windows HPC Server 2008 scheduler launchers
820 #-----------------------------------------------------------------------------
820 #-----------------------------------------------------------------------------
821
821
822
822
823 # This is only used on Windows.
823 # This is only used on Windows.
824 def find_job_cmd():
824 def find_job_cmd():
825 if WINDOWS:
825 if WINDOWS:
826 try:
826 try:
827 return find_cmd('job')
827 return find_cmd('job')
828 except (FindCmdError, ImportError):
828 except (FindCmdError, ImportError):
829 # ImportError will be raised if win32api is not installed
829 # ImportError will be raised if win32api is not installed
830 return 'job'
830 return 'job'
831 else:
831 else:
832 return 'job'
832 return 'job'
833
833
834
834
835 class WindowsHPCLauncher(BaseLauncher):
835 class WindowsHPCLauncher(BaseLauncher):
836
836
837 job_id_regexp = CRegExp(r'\d+', config=True,
837 job_id_regexp = CRegExp(r'\d+', config=True,
838 help="""A regular expression used to get the job id from the output of the
838 help="""A regular expression used to get the job id from the output of the
839 submit_command. """
839 submit_command. """
840 )
840 )
841 job_file_name = Unicode(u'ipython_job.xml', config=True,
841 job_file_name = Unicode(u'ipython_job.xml', config=True,
842 help="The filename of the instantiated job script.")
842 help="The filename of the instantiated job script.")
843 # The full path to the instantiated job script. This gets made dynamically
843 # The full path to the instantiated job script. This gets made dynamically
844 # by combining the work_dir with the job_file_name.
844 # by combining the work_dir with the job_file_name.
845 job_file = Unicode(u'')
845 job_file = Unicode(u'')
846 scheduler = Unicode('', config=True,
846 scheduler = Unicode('', config=True,
847 help="The hostname of the scheduler to submit the job to.")
847 help="The hostname of the scheduler to submit the job to.")
848 job_cmd = Unicode(find_job_cmd(), config=True,
848 job_cmd = Unicode(find_job_cmd(), config=True,
849 help="The command for submitting jobs.")
849 help="The command for submitting jobs.")
850
850
851 def __init__(self, work_dir=u'.', config=None, **kwargs):
851 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 super(WindowsHPCLauncher, self).__init__(
852 super(WindowsHPCLauncher, self).__init__(
853 work_dir=work_dir, config=config, **kwargs
853 work_dir=work_dir, config=config, **kwargs
854 )
854 )
855
855
856 @property
856 @property
857 def job_file(self):
857 def job_file(self):
858 return os.path.join(self.work_dir, self.job_file_name)
858 return os.path.join(self.work_dir, self.job_file_name)
859
859
860 def write_job_file(self, n):
860 def write_job_file(self, n):
861 raise NotImplementedError("Implement write_job_file in a subclass.")
861 raise NotImplementedError("Implement write_job_file in a subclass.")
862
862
863 def find_args(self):
863 def find_args(self):
864 return [u'job.exe']
864 return [u'job.exe']
865
865
866 def parse_job_id(self, output):
866 def parse_job_id(self, output):
867 """Take the output of the submit command and return the job id."""
867 """Take the output of the submit command and return the job id."""
868 m = self.job_id_regexp.search(output)
868 m = self.job_id_regexp.search(output)
869 if m is not None:
869 if m is not None:
870 job_id = m.group()
870 job_id = m.group()
871 else:
871 else:
872 raise LauncherError("Job id couldn't be determined: %s" % output)
872 raise LauncherError("Job id couldn't be determined: %s" % output)
873 self.job_id = job_id
873 self.job_id = job_id
874 self.log.info('Job started with id: %r', job_id)
874 self.log.info('Job started with id: %r', job_id)
875 return job_id
875 return job_id
876
876
877 def start(self, n):
877 def start(self, n):
878 """Start n copies of the process using the Win HPC job scheduler."""
878 """Start n copies of the process using the Win HPC job scheduler."""
879 self.write_job_file(n)
879 self.write_job_file(n)
880 args = [
880 args = [
881 'submit',
881 'submit',
882 '/jobfile:%s' % self.job_file,
882 '/jobfile:%s' % self.job_file,
883 '/scheduler:%s' % self.scheduler
883 '/scheduler:%s' % self.scheduler
884 ]
884 ]
885 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
885 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
886
886
887 output = check_output([self.job_cmd]+args,
887 output = check_output([self.job_cmd]+args,
888 env=os.environ,
888 env=os.environ,
889 cwd=self.work_dir,
889 cwd=self.work_dir,
890 stderr=STDOUT
890 stderr=STDOUT
891 )
891 )
892 output = output.decode(DEFAULT_ENCODING, 'replace')
892 output = output.decode(DEFAULT_ENCODING, 'replace')
893 job_id = self.parse_job_id(output)
893 job_id = self.parse_job_id(output)
894 self.notify_start(job_id)
894 self.notify_start(job_id)
895 return job_id
895 return job_id
896
896
897 def stop(self):
897 def stop(self):
898 args = [
898 args = [
899 'cancel',
899 'cancel',
900 self.job_id,
900 self.job_id,
901 '/scheduler:%s' % self.scheduler
901 '/scheduler:%s' % self.scheduler
902 ]
902 ]
903 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
903 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
904 try:
904 try:
905 output = check_output([self.job_cmd]+args,
905 output = check_output([self.job_cmd]+args,
906 env=os.environ,
906 env=os.environ,
907 cwd=self.work_dir,
907 cwd=self.work_dir,
908 stderr=STDOUT
908 stderr=STDOUT
909 )
909 )
910 output = output.decode(DEFAULT_ENCODING, 'replace')
910 output = output.decode(DEFAULT_ENCODING, 'replace')
911 except:
911 except:
912 output = u'The job already appears to be stopped: %r' % self.job_id
912 output = u'The job already appears to be stopped: %r' % self.job_id
913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
914 return output
914 return output
915
915
916
916
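# Added example: with the default job_id_regexp r'\d+', scheduler output such as
#   "Job has been submitted. ID: 1234"
# is parsed by parse_job_id() into the job id '1234' (the exact wording of the
# scheduler output is illustrative).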
917 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
917 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
918
918
919 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
919 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
920 help="WinHPC xml job file.")
920 help="WinHPC xml job file.")
921 controller_args = List([], config=False,
921 controller_args = List([], config=False,
922 help="extra args to pass to ipcontroller")
922 help="extra args to pass to ipcontroller")
923
923
924 def write_job_file(self, n):
924 def write_job_file(self, n):
925 job = IPControllerJob(config=self.config)
925 job = IPControllerJob(config=self.config)
926
926
927 t = IPControllerTask(config=self.config)
927 t = IPControllerTask(config=self.config)
928 # The task's work directory is *not* the actual work directory of
928 # The task's work directory is *not* the actual work directory of
929 # the controller. It is used as the base path for the stdout/stderr
929 # the controller. It is used as the base path for the stdout/stderr
930 # files that the scheduler redirects to.
930 # files that the scheduler redirects to.
931 t.work_directory = self.profile_dir
931 t.work_directory = self.profile_dir
932 # Add the cluster args (profile_dir, cluster_id) and the extra controller args.
932 # Add the cluster args (profile_dir, cluster_id) and the extra controller args.
933 t.controller_args.extend(self.cluster_args)
933 t.controller_args.extend(self.cluster_args)
934 t.controller_args.extend(self.controller_args)
934 t.controller_args.extend(self.controller_args)
935 job.add_task(t)
935 job.add_task(t)
936
936
937 self.log.debug("Writing job description file: %s", self.job_file)
937 self.log.debug("Writing job description file: %s", self.job_file)
938 job.write(self.job_file)
938 job.write(self.job_file)
939
939
940 @property
940 @property
941 def job_file(self):
941 def job_file(self):
942 return os.path.join(self.profile_dir, self.job_file_name)
942 return os.path.join(self.profile_dir, self.job_file_name)
943
943
944 def start(self):
944 def start(self):
945 """Start the controller by profile_dir."""
945 """Start the controller by profile_dir."""
946 return super(WindowsHPCControllerLauncher, self).start(1)
946 return super(WindowsHPCControllerLauncher, self).start(1)
947
947
948
948
949 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
949 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
950
950
951 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
951 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
952 help="jobfile for ipengines job")
952 help="jobfile for ipengines job")
953 engine_args = List([], config=False,
953 engine_args = List([], config=False,
954 help="extra args to pas to ipengine")
954 help="extra args to pas to ipengine")
955
955
956 def write_job_file(self, n):
956 def write_job_file(self, n):
957 job = IPEngineSetJob(config=self.config)
957 job = IPEngineSetJob(config=self.config)
958
958
959 for i in range(n):
959 for i in range(n):
960 t = IPEngineTask(config=self.config)
960 t = IPEngineTask(config=self.config)
961 # The task's work directory is *not* the actual work directory of
961 # The task's work directory is *not* the actual work directory of
962 # the engine. It is used as the base path for the stdout/stderr
962 # the engine. It is used as the base path for the stdout/stderr
963 # files that the scheduler redirects to.
963 # files that the scheduler redirects to.
964 t.work_directory = self.profile_dir
964 t.work_directory = self.profile_dir
965 # Add the cluster args (profile_dir, cluster_id) and the extra engine args.
965 # Add the cluster args (profile_dir, cluster_id) and the extra engine args.
966 t.engine_args.extend(self.cluster_args)
966 t.engine_args.extend(self.cluster_args)
967 t.engine_args.extend(self.engine_args)
967 t.engine_args.extend(self.engine_args)
968 job.add_task(t)
968 job.add_task(t)
969
969
970 self.log.debug("Writing job description file: %s", self.job_file)
970 self.log.debug("Writing job description file: %s", self.job_file)
971 job.write(self.job_file)
971 job.write(self.job_file)
972
972
973 @property
973 @property
974 def job_file(self):
974 def job_file(self):
975 return os.path.join(self.profile_dir, self.job_file_name)
975 return os.path.join(self.profile_dir, self.job_file_name)
976
976
977 def start(self, n):
977 def start(self, n):
978 """Start the controller by profile_dir."""
978 """Start the controller by profile_dir."""
979 return super(WindowsHPCEngineSetLauncher, self).start(n)
979 return super(WindowsHPCEngineSetLauncher, self).start(n)
980
980
981
981
982 #-----------------------------------------------------------------------------
982 #-----------------------------------------------------------------------------
983 # Batch (PBS) system launchers
983 # Batch (PBS) system launchers
984 #-----------------------------------------------------------------------------
984 #-----------------------------------------------------------------------------
985
985
986 class BatchClusterAppMixin(ClusterAppMixin):
986 class BatchClusterAppMixin(ClusterAppMixin):
987 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
987 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
988 def _profile_dir_changed(self, name, old, new):
988 def _profile_dir_changed(self, name, old, new):
989 self.context[name] = new
989 self.context[name] = new
990 _cluster_id_changed = _profile_dir_changed
990 _cluster_id_changed = _profile_dir_changed
991
991
992 def _profile_dir_default(self):
992 def _profile_dir_default(self):
993 self.context['profile_dir'] = ''
993 self.context['profile_dir'] = ''
994 return ''
994 return ''
995 def _cluster_id_default(self):
995 def _cluster_id_default(self):
996 self.context['cluster_id'] = ''
996 self.context['cluster_id'] = ''
997 return ''
997 return ''
998
998
999
999
1000 class BatchSystemLauncher(BaseLauncher):
1000 class BatchSystemLauncher(BaseLauncher):
1001 """Launch an external process using a batch system.
1001 """Launch an external process using a batch system.
1002
1002
1003 This class is designed to work with UNIX batch systems like PBS, LSF,
1003 This class is designed to work with UNIX batch systems like PBS, LSF,
1004 GridEngine, etc. The overall model is that there are different commands
1004 GridEngine, etc. The overall model is that there are different commands
1005 like qsub, qdel, etc. that handle the starting and stopping of the process.
1005 like qsub, qdel, etc. that handle the starting and stopping of the process.
1006
1006
1007 This class also has the notion of a batch script. The ``batch_template``
1007 This class also has the notion of a batch script. The ``batch_template``
1008 attribute can be set to a string that is a template for the batch script.
1008 attribute can be set to a string that is a template for the batch script.
1009 This template is instantiated using string formatting. Thus the template can
1009 This template is instantiated using string formatting. Thus the template can
1010 use {n} for the number of instances. Subclasses can add additional variables
1010 use {n} for the number of instances. Subclasses can add additional variables
1011 to the template dict.
1011 to the template dict.
1012 """
1012 """
1013
1013
1014 # Subclasses must fill these in. See PBSEngineSet
1014 # Subclasses must fill these in. See PBSEngineSet
1015 submit_command = List([''], config=True,
1015 submit_command = List([''], config=True,
1016 help="The name of the command line program used to submit jobs.")
1016 help="The name of the command line program used to submit jobs.")
1017 delete_command = List([''], config=True,
1017 delete_command = List([''], config=True,
1018 help="The name of the command line program used to delete jobs.")
1018 help="The name of the command line program used to delete jobs.")
1019 job_id_regexp = CRegExp('', config=True,
1019 job_id_regexp = CRegExp('', config=True,
1020 help="""A regular expression used to get the job id from the output of the
1020 help="""A regular expression used to get the job id from the output of the
1021 submit_command.""")
1021 submit_command.""")
1022 job_id_regexp_group = Integer(0, config=True,
1022 job_id_regexp_group = Integer(0, config=True,
1023 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1023 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1024 batch_template = Unicode('', config=True,
1024 batch_template = Unicode('', config=True,
1025 help="The string that is the batch script template itself.")
1025 help="The string that is the batch script template itself.")
1026 batch_template_file = Unicode(u'', config=True,
1026 batch_template_file = Unicode(u'', config=True,
1027 help="The file that contains the batch template.")
1027 help="The file that contains the batch template.")
1028 batch_file_name = Unicode(u'batch_script', config=True,
1028 batch_file_name = Unicode(u'batch_script', config=True,
1029 help="The filename of the instantiated batch script.")
1029 help="The filename of the instantiated batch script.")
1030 queue = Unicode(u'', config=True,
1030 queue = Unicode(u'', config=True,
1031 help="The PBS Queue.")
1031 help="The PBS Queue.")
1032
1032
1033 def _queue_changed(self, name, old, new):
1033 def _queue_changed(self, name, old, new):
1034 self.context[name] = new
1034 self.context[name] = new
1035
1035
1036 n = Integer(1)
1036 n = Integer(1)
1037 _n_changed = _queue_changed
1037 _n_changed = _queue_changed
1038
1038
1039 # not configurable, override in subclasses
1039 # not configurable, override in subclasses
1040 # PBS Job Array regex
1040 # PBS Job Array regex
1041 job_array_regexp = CRegExp('')
1041 job_array_regexp = CRegExp('')
1042 job_array_template = Unicode('')
1042 job_array_template = Unicode('')
1043 # PBS Queue regex
1043 # PBS Queue regex
1044 queue_regexp = CRegExp('')
1044 queue_regexp = CRegExp('')
1045 queue_template = Unicode('')
1045 queue_template = Unicode('')
1046 # The default batch template, override in subclasses
1046 # The default batch template, override in subclasses
1047 default_template = Unicode('')
1047 default_template = Unicode('')
1048 # The full path to the instantiated batch script.
1048 # The full path to the instantiated batch script.
1049 batch_file = Unicode(u'')
1049 batch_file = Unicode(u'')
1050 # the format dict used with batch_template:
1050 # the format dict used with batch_template:
1051 context = Dict()
1051 context = Dict()
1052
1052
1053 def _context_default(self):
1053 def _context_default(self):
1054 """load the default context with the default values for the basic keys
1054 """load the default context with the default values for the basic keys
1055
1055
1056 because the _trait_changed methods only load the context if they
1056 because the _trait_changed methods only load the context if they
1057 are set to something other than the default value.
1057 are set to something other than the default value.
1058 """
1058 """
1059 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1059 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1060
1060
1061 # the Formatter instance for rendering the templates:
1061 # the Formatter instance for rendering the templates:
1062 formatter = Instance(EvalFormatter, (), {})
1062 formatter = Instance(EvalFormatter, (), {})
1063
1063
1064 def find_args(self):
1064 def find_args(self):
1065 return self.submit_command + [self.batch_file]
1065 return self.submit_command + [self.batch_file]
1066
1066
1067 def __init__(self, work_dir=u'.', config=None, **kwargs):
1067 def __init__(self, work_dir=u'.', config=None, **kwargs):
1068 super(BatchSystemLauncher, self).__init__(
1068 super(BatchSystemLauncher, self).__init__(
1069 work_dir=work_dir, config=config, **kwargs
1069 work_dir=work_dir, config=config, **kwargs
1070 )
1070 )
1071 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1071 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1072
1072
1073 def parse_job_id(self, output):
1073 def parse_job_id(self, output):
1074 """Take the output of the submit command and return the job id."""
1074 """Take the output of the submit command and return the job id."""
1075 m = self.job_id_regexp.search(output)
1075 m = self.job_id_regexp.search(output)
1076 if m is not None:
1076 if m is not None:
1077 job_id = m.group(self.job_id_regexp_group)
1077 job_id = m.group(self.job_id_regexp_group)
1078 else:
1078 else:
1079 raise LauncherError("Job id couldn't be determined: %s" % output)
1079 raise LauncherError("Job id couldn't be determined: %s" % output)
1080 self.job_id = job_id
1080 self.job_id = job_id
1081 self.log.info('Job submitted with job id: %r', job_id)
1081 self.log.info('Job submitted with job id: %r', job_id)
1082 return job_id
1082 return job_id
1083
1083
1084 def write_batch_script(self, n):
1084 def write_batch_script(self, n):
1085 """Instantiate and write the batch script to the work_dir."""
1085 """Instantiate and write the batch script to the work_dir."""
1086 self.n = n
1086 self.n = n
1087 # first priority is batch_template if set
1087 # first priority is batch_template if set
1088 if self.batch_template_file and not self.batch_template:
1088 if self.batch_template_file and not self.batch_template:
1089 # second priority is batch_template_file
1089 # second priority is batch_template_file
1090 with open(self.batch_template_file) as f:
1090 with open(self.batch_template_file) as f:
1091 self.batch_template = f.read()
1091 self.batch_template = f.read()
1092 if not self.batch_template:
1092 if not self.batch_template:
1093 # third (last) priority is default_template
1093 # third (last) priority is default_template
1094 self.batch_template = self.default_template
1094 self.batch_template = self.default_template
1095 # add jobarray or queue lines to user-specified template
1095 # add jobarray or queue lines to user-specified template
1096 # note that this is *only* when user did not specify a template.
1096 # note that this is *only* when user did not specify a template.
1097 self._insert_queue_in_script()
1097 self._insert_queue_in_script()
1098 self._insert_job_array_in_script()
1098 self._insert_job_array_in_script()
1099 script_as_string = self.formatter.format(self.batch_template, **self.context)
1099 script_as_string = self.formatter.format(self.batch_template, **self.context)
1100 self.log.debug('Writing batch script: %s', self.batch_file)
1100 self.log.debug('Writing batch script: %s', self.batch_file)
1101 with open(self.batch_file, 'w') as f:
1101 with open(self.batch_file, 'w') as f:
1102 f.write(script_as_string)
1102 f.write(script_as_string)
1103 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1103 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1104
1104
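# --- Illustrative aside: the template priority implemented above, as a standalone
# --- sketch. batch_template wins, then batch_template_file, then default_template.
# --- The helper function name is hypothetical.
def resolve_template(batch_template, batch_template_file, default_template):
    if batch_template_file and not batch_template:
        with open(batch_template_file) as f:    # second priority: read template file
            batch_template = f.read()
    return batch_template or default_template   # first priority, else last resort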
1105 def _insert_queue_in_script(self):
1105 def _insert_queue_in_script(self):
1106 """Inserts a queue if required into the batch script.
1106 """Inserts a queue if required into the batch script.
1107 """
1107 """
1108 if self.queue and not self.queue_regexp.search(self.batch_template):
1108 if self.queue and not self.queue_regexp.search(self.batch_template):
1109 self.log.debug("adding PBS queue settings to batch script")
1109 self.log.debug("adding PBS queue settings to batch script")
1110 firstline, rest = self.batch_template.split('\n',1)
1110 firstline, rest = self.batch_template.split('\n',1)
1111 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1111 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1112
1112
1113 def _insert_job_array_in_script(self):
1113 def _insert_job_array_in_script(self):
1114 """Inserts a job array if required into the batch script.
1114 """Inserts a job array if required into the batch script.
1115 """
1115 """
1116 if not self.job_array_regexp.search(self.batch_template):
1116 if not self.job_array_regexp.search(self.batch_template):
1117 self.log.debug("adding job array settings to batch script")
1117 self.log.debug("adding job array settings to batch script")
1118 firstline, rest = self.batch_template.split('\n',1)
1118 firstline, rest = self.batch_template.split('\n',1)
1119 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1119 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1120
1120
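# --- Illustrative aside: how a queue / job-array directive is spliced in right
# --- after the first (shebang) line when the template lacks one. Hypothetical template.
template = '#!/bin/sh\nipengine --profile-dir="{profile_dir}"'
firstline, rest = template.split('\n', 1)
print('\n'.join([firstline, '#PBS -q {queue}', rest]))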
1121 def start(self, n):
1121 def start(self, n):
1122 """Start n copies of the process using a batch system."""
1122 """Start n copies of the process using a batch system."""
1123 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1123 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1124 # Here we save profile_dir in the context so it
1124 # Here we save profile_dir in the context so it
1125 # can be used in the batch script template as {profile_dir}
1125 # can be used in the batch script template as {profile_dir}
1126 self.write_batch_script(n)
1126 self.write_batch_script(n)
1127 output = check_output(self.args, env=os.environ)
1127 output = check_output(self.args, env=os.environ)
1128 output = output.decode(DEFAULT_ENCODING, 'replace')
1128 output = output.decode(DEFAULT_ENCODING, 'replace')
1129
1129
1130 job_id = self.parse_job_id(output)
1130 job_id = self.parse_job_id(output)
1131 self.notify_start(job_id)
1131 self.notify_start(job_id)
1132 return job_id
1132 return job_id
1133
1133
1134 def stop(self):
1134 def stop(self):
1135 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1135 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1136 output = output.decode(DEFAULT_ENCODING, 'replace')
1136 output = output.decode(DEFAULT_ENCODING, 'replace')
1137 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1137 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1138 return output
1138 return output
1139
1139
1140
1140
1141 class PBSLauncher(BatchSystemLauncher):
1141 class PBSLauncher(BatchSystemLauncher):
1142 """A BatchSystemLauncher subclass for PBS."""
1142 """A BatchSystemLauncher subclass for PBS."""
1143
1143
1144 submit_command = List(['qsub'], config=True,
1144 submit_command = List(['qsub'], config=True,
1145 help="The PBS submit command ['qsub']")
1145 help="The PBS submit command ['qsub']")
1146 delete_command = List(['qdel'], config=True,
1146 delete_command = List(['qdel'], config=True,
1147 help="The PBS delete command ['qsub']")
1147 help="The PBS delete command ['qsub']")
1148 job_id_regexp = CRegExp(r'\d+', config=True,
1148 job_id_regexp = CRegExp(r'\d+', config=True,
1149 help="Regular expresion for identifying the job ID [r'\d+']")
1149 help="Regular expresion for identifying the job ID [r'\d+']")
1150
1150
1151 batch_file = Unicode(u'')
1151 batch_file = Unicode(u'')
1152 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1152 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1153 job_array_template = Unicode('#PBS -t 1-{n}')
1153 job_array_template = Unicode('#PBS -t 1-{n}')
1154 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1154 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1155 queue_template = Unicode('#PBS -q {queue}')
1155 queue_template = Unicode('#PBS -q {queue}')
1156
1156
1157
1157
1158 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1158 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1159 """Launch a controller using PBS."""
1159 """Launch a controller using PBS."""
1160
1160
1161 batch_file_name = Unicode(u'pbs_controller', config=True,
1161 batch_file_name = Unicode(u'pbs_controller', config=True,
1162 help="batch file name for the controller job.")
1162 help="batch file name for the controller job.")
1163 default_template= Unicode("""#!/bin/sh
1163 default_template= Unicode("""#!/bin/sh
1164 #PBS -V
1164 #PBS -V
1165 #PBS -N ipcontroller
1165 #PBS -N ipcontroller
1166 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1166 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1167 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1167 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1168
1168
1169 def start(self):
1169 def start(self):
1170 """Start the controller by profile or profile_dir."""
1170 """Start the controller by profile or profile_dir."""
1171 return super(PBSControllerLauncher, self).start(1)
1171 return super(PBSControllerLauncher, self).start(1)
1172
1172
1173
1173
1174 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1174 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1175 """Launch Engines using PBS"""
1175 """Launch Engines using PBS"""
1176 batch_file_name = Unicode(u'pbs_engines', config=True,
1176 batch_file_name = Unicode(u'pbs_engines', config=True,
1177 help="batch file name for the engine(s) job.")
1177 help="batch file name for the engine(s) job.")
1178 default_template= Unicode(u"""#!/bin/sh
1178 default_template= Unicode(u"""#!/bin/sh
1179 #PBS -V
1179 #PBS -V
1180 #PBS -N ipengine
1180 #PBS -N ipengine
1181 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1181 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1182 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1182 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1183
1183
1184
1184
1185 #SGE is very similar to PBS
1185 #SGE is very similar to PBS
1186
1186
1187 class SGELauncher(PBSLauncher):
1187 class SGELauncher(PBSLauncher):
1188 """Sun GridEngine is a PBS clone with slightly different syntax"""
1188 """Sun GridEngine is a PBS clone with slightly different syntax"""
1189 job_array_regexp = CRegExp('#\$\W+\-t')
1189 job_array_regexp = CRegExp('#\$\W+\-t')
1190 job_array_template = Unicode('#$ -t 1-{n}')
1190 job_array_template = Unicode('#$ -t 1-{n}')
1191 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1191 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1192 queue_template = Unicode('#$ -q {queue}')
1192 queue_template = Unicode('#$ -q {queue}')
1193
1193
1194
1194
1195 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1195 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1196 """Launch a controller using SGE."""
1196 """Launch a controller using SGE."""
1197
1197
1198 batch_file_name = Unicode(u'sge_controller', config=True,
1198 batch_file_name = Unicode(u'sge_controller', config=True,
1199 help="batch file name for the ipontroller job.")
1199 help="batch file name for the ipontroller job.")
1200 default_template= Unicode(u"""#$ -V
1200 default_template= Unicode(u"""#$ -V
1201 #$ -S /bin/sh
1201 #$ -S /bin/sh
1202 #$ -N ipcontroller
1202 #$ -N ipcontroller
1203 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1203 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1204 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1204 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1205
1205
1206 def start(self):
1206 def start(self):
1207 """Start the controller by profile or profile_dir."""
1207 """Start the controller by profile or profile_dir."""
1208 return super(SGEControllerLauncher, self).start(1)
1208 return super(SGEControllerLauncher, self).start(1)
1209
1209
1210
1210
1211 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1211 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1212 """Launch Engines with SGE"""
1212 """Launch Engines with SGE"""
1213 batch_file_name = Unicode(u'sge_engines', config=True,
1213 batch_file_name = Unicode(u'sge_engines', config=True,
1214 help="batch file name for the engine(s) job.")
1214 help="batch file name for the engine(s) job.")
1215 default_template = Unicode("""#$ -V
1215 default_template = Unicode("""#$ -V
1216 #$ -S /bin/sh
1216 #$ -S /bin/sh
1217 #$ -N ipengine
1217 #$ -N ipengine
1218 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1218 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1219 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1219 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1220
1220
1221
1221
1222 # LSF launchers
1222 # LSF launchers
1223
1223
1224 class LSFLauncher(BatchSystemLauncher):
1224 class LSFLauncher(BatchSystemLauncher):
1225 """A BatchSystemLauncher subclass for LSF."""
1225 """A BatchSystemLauncher subclass for LSF."""
1226
1226
1227 submit_command = List(['bsub'], config=True,
1227 submit_command = List(['bsub'], config=True,
1228 help="The PBS submit command ['bsub']")
1228 help="The PBS submit command ['bsub']")
1229 delete_command = List(['bkill'], config=True,
1229 delete_command = List(['bkill'], config=True,
1230 help="The PBS delete command ['bkill']")
1230 help="The PBS delete command ['bkill']")
1231 job_id_regexp = CRegExp(r'\d+', config=True,
1231 job_id_regexp = CRegExp(r'\d+', config=True,
1232 help="Regular expresion for identifying the job ID [r'\d+']")
1232 help="Regular expresion for identifying the job ID [r'\d+']")
1233
1233
1234 batch_file = Unicode(u'')
1234 batch_file = Unicode(u'')
1235 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1235 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1236 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1236 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1237 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1237 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1238 queue_template = Unicode('#BSUB -q {queue}')
1238 queue_template = Unicode('#BSUB -q {queue}')
1239
1239
1240 def start(self, n):
1240 def start(self, n):
1241 """Start n copies of the process using LSF batch system.
1241 """Start n copies of the process using LSF batch system.
1242 This can't inherit from the base class because bsub expects
1242 This can't inherit from the base class because bsub expects
1243 to be piped a shell script in order to honor the #BSUB directives:
1243 to be piped a shell script in order to honor the #BSUB directives:
1244 bsub < script
1244 bsub < script
1245 """
1245 """
1246 # Here we save profile_dir in the context so it
1246 # Here we save profile_dir in the context so it
1247 # can be used in the batch script template as {profile_dir}
1247 # can be used in the batch script template as {profile_dir}
1248 self.write_batch_script(n)
1248 self.write_batch_script(n)
1249 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1249 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1250 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1250 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1251 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1251 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1252 output,err = p.communicate()
1252 output,err = p.communicate()
1253 output = output.decode(DEFAULT_ENCODING, 'replace')
1253 output = output.decode(DEFAULT_ENCODING, 'replace')
1254 job_id = self.parse_job_id(output)
1254 job_id = self.parse_job_id(output)
1255 self.notify_start(job_id)
1255 self.notify_start(job_id)
1256 return job_id
1256 return job_id
1257
1257
1258
1258
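# --- Illustrative aside: the bsub invocation built above. LSF only honors the #BSUB
# --- directives when the script is piped on stdin, hence the `bsub < script` form.
# --- The batch file name is hypothetical; the Popen call is shown but left commented.
import os
from subprocess import Popen, PIPE

piped_cmd = 'bsub <"lsf_engines"'
# p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
# output, err = p.communicate()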
1259 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1259 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1260 """Launch a controller using LSF."""
1260 """Launch a controller using LSF."""
1261
1261
1262 batch_file_name = Unicode(u'lsf_controller', config=True,
1262 batch_file_name = Unicode(u'lsf_controller', config=True,
1263 help="batch file name for the controller job.")
1263 help="batch file name for the controller job.")
1264 default_template= Unicode("""#!/bin/sh
1264 default_template= Unicode("""#!/bin/sh
1265 #BSUB -J ipcontroller
1265 #BSUB -J ipcontroller
1266 #BSUB -oo ipcontroller.o.%%J
1266 #BSUB -oo ipcontroller.o.%%J
1267 #BSUB -eo ipcontroller.e.%%J
1267 #BSUB -eo ipcontroller.e.%%J
1268 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1268 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1269 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1269 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1270
1270
1271 def start(self):
1271 def start(self):
1272 """Start the controller by profile or profile_dir."""
1272 """Start the controller by profile or profile_dir."""
1273 return super(LSFControllerLauncher, self).start(1)
1273 return super(LSFControllerLauncher, self).start(1)
1274
1274
1275
1275
1276 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1276 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1277 """Launch Engines using LSF"""
1277 """Launch Engines using LSF"""
1278 batch_file_name = Unicode(u'lsf_engines', config=True,
1278 batch_file_name = Unicode(u'lsf_engines', config=True,
1279 help="batch file name for the engine(s) job.")
1279 help="batch file name for the engine(s) job.")
1280 default_template= Unicode(u"""#!/bin/sh
1280 default_template= Unicode(u"""#!/bin/sh
1281 #BSUB -oo ipengine.o.%%J
1281 #BSUB -oo ipengine.o.%%J
1282 #BSUB -eo ipengine.e.%%J
1282 #BSUB -eo ipengine.e.%%J
1283 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1283 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1284 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1284 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1285
1285
1286
1286
1287
1287
1288 class CondorLauncher(BatchSystemLauncher):
1288 class HTCondorLauncher(BatchSystemLauncher):
1289 """A BatchSystemLauncher subclass for Condor.
1289 """A BatchSystemLauncher subclass for HTCondor.
1290
1290
1291 Condor requires that we launch the ipengine/ipcontroller scripts rather
1291 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1292 than the python instance but otherwise is very similar to PBS. This is because
1292 than the python instance but otherwise is very similar to PBS. This is because
1293 HTCondor destroys sys.executable when launching remote processes - a launched
1293 HTCondor destroys sys.executable when launching remote processes - a launched
1294 python process depends on sys.executable to effectively evaluate its
1294 python process depends on sys.executable to effectively evaluate its
1295 module search paths. Without it, regardless of which python interpreter you launch
1295 module search paths. Without it, regardless of which python interpreter you launch
1296 you will get only the built-in module search paths.
1296 you will get only the built-in module search paths.
1297
1297
1298 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1298 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1299 this - the mechanism of shebanged scripts means that the python binary will be
1299 this - the mechanism of shebanged scripts means that the python binary will be
1300 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1300 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1301 scripts on the remote node*. This means you need to take care that:
1301 scripts on the remote node*. This means you need to take care that:
1302 a. Your remote nodes have their paths configured correctly, so that the ipengine and ipcontroller
1302 a. Your remote nodes have their paths configured correctly, so that the ipengine and ipcontroller
1303 scripts of the python environment you wish to execute code in take top precedence.
1303 scripts of the python environment you wish to execute code in take top precedence.
1304 b. This functionality is untested on Windows.
1304 b. This functionality is untested on Windows.
1305
1305
1306 If you need different behavior, consider making your own template.
1306 If you need different behavior, consider making your own template.
1307 """
1307 """
1308
1308
1309 submit_command = List(['condor_submit'], config=True,
1309 submit_command = List(['condor_submit'], config=True,
1310 help="The Condor submit command ['condor_submit']")
1310 help="The HTCondor submit command ['condor_submit']")
1311 delete_command = List(['condor_rm'], config=True,
1311 delete_command = List(['condor_rm'], config=True,
1312 help="The Condor delete command ['condor_rm']")
1312 help="The HTCondor delete command ['condor_rm']")
1313 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1313 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1314 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1314 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1315 job_id_regexp_group = Integer(1, config=True,
1315 job_id_regexp_group = Integer(1, config=True,
1316 help="""The group we wish to match in job_id_regexp [1]""")
1316 help="""The group we wish to match in job_id_regexp [1]""")
1317
1317
1318 job_array_regexp = CRegExp('queue\W+\$')
1318 job_array_regexp = CRegExp('queue\W+\$')
1319 job_array_template = Unicode('queue {n}')
1319 job_array_template = Unicode('queue {n}')
1320
1320
1321
1321
1322 def _insert_job_array_in_script(self):
1322 def _insert_job_array_in_script(self):
1323 """Inserts a job array if required into the batch script.
1323 """Inserts a job array if required into the batch script.
1324 """
1324 """
1325 if not self.job_array_regexp.search(self.batch_template):
1325 if not self.job_array_regexp.search(self.batch_template):
1326 self.log.debug("adding job array settings to batch script")
1326 self.log.debug("adding job array settings to batch script")
1327 #Condor requires that the job array goes at the bottom of the script
1327 #HTCondor requires that the job array goes at the bottom of the script
1328 self.batch_template = '\n'.join([self.batch_template,
1328 self.batch_template = '\n'.join([self.batch_template,
1329 self.job_array_template])
1329 self.job_array_template])
1330
1330
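# --- Illustrative aside: unlike PBS/SGE, the HTCondor `queue {n}` statement is
# --- appended at the end of the submit description, not after the first line.
# --- The template text is hypothetical.
template = 'universe = vanilla\nexecutable = ipengine'
print('\n'.join([template, 'queue 4']))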
1331 def _insert_queue_in_script(self):
1331 def _insert_queue_in_script(self):
1332 """AFAIK, Condor doesn't have a concept of multiple queues that can be
1332 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1333 specified in the script.
1333 specified in the script.
1334 """
1334 """
1335 pass
1335 pass
1336
1336
1337
1337
1338 class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
1338 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1339 """Launch a controller using Condor."""
1339 """Launch a controller using HTCondor."""
1340
1340
1341 batch_file_name = Unicode(u'condor_controller', config=True,
1341 batch_file_name = Unicode(u'htcondor_controller', config=True,
1342 help="batch file name for the controller job.")
1342 help="batch file name for the controller job.")
1343 default_template = Unicode(r"""
1343 default_template = Unicode(r"""
1344 universe = vanilla
1344 universe = vanilla
1345 executable = ipcontroller
1345 executable = ipcontroller
1346 # by default we expect a shared file system
1346 # by default we expect a shared file system
1347 transfer_executable = False
1347 transfer_executable = False
1348 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1348 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1349 """)
1349 """)
1350
1350
1351 def start(self):
1351 def start(self):
1352 """Start the controller by profile or profile_dir."""
1352 """Start the controller by profile or profile_dir."""
1353 return super(CondorControllerLauncher, self).start(1)
1353 return super(HTCondorControllerLauncher, self).start(1)
1354
1354
1355
1355
1356 class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
1356 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1357 """Launch Engines using Condor"""
1357 """Launch Engines using HTCondor"""
1358 batch_file_name = Unicode(u'condor_engines', config=True,
1358 batch_file_name = Unicode(u'htcondor_engines', config=True,
1359 help="batch file name for the engine(s) job.")
1359 help="batch file name for the engine(s) job.")
1360 default_template = Unicode("""
1360 default_template = Unicode("""
1361 universe = vanilla
1361 universe = vanilla
1362 executable = ipengine
1362 executable = ipengine
1363 # by default we expect a shared file system
1363 # by default we expect a shared file system
1364 transfer_executable = False
1364 transfer_executable = False
1365 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1365 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1366 """)
1366 """)
1367
1367
1368
1368
1369 #-----------------------------------------------------------------------------
1369 #-----------------------------------------------------------------------------
1370 # A launcher for ipcluster itself!
1370 # A launcher for ipcluster itself!
1371 #-----------------------------------------------------------------------------
1371 #-----------------------------------------------------------------------------
1372
1372
1373
1373
1374 class IPClusterLauncher(LocalProcessLauncher):
1374 class IPClusterLauncher(LocalProcessLauncher):
1375 """Launch the ipcluster program in an external process."""
1375 """Launch the ipcluster program in an external process."""
1376
1376
1377 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1377 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1378 help="Popen command for ipcluster")
1378 help="Popen command for ipcluster")
1379 ipcluster_args = List(
1379 ipcluster_args = List(
1380 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1380 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1381 help="Command line arguments to pass to ipcluster.")
1381 help="Command line arguments to pass to ipcluster.")
1382 ipcluster_subcommand = Unicode('start')
1382 ipcluster_subcommand = Unicode('start')
1383 profile = Unicode('default')
1383 profile = Unicode('default')
1384 n = Integer(2)
1384 n = Integer(2)
1385
1385
1386 def find_args(self):
1386 def find_args(self):
1387 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1387 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1388 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1388 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1389 self.ipcluster_args
1389 self.ipcluster_args
1390
1390
1391 def start(self):
1391 def start(self):
1392 return super(IPClusterLauncher, self).start()
1392 return super(IPClusterLauncher, self).start()
1393
1393
1394 #-----------------------------------------------------------------------------
1394 #-----------------------------------------------------------------------------
1395 # Collections of launchers
1395 # Collections of launchers
1396 #-----------------------------------------------------------------------------
1396 #-----------------------------------------------------------------------------
1397
1397
1398 local_launchers = [
1398 local_launchers = [
1399 LocalControllerLauncher,
1399 LocalControllerLauncher,
1400 LocalEngineLauncher,
1400 LocalEngineLauncher,
1401 LocalEngineSetLauncher,
1401 LocalEngineSetLauncher,
1402 ]
1402 ]
1403 mpi_launchers = [
1403 mpi_launchers = [
1404 MPILauncher,
1404 MPILauncher,
1405 MPIControllerLauncher,
1405 MPIControllerLauncher,
1406 MPIEngineSetLauncher,
1406 MPIEngineSetLauncher,
1407 ]
1407 ]
1408 ssh_launchers = [
1408 ssh_launchers = [
1409 SSHLauncher,
1409 SSHLauncher,
1410 SSHControllerLauncher,
1410 SSHControllerLauncher,
1411 SSHEngineLauncher,
1411 SSHEngineLauncher,
1412 SSHEngineSetLauncher,
1412 SSHEngineSetLauncher,
1413 SSHProxyEngineSetLauncher,
1413 SSHProxyEngineSetLauncher,
1414 ]
1414 ]
1415 winhpc_launchers = [
1415 winhpc_launchers = [
1416 WindowsHPCLauncher,
1416 WindowsHPCLauncher,
1417 WindowsHPCControllerLauncher,
1417 WindowsHPCControllerLauncher,
1418 WindowsHPCEngineSetLauncher,
1418 WindowsHPCEngineSetLauncher,
1419 ]
1419 ]
1420 pbs_launchers = [
1420 pbs_launchers = [
1421 PBSLauncher,
1421 PBSLauncher,
1422 PBSControllerLauncher,
1422 PBSControllerLauncher,
1423 PBSEngineSetLauncher,
1423 PBSEngineSetLauncher,
1424 ]
1424 ]
1425 sge_launchers = [
1425 sge_launchers = [
1426 SGELauncher,
1426 SGELauncher,
1427 SGEControllerLauncher,
1427 SGEControllerLauncher,
1428 SGEEngineSetLauncher,
1428 SGEEngineSetLauncher,
1429 ]
1429 ]
1430 lsf_launchers = [
1430 lsf_launchers = [
1431 LSFLauncher,
1431 LSFLauncher,
1432 LSFControllerLauncher,
1432 LSFControllerLauncher,
1433 LSFEngineSetLauncher,
1433 LSFEngineSetLauncher,
1434 ]
1434 ]
1435 condor_launchers = [
1435 htcondor_launchers = [
1436 CondorLauncher,
1436 HTCondorLauncher,
1437 CondorControllerLauncher,
1437 HTCondorControllerLauncher,
1438 CondorEngineSetLauncher,
1438 HTCondorEngineSetLauncher,
1439 ]
1439 ]
1440 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1440 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1441 + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers
1441 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
@@ -1,185 +1,185
1 """Tests for launchers
1 """Tests for launchers
2
2
3 Doesn't actually start any subprocesses, but goes through the motions of constructing
3 Doesn't actually start any subprocesses, but goes through the motions of constructing
4 objects, which should test basic config.
4 objects, which should test basic config.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2013 The IPython Development Team
12 # Copyright (C) 2013 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21
21
22 import logging
22 import logging
23 import os
23 import os
24 import shutil
24 import shutil
25 import sys
25 import sys
26 import tempfile
26 import tempfile
27
27
28 from unittest import TestCase
28 from unittest import TestCase
29
29
30 from nose import SkipTest
30 from nose import SkipTest
31
31
32 from IPython.config import Config
32 from IPython.config import Config
33
33
34 from IPython.parallel.apps import launcher
34 from IPython.parallel.apps import launcher
35
35
36 from IPython.testing import decorators as dec
36 from IPython.testing import decorators as dec
37
37
38
38
39 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
40 # TestCase Mixins
40 # TestCase Mixins
41 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
42
42
43 class LauncherTest:
43 class LauncherTest:
44 """Mixin for generic launcher tests"""
44 """Mixin for generic launcher tests"""
45 def setUp(self):
45 def setUp(self):
46 self.profile_dir = tempfile.mkdtemp(prefix="profile_")
46 self.profile_dir = tempfile.mkdtemp(prefix="profile_")
47
47
48 def tearDown(self):
48 def tearDown(self):
49 shutil.rmtree(self.profile_dir)
49 shutil.rmtree(self.profile_dir)
50
50
51 @property
51 @property
52 def config(self):
52 def config(self):
53 return Config()
53 return Config()
54
54
55 def build_launcher(self, **kwargs):
55 def build_launcher(self, **kwargs):
56 kw = dict(
56 kw = dict(
57 work_dir=self.profile_dir,
57 work_dir=self.profile_dir,
58 profile_dir=self.profile_dir,
58 profile_dir=self.profile_dir,
59 config=self.config,
59 config=self.config,
60 cluster_id='',
60 cluster_id='',
61 log=logging.getLogger(),
61 log=logging.getLogger(),
62 )
62 )
63 kw.update(kwargs)
63 kw.update(kwargs)
64 return self.launcher_class(**kw)
64 return self.launcher_class(**kw)
65
65
66 def test_profile_dir_arg(self):
66 def test_profile_dir_arg(self):
67 launcher = self.build_launcher()
67 launcher = self.build_launcher()
68 self.assertTrue("--profile-dir" in launcher.cluster_args)
68 self.assertTrue("--profile-dir" in launcher.cluster_args)
69 self.assertTrue(self.profile_dir in launcher.cluster_args)
69 self.assertTrue(self.profile_dir in launcher.cluster_args)
70
70
71 def test_cluster_id_arg(self):
71 def test_cluster_id_arg(self):
72 launcher = self.build_launcher()
72 launcher = self.build_launcher()
73 self.assertTrue("--cluster-id" in launcher.cluster_args)
73 self.assertTrue("--cluster-id" in launcher.cluster_args)
74 idx = launcher.cluster_args.index("--cluster-id")
74 idx = launcher.cluster_args.index("--cluster-id")
75 self.assertEqual(launcher.cluster_args[idx+1], '')
75 self.assertEqual(launcher.cluster_args[idx+1], '')
76 launcher.cluster_id = 'foo'
76 launcher.cluster_id = 'foo'
77 self.assertEqual(launcher.cluster_args[idx+1], 'foo')
77 self.assertEqual(launcher.cluster_args[idx+1], 'foo')
78
78
79 def test_args(self):
79 def test_args(self):
80 launcher = self.build_launcher()
80 launcher = self.build_launcher()
81 for arg in launcher.args:
81 for arg in launcher.args:
82 self.assertTrue(isinstance(arg, basestring), str(arg))
82 self.assertTrue(isinstance(arg, basestring), str(arg))
83
83
84 class BatchTest:
84 class BatchTest:
85 """Tests for batch-system launchers (LSF, SGE, PBS)"""
85 """Tests for batch-system launchers (LSF, SGE, PBS)"""
86 def test_batch_template(self):
86 def test_batch_template(self):
87 launcher = self.build_launcher()
87 launcher = self.build_launcher()
88 batch_file = os.path.join(self.profile_dir, launcher.batch_file_name)
88 batch_file = os.path.join(self.profile_dir, launcher.batch_file_name)
89 self.assertEqual(launcher.batch_file, batch_file)
89 self.assertEqual(launcher.batch_file, batch_file)
90 launcher.write_batch_script(1)
90 launcher.write_batch_script(1)
91 self.assertTrue(os.path.isfile(batch_file))
91 self.assertTrue(os.path.isfile(batch_file))
92
92
93 class SSHTest:
93 class SSHTest:
94 """Tests for SSH launchers"""
94 """Tests for SSH launchers"""
95 def test_cluster_id_arg(self):
95 def test_cluster_id_arg(self):
96 raise SkipTest("SSH Launchers don't support cluster ID")
96 raise SkipTest("SSH Launchers don't support cluster ID")
97
97
98 def test_remote_profile_dir(self):
98 def test_remote_profile_dir(self):
99 cfg = Config()
99 cfg = Config()
100 launcher_cfg = getattr(cfg, self.launcher_class.__name__)
100 launcher_cfg = getattr(cfg, self.launcher_class.__name__)
101 launcher_cfg.remote_profile_dir = "foo"
101 launcher_cfg.remote_profile_dir = "foo"
102 launcher = self.build_launcher(config=cfg)
102 launcher = self.build_launcher(config=cfg)
103 self.assertEqual(launcher.remote_profile_dir, "foo")
103 self.assertEqual(launcher.remote_profile_dir, "foo")
104
104
105 def test_remote_profile_dir_default(self):
105 def test_remote_profile_dir_default(self):
106 launcher = self.build_launcher()
106 launcher = self.build_launcher()
107 self.assertEqual(launcher.remote_profile_dir, self.profile_dir)
107 self.assertEqual(launcher.remote_profile_dir, self.profile_dir)
108
108
109 #-------------------------------------------------------------------------------
109 #-------------------------------------------------------------------------------
110 # Controller Launcher Tests
110 # Controller Launcher Tests
111 #-------------------------------------------------------------------------------
111 #-------------------------------------------------------------------------------
112
112
113 class ControllerLauncherTest(LauncherTest):
113 class ControllerLauncherTest(LauncherTest):
114 """Tests for Controller Launchers"""
114 """Tests for Controller Launchers"""
115 pass
115 pass
116
116
117 class TestLocalControllerLauncher(ControllerLauncherTest, TestCase):
117 class TestLocalControllerLauncher(ControllerLauncherTest, TestCase):
118 launcher_class = launcher.LocalControllerLauncher
118 launcher_class = launcher.LocalControllerLauncher
119
119
120 class TestMPIControllerLauncher(ControllerLauncherTest, TestCase):
120 class TestMPIControllerLauncher(ControllerLauncherTest, TestCase):
121 launcher_class = launcher.MPIControllerLauncher
121 launcher_class = launcher.MPIControllerLauncher
122
122
123 class TestPBSControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
123 class TestPBSControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
124 launcher_class = launcher.PBSControllerLauncher
124 launcher_class = launcher.PBSControllerLauncher
125
125
126 class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
126 class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
127 launcher_class = launcher.SGEControllerLauncher
127 launcher_class = launcher.SGEControllerLauncher
128
128
129 class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
129 class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
130 launcher_class = launcher.LSFControllerLauncher
130 launcher_class = launcher.LSFControllerLauncher
131
131
132 class TestCondorControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
132 class TestHTCondorControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
133 launcher_class = launcher.CondorControllerLauncher
133 launcher_class = launcher.HTCondorControllerLauncher
134
134
135 class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase):
135 class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase):
136 launcher_class = launcher.SSHControllerLauncher
136 launcher_class = launcher.SSHControllerLauncher
137
137
138 #-------------------------------------------------------------------------------
138 #-------------------------------------------------------------------------------
139 # Engine Set Launcher Tests
139 # Engine Set Launcher Tests
140 #-------------------------------------------------------------------------------
140 #-------------------------------------------------------------------------------
141
141
142 class EngineSetLauncherTest(LauncherTest):
142 class EngineSetLauncherTest(LauncherTest):
143 """Tests for EngineSet launchers"""
143 """Tests for EngineSet launchers"""
144 pass
144 pass
145
145
146 class TestLocalEngineSetLauncher(EngineSetLauncherTest, TestCase):
146 class TestLocalEngineSetLauncher(EngineSetLauncherTest, TestCase):
147 launcher_class = launcher.LocalEngineSetLauncher
147 launcher_class = launcher.LocalEngineSetLauncher
148
148
149 class TestMPIEngineSetLauncher(EngineSetLauncherTest, TestCase):
149 class TestMPIEngineSetLauncher(EngineSetLauncherTest, TestCase):
150 launcher_class = launcher.MPIEngineSetLauncher
150 launcher_class = launcher.MPIEngineSetLauncher
151
151
152 class TestPBSEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
152 class TestPBSEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
153 launcher_class = launcher.PBSEngineSetLauncher
153 launcher_class = launcher.PBSEngineSetLauncher
154
154
155 class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
155 class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
156 launcher_class = launcher.SGEEngineSetLauncher
156 launcher_class = launcher.SGEEngineSetLauncher
157
157
158 class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
158 class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
159 launcher_class = launcher.LSFEngineSetLauncher
159 launcher_class = launcher.LSFEngineSetLauncher
160
160
161 class TestCondorEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
161 class TestHTCondorEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
162 launcher_class = launcher.CondorEngineSetLauncher
162 launcher_class = launcher.HTCondorEngineSetLauncher
163
163
164 class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase):
164 class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase):
165 launcher_class = launcher.SSHEngineSetLauncher
165 launcher_class = launcher.SSHEngineSetLauncher
166
166
167 def test_cluster_id_arg(self):
167 def test_cluster_id_arg(self):
168 raise SkipTest("SSH Launchers don't support cluster ID")
168 raise SkipTest("SSH Launchers don't support cluster ID")
169
169
170 class TestSSHProxyEngineSetLauncher(SSHTest, LauncherTest, TestCase):
170 class TestSSHProxyEngineSetLauncher(SSHTest, LauncherTest, TestCase):
171 launcher_class = launcher.SSHProxyEngineSetLauncher
171 launcher_class = launcher.SSHProxyEngineSetLauncher
172
172
173 class TestSSHEngineLauncher(SSHTest, LauncherTest, TestCase):
173 class TestSSHEngineLauncher(SSHTest, LauncherTest, TestCase):
174 launcher_class = launcher.SSHEngineLauncher
174 launcher_class = launcher.SSHEngineLauncher
175
175
176 #-------------------------------------------------------------------------------
176 #-------------------------------------------------------------------------------
177 # Windows Launcher Tests
177 # Windows Launcher Tests
178 #-------------------------------------------------------------------------------
178 #-------------------------------------------------------------------------------
179
179
180 if sys.platform.startswith("win"):
180 if sys.platform.startswith("win"):
181 class TestWinHPCControllerLauncher(ControllerLauncherTest, TestCase):
181 class TestWinHPCControllerLauncher(ControllerLauncherTest, TestCase):
182 launcher_class = launcher.WindowsHPCControllerLauncher
182 launcher_class = launcher.WindowsHPCControllerLauncher
183
183
184 class TestWinHPCEngineSetLauncher(EngineSetLauncherTest, TestCase):
184 class TestWinHPCEngineSetLauncher(EngineSetLauncherTest, TestCase):
185 launcher_class = launcher.WindowsHPCEngineSetLauncher
185 launcher_class = launcher.WindowsHPCEngineSetLauncher