add cluster_id support to ipcluster/launchers...
MinRK
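In short, this commit threads a cluster_id through ipcluster and its launchers: profile_dir and cluster_id become traits on every launcher (via the new ClusterAppMixin in launcher.py below) and are folded into the child command line as --profile-dir/--cluster-id, so launcher.start() no longer needs to be handed a profile_dir. A minimal sketch of the resulting launcher interface, assuming the IPython.parallel APIs of this era; the profile path and cluster id are hypothetical example values:

    from IPython.parallel.apps.launcher import LocalControllerLauncher

    # Cluster identity is now carried as traits on the launcher (new in this commit).
    cl = LocalControllerLauncher(
        work_dir=u'.',
        profile_dir=u'/home/me/.ipython/profile_default',  # hypothetical path
        cluster_id=u'mycluster',                            # distinguishes clusters sharing a profile
    )
    # find_args() now composes controller_cmd + cluster_args + controller_args, roughly:
    #   ipcontroller --profile-dir <dir> --cluster-id mycluster --log-to-file --log-level=20
    print cl.args
    cl.start()   # start() no longer takes a profile_dir argument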
IPython/parallel/apps/ipclusterapp.py
@@ -1,529 +1,525 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
99 # This will be the exit code if ipcluster stop is run, but there is no .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
103 # This will be the exit code if ipcluster engines is run, but there is no .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 start_help = """Start an IPython cluster for parallel computing
111 start_help = """Start an IPython cluster for parallel computing
112
112
113 Start an ipython cluster by its profile name or cluster
113 Start an ipython cluster by its profile name or cluster
114 directory. Cluster directories contain configuration, log and
114 directory. Cluster directories contain configuration, log and
115 security related files and are named using the convention
115 security related files and are named using the convention
116 'profile_<name>' and should be created using the 'start'
116 'profile_<name>' and should be created using the 'start'
117 subcommand of 'ipcluster'. If your cluster directory is in
117 subcommand of 'ipcluster'. If your cluster directory is in
118 the cwd or the ipython directory, you can simply refer to it
118 the cwd or the ipython directory, you can simply refer to it
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>',
120 otherwise use the 'profile-dir' option.
120 otherwise use the 'profile-dir' option.
121 """
121 """
122 stop_help = """Stop a running IPython cluster
122 stop_help = """Stop a running IPython cluster
123
123
124 Stop a running ipython cluster by its profile name or cluster
124 Stop a running ipython cluster by its profile name or cluster
125 directory. Cluster directories are named using the convention
125 directory. Cluster directories are named using the convention
126 'profile_<name>'. If your cluster directory is in
126 'profile_<name>'. If your cluster directory is in
127 the cwd or the ipython directory, you can simply refer to it
127 the cwd or the ipython directory, you can simply refer to it
128 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
128 using its profile name, 'ipcluster stop --profile=<profile>', otherwise
129 use the '--profile-dir' option.
129 use the '--profile-dir' option.
130 """
130 """
131 engines_help = """Start engines connected to an existing IPython cluster
131 engines_help = """Start engines connected to an existing IPython cluster
132
132
133 Start one or more engines to connect to an existing Cluster
133 Start one or more engines to connect to an existing Cluster
134 by profile name or cluster directory.
134 by profile name or cluster directory.
135 Cluster directories contain configuration, log and
135 Cluster directories contain configuration, log and
136 security related files and are named using the convention
136 security related files and are named using the convention
137 'profile_<name>' and should be created using the 'start'
137 'profile_<name>' and should be created using the 'start'
138 subcommand of 'ipcluster'. If your cluster directory is in
138 subcommand of 'ipcluster'. If your cluster directory is in
139 the cwd or the ipython directory, you can simply refer to it
139 the cwd or the ipython directory, you can simply refer to it
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>',
141 otherwise use the 'profile-dir' option.
141 otherwise use the 'profile-dir' option.
142 """
142 """
143 stop_aliases = dict(
143 stop_aliases = dict(
144 signal='IPClusterStop.signal',
144 signal='IPClusterStop.signal',
145 )
145 )
146 stop_aliases.update(base_aliases)
146 stop_aliases.update(base_aliases)
147
147
148 class IPClusterStop(BaseParallelApplication):
148 class IPClusterStop(BaseParallelApplication):
149 name = u'ipcluster'
149 name = u'ipcluster'
150 description = stop_help
150 description = stop_help
151 examples = _stop_examples
151 examples = _stop_examples
152 config_file_name = Unicode(default_config_file_name)
152 config_file_name = Unicode(default_config_file_name)
153
153
154 signal = Int(signal.SIGINT, config=True,
154 signal = Int(signal.SIGINT, config=True,
155 help="signal to use for stopping processes.")
155 help="signal to use for stopping processes.")
156
156
157 aliases = Dict(stop_aliases)
157 aliases = Dict(stop_aliases)
158
158
159 def start(self):
159 def start(self):
160 """Start the app for the stop subcommand."""
160 """Start the app for the stop subcommand."""
161 try:
161 try:
162 pid = self.get_pid_from_file()
162 pid = self.get_pid_from_file()
163 except PIDFileError:
163 except PIDFileError:
164 self.log.critical(
164 self.log.critical(
165 'Could not read pid file, cluster is probably not running.'
165 'Could not read pid file, cluster is probably not running.'
166 )
166 )
167 # Here I exit with an unusual exit status that other processes
167 # Here I exit with an unusual exit status that other processes
168 # can watch for to learn how I exited.
168 # can watch for to learn how I exited.
169 self.remove_pid_file()
169 self.remove_pid_file()
170 self.exit(ALREADY_STOPPED)
170 self.exit(ALREADY_STOPPED)
171
171
172 if not self.check_pid(pid):
172 if not self.check_pid(pid):
173 self.log.critical(
173 self.log.critical(
174 'Cluster [pid=%r] is not running.' % pid
174 'Cluster [pid=%r] is not running.' % pid
175 )
175 )
176 self.remove_pid_file()
176 self.remove_pid_file()
177 # Here I exit with an unusual exit status that other processes
177 # Here I exit with an unusual exit status that other processes
178 # can watch for to learn how I exited.
178 # can watch for to learn how I exited.
179 self.exit(ALREADY_STOPPED)
179 self.exit(ALREADY_STOPPED)
180
180
181 elif os.name=='posix':
181 elif os.name=='posix':
182 sig = self.signal
182 sig = self.signal
183 self.log.info(
183 self.log.info(
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 )
185 )
186 try:
186 try:
187 os.kill(pid, sig)
187 os.kill(pid, sig)
188 except OSError:
188 except OSError:
189 self.log.error("Stopping cluster failed, assuming already dead.",
189 self.log.error("Stopping cluster failed, assuming already dead.",
190 exc_info=True)
190 exc_info=True)
191 self.remove_pid_file()
191 self.remove_pid_file()
192 elif os.name=='nt':
192 elif os.name=='nt':
193 try:
193 try:
194 # kill the whole tree
194 # kill the whole tree
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 except (CalledProcessError, OSError):
196 except (CalledProcessError, OSError):
197 self.log.error("Stopping cluster failed, assuming already dead.",
197 self.log.error("Stopping cluster failed, assuming already dead.",
198 exc_info=True)
198 exc_info=True)
199 self.remove_pid_file()
199 self.remove_pid_file()
200
200
201 engine_aliases = {}
201 engine_aliases = {}
202 engine_aliases.update(base_aliases)
202 engine_aliases.update(base_aliases)
203 engine_aliases.update(dict(
203 engine_aliases.update(dict(
204 n='IPClusterEngines.n',
204 n='IPClusterEngines.n',
205 engines = 'IPClusterEngines.engine_launcher_class',
205 engines = 'IPClusterEngines.engine_launcher_class',
206 daemonize = 'IPClusterEngines.daemonize',
206 daemonize = 'IPClusterEngines.daemonize',
207 ))
207 ))
208 engine_flags = {}
208 engine_flags = {}
209 engine_flags.update(base_flags)
209 engine_flags.update(base_flags)
210
210
211 engine_flags.update(dict(
211 engine_flags.update(dict(
212 daemonize=(
212 daemonize=(
213 {'IPClusterEngines' : {'daemonize' : True}},
213 {'IPClusterEngines' : {'daemonize' : True}},
214 """run the cluster into the background (not available on Windows)""",
214 """run the cluster into the background (not available on Windows)""",
215 )
215 )
216 ))
216 ))
217 class IPClusterEngines(BaseParallelApplication):
217 class IPClusterEngines(BaseParallelApplication):
218
218
219 name = u'ipcluster'
219 name = u'ipcluster'
220 description = engines_help
220 description = engines_help
221 examples = _engines_examples
221 examples = _engines_examples
222 usage = None
222 usage = None
223 config_file_name = Unicode(default_config_file_name)
223 config_file_name = Unicode(default_config_file_name)
224 default_log_level = logging.INFO
224 default_log_level = logging.INFO
225 classes = List()
225 classes = List()
226 def _classes_default(self):
226 def _classes_default(self):
227 from IPython.parallel.apps import launcher
227 from IPython.parallel.apps import launcher
228 launchers = launcher.all_launchers
228 launchers = launcher.all_launchers
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 return [ProfileDir]+eslaunchers
230 return [ProfileDir]+eslaunchers
231
231
232 n = Int(num_cpus(), config=True,
232 n = Int(num_cpus(), config=True,
233 help="""The number of engines to start. The default is to use one for each
233 help="""The number of engines to start. The default is to use one for each
234 CPU on your machine""")
234 CPU on your machine""")
235
235
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
237 config=True,
237 config=True,
238 help="""The class for launching a set of Engines. Change this value
238 help="""The class for launching a set of Engines. Change this value
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
240 Each launcher class has its own set of configuration options, for making sure
240 Each launcher class has its own set of configuration options, for making sure
241 it will work in your environment.
241 it will work in your environment.
242
242
243 You can also write your own launcher, and specify its absolute import path,
243 You can also write your own launcher, and specify its absolute import path,
244 as in 'mymodule.launcher.FTLEnginesLauncher'.
244 as in 'mymodule.launcher.FTLEnginesLauncher'.
245
245
246 Examples include:
246 Examples include:
247
247
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
252 SSHEngineSetLauncher : use SSH to start the engines
252 SSHEngineSetLauncher : use SSH to start the engines
253 Note that SSH does *not* move the connection files
253 Note that SSH does *not* move the connection files
254 around, so you will likely have to do this manually
254 around, so you will likely have to do this manually
255 unless the machines are on a shared file system.
255 unless the machines are on a shared file system.
256 WindowsHPCEngineSetLauncher : use Windows HPC
256 WindowsHPCEngineSetLauncher : use Windows HPC
257 """
257 """
258 )
258 )
259 daemonize = Bool(False, config=True,
259 daemonize = Bool(False, config=True,
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
261 Not available on Windows.
261 Not available on Windows.
262 """)
262 """)
263
263
264 def _daemonize_changed(self, name, old, new):
264 def _daemonize_changed(self, name, old, new):
265 if new:
265 if new:
266 self.log_to_file = True
266 self.log_to_file = True
267
267
268 aliases = Dict(engine_aliases)
268 aliases = Dict(engine_aliases)
269 flags = Dict(engine_flags)
269 flags = Dict(engine_flags)
270 _stopping = False
270 _stopping = False
271
271
272 def initialize(self, argv=None):
272 def initialize(self, argv=None):
273 super(IPClusterEngines, self).initialize(argv)
273 super(IPClusterEngines, self).initialize(argv)
274 self.init_signal()
274 self.init_signal()
275 self.init_launchers()
275 self.init_launchers()
276
276
277 def init_launchers(self):
277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280
280
281 def init_signal(self):
281 def init_signal(self):
282 # Setup signals
282 # Setup signals
283 signal.signal(signal.SIGINT, self.sigint_handler)
283 signal.signal(signal.SIGINT, self.sigint_handler)
284
284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname):
286 """import and instantiate a Launcher based on importstring"""
286 """import and instantiate a Launcher based on importstring"""
287 if '.' not in clsname:
287 if '.' not in clsname:
288 # not a module, presume it's the raw name in apps.launcher
288 # not a module, presume it's the raw name in apps.launcher
289 clsname = 'IPython.parallel.apps.launcher.'+clsname
289 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
290 # print repr(clsname)
291 try:
291 try:
292 klass = import_item(clsname)
292 klass = import_item(clsname)
293 except (ImportError, KeyError):
293 except (ImportError, KeyError):
294 self.log.fatal("Could not import launcher class: %r"%clsname)
294 self.log.fatal("Could not import launcher class: %r"%clsname)
295 self.exit(1)
295 self.exit(1)
296
296
297 launcher = klass(
297 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
298 work_dir=u'.', config=self.config, log=self.log,
299 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 )
300 )
300 return launcher
301 return launcher
301
302
302 def start_engines(self):
303 def start_engines(self):
303 self.log.info("Starting %i engines"%self.n)
304 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
305 self.engine_launcher.start(self.n)
305 self.n,
306 self.profile_dir.location
307 )
308
306
309 def stop_engines(self):
307 def stop_engines(self):
310 self.log.info("Stopping Engines...")
308 self.log.info("Stopping Engines...")
311 if self.engine_launcher.running:
309 if self.engine_launcher.running:
312 d = self.engine_launcher.stop()
310 d = self.engine_launcher.stop()
313 return d
311 return d
314 else:
312 else:
315 return None
313 return None
316
314
317 def stop_launchers(self, r=None):
315 def stop_launchers(self, r=None):
318 if not self._stopping:
316 if not self._stopping:
319 self._stopping = True
317 self._stopping = True
320 self.log.error("IPython cluster: stopping")
318 self.log.error("IPython cluster: stopping")
321 self.stop_engines()
319 self.stop_engines()
322 # Wait a few seconds to let things shut down.
320 # Wait a few seconds to let things shut down.
323 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
321 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
324 dc.start()
322 dc.start()
325
323
326 def sigint_handler(self, signum, frame):
324 def sigint_handler(self, signum, frame):
327 self.log.debug("SIGINT received, stopping launchers...")
325 self.log.debug("SIGINT received, stopping launchers...")
328 self.stop_launchers()
326 self.stop_launchers()
329
327
330 def start_logging(self):
328 def start_logging(self):
331 # Remove old log files of the controller and engine
329 # Remove old log files of the controller and engine
332 if self.clean_logs:
330 if self.clean_logs:
333 log_dir = self.profile_dir.log_dir
331 log_dir = self.profile_dir.log_dir
334 for f in os.listdir(log_dir):
332 for f in os.listdir(log_dir):
335 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
333 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
336 os.remove(os.path.join(log_dir, f))
334 os.remove(os.path.join(log_dir, f))
337 # This will remove old log files for ipcluster itself
335 # This will remove old log files for ipcluster itself
338 # super(IPBaseParallelApplication, self).start_logging()
336 # super(IPBaseParallelApplication, self).start_logging()
339
337
340 def start(self):
338 def start(self):
341 """Start the app for the engines subcommand."""
339 """Start the app for the engines subcommand."""
342 self.log.info("IPython cluster: started")
340 self.log.info("IPython cluster: started")
343 # First see if the cluster is already running
341 # First see if the cluster is already running
344
342
345 # Now log and daemonize
343 # Now log and daemonize
346 self.log.info(
344 self.log.info(
347 'Starting engines with [daemon=%r]' % self.daemonize
345 'Starting engines with [daemon=%r]' % self.daemonize
348 )
346 )
349 # TODO: Get daemonize working on Windows or as a Windows Server.
347 # TODO: Get daemonize working on Windows or as a Windows Server.
350 if self.daemonize:
348 if self.daemonize:
351 if os.name=='posix':
349 if os.name=='posix':
352 daemonize()
350 daemonize()
353
351
354 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
352 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
355 dc.start()
353 dc.start()
356 # Now write the new pid file AFTER our new forked pid is active.
354 # Now write the new pid file AFTER our new forked pid is active.
357 # self.write_pid_file()
355 # self.write_pid_file()
358 try:
356 try:
359 self.loop.start()
357 self.loop.start()
360 except KeyboardInterrupt:
358 except KeyboardInterrupt:
361 pass
359 pass
362 except zmq.ZMQError as e:
360 except zmq.ZMQError as e:
363 if e.errno == errno.EINTR:
361 if e.errno == errno.EINTR:
364 pass
362 pass
365 else:
363 else:
366 raise
364 raise
367
365
368 start_aliases = {}
366 start_aliases = {}
369 start_aliases.update(engine_aliases)
367 start_aliases.update(engine_aliases)
370 start_aliases.update(dict(
368 start_aliases.update(dict(
371 delay='IPClusterStart.delay',
369 delay='IPClusterStart.delay',
372 controller = 'IPClusterStart.controller_launcher_class',
370 controller = 'IPClusterStart.controller_launcher_class',
373 ))
371 ))
374 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
372 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
375
373
376 # set inherited Start keys directly, to ensure command-line args get higher priority
374 # set inherited Start keys directly, to ensure command-line args get higher priority
377 # than config file options.
375 # than config file options.
378 for key,value in start_aliases.items():
376 for key,value in start_aliases.items():
379 if value.startswith('IPClusterEngines'):
377 if value.startswith('IPClusterEngines'):
380 start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
378 start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
381
379
382 class IPClusterStart(IPClusterEngines):
380 class IPClusterStart(IPClusterEngines):
383
381
384 name = u'ipcluster'
382 name = u'ipcluster'
385 description = start_help
383 description = start_help
386 examples = _start_examples
384 examples = _start_examples
387 default_log_level = logging.INFO
385 default_log_level = logging.INFO
388 auto_create = Bool(True, config=True,
386 auto_create = Bool(True, config=True,
389 help="whether to create the profile_dir if it doesn't exist")
387 help="whether to create the profile_dir if it doesn't exist")
390 classes = List()
388 classes = List()
391 def _classes_default(self,):
389 def _classes_default(self,):
392 from IPython.parallel.apps import launcher
390 from IPython.parallel.apps import launcher
393 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
391 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
394
392
395 clean_logs = Bool(True, config=True,
393 clean_logs = Bool(True, config=True,
396 help="whether to cleanup old logs before starting")
394 help="whether to cleanup old logs before starting")
397
395
398 delay = CFloat(1., config=True,
396 delay = CFloat(1., config=True,
399 help="delay (in s) between starting the controller and the engines")
397 help="delay (in s) between starting the controller and the engines")
400
398
401 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
399 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
402 config=True,
400 config=True,
403 help="""The class for launching a Controller. Change this value if you want
401 help="""The class for launching a Controller. Change this value if you want
404 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
402 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
405
403
406 Each launcher class has its own set of configuration options, for making sure
404 Each launcher class has its own set of configuration options, for making sure
407 it will work in your environment.
405 it will work in your environment.
408
406
409 Examples include:
407 Examples include:
410
408
411 LocalControllerLauncher : start the controller locally as a subprocess
409 LocalControllerLauncher : start the controller locally as a subprocess
412 MPIExecControllerLauncher : use mpiexec to launch the controller in an MPI universe
410 MPIExecControllerLauncher : use mpiexec to launch the controller in an MPI universe
413 PBSControllerLauncher : use PBS (qsub) to submit the controller to a batch queue
411 PBSControllerLauncher : use PBS (qsub) to submit the controller to a batch queue
414 SGEControllerLauncher : use SGE (qsub) to submit the controller to a batch queue
412 SGEControllerLauncher : use SGE (qsub) to submit the controller to a batch queue
415 SSHControllerLauncher : use SSH to start the controller
413 SSHControllerLauncher : use SSH to start the controller
416 WindowsHPCControllerLauncher : use Windows HPC
414 WindowsHPCControllerLauncher : use Windows HPC
417 """
415 """
418 )
416 )
419 reset = Bool(False, config=True,
417 reset = Bool(False, config=True,
420 help="Whether to reset config files as part of '--create'."
418 help="Whether to reset config files as part of '--create'."
421 )
419 )
422
420
423 # flags = Dict(flags)
421 # flags = Dict(flags)
424 aliases = Dict(start_aliases)
422 aliases = Dict(start_aliases)
425
423
426 def init_launchers(self):
424 def init_launchers(self):
427 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
425 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
428 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
426 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
429 self.controller_launcher.on_stop(self.stop_launchers)
427 self.controller_launcher.on_stop(self.stop_launchers)
430
428
431 def start_controller(self):
429 def start_controller(self):
432 self.controller_launcher.start(
430 self.controller_launcher.start()
433 self.profile_dir.location
434 )
435
431
436 def stop_controller(self):
432 def stop_controller(self):
437 # self.log.info("In stop_controller")
433 # self.log.info("In stop_controller")
438 if self.controller_launcher and self.controller_launcher.running:
434 if self.controller_launcher and self.controller_launcher.running:
439 return self.controller_launcher.stop()
435 return self.controller_launcher.stop()
440
436
441 def stop_launchers(self, r=None):
437 def stop_launchers(self, r=None):
442 if not self._stopping:
438 if not self._stopping:
443 self.stop_controller()
439 self.stop_controller()
444 super(IPClusterStart, self).stop_launchers()
440 super(IPClusterStart, self).stop_launchers()
445
441
446 def start(self):
442 def start(self):
447 """Start the app for the start subcommand."""
443 """Start the app for the start subcommand."""
448 # First see if the cluster is already running
444 # First see if the cluster is already running
449 try:
445 try:
450 pid = self.get_pid_from_file()
446 pid = self.get_pid_from_file()
451 except PIDFileError:
447 except PIDFileError:
452 pass
448 pass
453 else:
449 else:
454 if self.check_pid(pid):
450 if self.check_pid(pid):
455 self.log.critical(
451 self.log.critical(
456 'Cluster is already running with [pid=%s]. '
452 'Cluster is already running with [pid=%s]. '
457 'use "ipcluster stop" to stop the cluster.' % pid
453 'use "ipcluster stop" to stop the cluster.' % pid
458 )
454 )
459 # Here I exit with an unusual exit status that other processes
455 # Here I exit with an unusual exit status that other processes
460 # can watch for to learn how I exited.
456 # can watch for to learn how I exited.
461 self.exit(ALREADY_STARTED)
457 self.exit(ALREADY_STARTED)
462 else:
458 else:
463 self.remove_pid_file()
459 self.remove_pid_file()
464
460
465
461
466 # Now log and daemonize
462 # Now log and daemonize
467 self.log.info(
463 self.log.info(
468 'Starting ipcluster with [daemon=%r]' % self.daemonize
464 'Starting ipcluster with [daemon=%r]' % self.daemonize
469 )
465 )
470 # TODO: Get daemonize working on Windows or as a Windows Server.
466 # TODO: Get daemonize working on Windows or as a Windows Server.
471 if self.daemonize:
467 if self.daemonize:
472 if os.name=='posix':
468 if os.name=='posix':
473 daemonize()
469 daemonize()
474
470
475 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
471 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
476 dc.start()
472 dc.start()
477 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
473 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
478 dc.start()
474 dc.start()
479 # Now write the new pid file AFTER our new forked pid is active.
475 # Now write the new pid file AFTER our new forked pid is active.
480 self.write_pid_file()
476 self.write_pid_file()
481 try:
477 try:
482 self.loop.start()
478 self.loop.start()
483 except KeyboardInterrupt:
479 except KeyboardInterrupt:
484 pass
480 pass
485 except zmq.ZMQError as e:
481 except zmq.ZMQError as e:
486 if e.errno == errno.EINTR:
482 if e.errno == errno.EINTR:
487 pass
483 pass
488 else:
484 else:
489 raise
485 raise
490 finally:
486 finally:
491 self.remove_pid_file()
487 self.remove_pid_file()
492
488
493 base='IPython.parallel.apps.ipclusterapp.IPCluster'
489 base='IPython.parallel.apps.ipclusterapp.IPCluster'
494
490
495 class IPClusterApp(Application):
491 class IPClusterApp(Application):
496 name = u'ipcluster'
492 name = u'ipcluster'
497 description = _description
493 description = _description
498 examples = _main_examples
494 examples = _main_examples
499
495
500 subcommands = {
496 subcommands = {
501 'start' : (base+'Start', start_help),
497 'start' : (base+'Start', start_help),
502 'stop' : (base+'Stop', stop_help),
498 'stop' : (base+'Stop', stop_help),
503 'engines' : (base+'Engines', engines_help),
499 'engines' : (base+'Engines', engines_help),
504 }
500 }
505
501
506 # no aliases or flags for parent App
502 # no aliases or flags for parent App
507 aliases = Dict()
503 aliases = Dict()
508 flags = Dict()
504 flags = Dict()
509
505
510 def start(self):
506 def start(self):
511 if self.subapp is None:
507 if self.subapp is None:
512 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
508 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
513 print
509 print
514 self.print_description()
510 self.print_description()
515 self.print_subcommands()
511 self.print_subcommands()
516 self.exit(1)
512 self.exit(1)
517 else:
513 else:
518 return self.subapp.start()
514 return self.subapp.start()
519
515
520 def launch_new_instance():
516 def launch_new_instance():
521 """Create and run the IPython cluster."""
517 """Create and run the IPython cluster."""
522 app = IPClusterApp.instance()
518 app = IPClusterApp.instance()
523 app.initialize()
519 app.initialize()
524 app.start()
520 app.start()
525
521
526
522
527 if __name__ == '__main__':
523 if __name__ == '__main__':
528 launch_new_instance()
524 launch_new_instance()
529
525
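On the application side (the ipclusterapp.py hunks above), build_launcher now passes profile_dir and cluster_id to the launcher constructor, and start_controller/start_engines drop their profile_dir arguments accordingly. A minimal sketch of the new calling convention for an engine set, again with hypothetical values:

    from IPython.parallel.apps.launcher import LocalEngineSetLauncher

    el = LocalEngineSetLauncher(
        work_dir=u'.',
        profile_dir=u'/home/me/.ipython/profile_mycluster',  # hypothetical path
        cluster_id=u'',                                       # empty cluster_id is the default
    )
    el.on_stop(lambda r: None)   # register a stop callback, as IPClusterEngines does
    el.start(4)                  # previously: el.start(4, profile_dir)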
IPython/parallel/apps/launcher.py
@@ -1,1152 +1,1169 @@
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
45 try:
45 try:
46 from subprocess import check_output
46 from subprocess import check_output
47 except ImportError:
47 except ImportError:
48 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
52 out,err = p.communicate()
53 return out
53 return out
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
65
64 from .win32support import forward_read_events
66 from .win32support import forward_read_events
65
67
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67
69
68 WINDOWS = os.name == 'nt'
70 WINDOWS = os.name == 'nt'
69
71
70 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
71 # Paths to the kernel apps
73 # Paths to the kernel apps
72 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
73
75
74
76
75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipclusterapp'
78 'IPython.parallel.apps.ipclusterapp'
77 ))
79 ))
78
80
79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipengineapp'
82 'IPython.parallel.apps.ipengineapp'
81 ))
83 ))
82
84
83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 'IPython.parallel.apps.ipcontrollerapp'
86 'IPython.parallel.apps.ipcontrollerapp'
85 ))
87 ))
86
88
87 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
88 # Base launchers and errors
90 # Base launchers and errors
89 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
90
92
91
93
92 class LauncherError(Exception):
94 class LauncherError(Exception):
93 pass
95 pass
94
96
95
97
96 class ProcessStateError(LauncherError):
98 class ProcessStateError(LauncherError):
97 pass
99 pass
98
100
99
101
100 class UnknownStatus(LauncherError):
102 class UnknownStatus(LauncherError):
101 pass
103 pass
102
104
103
105
104 class BaseLauncher(LoggingConfigurable):
106 class BaseLauncher(LoggingConfigurable):
105 """An abstraction for starting, stopping and signaling a process."""
107 """An abstraction for starting, stopping and signaling a process."""
106
108
107 # In all of the launchers, the work_dir is where child processes will be
109 # In all of the launchers, the work_dir is where child processes will be
108 # run. This will usually be the profile_dir, but may not be. Any work_dir
110 # run. This will usually be the profile_dir, but may not be. Any work_dir
109 # passed into the __init__ method will override the config value.
111 # passed into the __init__ method will override the config value.
110 # This should not be used to set the work_dir for the actual engine
112 # This should not be used to set the work_dir for the actual engine
111 # and controller. Instead, use their own config files or the
113 # and controller. Instead, use their own config files or the
112 # controller_args, engine_args attributes of the launchers to add
114 # controller_args, engine_args attributes of the launchers to add
113 # the work_dir option.
115 # the work_dir option.
114 work_dir = Unicode(u'.')
116 work_dir = Unicode(u'.')
115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116
118
117 start_data = Any()
119 start_data = Any()
118 stop_data = Any()
120 stop_data = Any()
119
121
120 def _loop_default(self):
122 def _loop_default(self):
121 return ioloop.IOLoop.instance()
123 return ioloop.IOLoop.instance()
122
124
123 def __init__(self, work_dir=u'.', config=None, **kwargs):
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 self.state = 'before' # can be before, running, after
127 self.state = 'before' # can be before, running, after
126 self.stop_callbacks = []
128 self.stop_callbacks = []
127 self.start_data = None
129 self.start_data = None
128 self.stop_data = None
130 self.stop_data = None
129
131
130 @property
132 @property
131 def args(self):
133 def args(self):
132 """A list of cmd and args that will be used to start the process.
134 """A list of cmd and args that will be used to start the process.
133
135
134 This is what is passed to :func:`spawnProcess` and the first element
136 This is what is passed to :func:`spawnProcess` and the first element
135 will be the process name.
137 will be the process name.
136 """
138 """
137 return self.find_args()
139 return self.find_args()
138
140
139 def find_args(self):
141 def find_args(self):
140 """The ``.args`` property calls this to find the args list.
142 """The ``.args`` property calls this to find the args list.
141
143
142 Subclasses should implement this to construct the cmd and args.
144 Subclasses should implement this to construct the cmd and args.
143 """
145 """
144 raise NotImplementedError('find_args must be implemented in a subclass')
146 raise NotImplementedError('find_args must be implemented in a subclass')
145
147
146 @property
148 @property
147 def arg_str(self):
149 def arg_str(self):
148 """The string form of the program arguments."""
150 """The string form of the program arguments."""
149 return ' '.join(self.args)
151 return ' '.join(self.args)
150
152
151 @property
153 @property
152 def running(self):
154 def running(self):
153 """Am I running."""
155 """Am I running."""
154 if self.state == 'running':
156 if self.state == 'running':
155 return True
157 return True
156 else:
158 else:
157 return False
159 return False
158
160
159 def start(self):
161 def start(self):
160 """Start the process."""
162 """Start the process."""
161 raise NotImplementedError('start must be implemented in a subclass')
163 raise NotImplementedError('start must be implemented in a subclass')
162
164
163 def stop(self):
165 def stop(self):
164 """Stop the process and notify observers of stopping.
166 """Stop the process and notify observers of stopping.
165
167
166 This method will return None immediately.
168 This method will return None immediately.
167 To observe the actual process stopping, see :meth:`on_stop`.
169 To observe the actual process stopping, see :meth:`on_stop`.
168 """
170 """
169 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
170
172
171 def on_stop(self, f):
173 def on_stop(self, f):
172 """Register a callback to be called with this Launcher's stop_data
174 """Register a callback to be called with this Launcher's stop_data
173 when the process actually finishes.
175 when the process actually finishes.
174 """
176 """
175 if self.state=='after':
177 if self.state=='after':
176 return f(self.stop_data)
178 return f(self.stop_data)
177 else:
179 else:
178 self.stop_callbacks.append(f)
180 self.stop_callbacks.append(f)
179
181
180 def notify_start(self, data):
182 def notify_start(self, data):
181 """Call this to trigger startup actions.
183 """Call this to trigger startup actions.
182
184
183 This logs the process startup and sets the state to 'running'. It is
185 This logs the process startup and sets the state to 'running'. It is
184 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
185 """
187 """
186
188
187 self.log.info('Process %r started: %r' % (self.args[0], data))
189 self.log.info('Process %r started: %r' % (self.args[0], data))
188 self.start_data = data
190 self.start_data = data
189 self.state = 'running'
191 self.state = 'running'
190 return data
192 return data
191
193
192 def notify_stop(self, data):
194 def notify_stop(self, data):
193 """Call this to trigger process stop actions.
195 """Call this to trigger process stop actions.
194
196
195 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
196 this to trigger callbacks registered via :meth:`on_stop`."""
198 this to trigger callbacks registered via :meth:`on_stop`."""
197
199
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 self.stop_data = data
201 self.stop_data = data
200 self.state = 'after'
202 self.state = 'after'
201 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
202 d = self.stop_callbacks.pop()
204 d = self.stop_callbacks.pop()
203 d(data)
205 d(data)
204 return data
206 return data
205
207
206 def signal(self, sig):
208 def signal(self, sig):
207 """Signal the process.
209 """Signal the process.
208
210
209 Parameters
211 Parameters
210 ----------
212 ----------
211 sig : str or int
213 sig : str or int
212 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
213 """
215 """
214 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
215
217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216
245
217 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
218 # Local process launchers
247 # Local process launchers
219 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
220
249
221
250
222 class LocalProcessLauncher(BaseLauncher):
251 class LocalProcessLauncher(BaseLauncher):
223 """Start and stop an external process in an asynchronous manner.
252 """Start and stop an external process in an asynchronous manner.
224
253
225 This will launch the external process with a working directory of
254 This will launch the external process with a working directory of
226 ``self.work_dir``.
255 ``self.work_dir``.
227 """
256 """
228
257
229 # This is used to construct self.args, which is passed to
258 # This is used to construct self.args, which is passed to
230 # spawnProcess.
259 # spawnProcess.
231 cmd_and_args = List([])
260 cmd_and_args = List([])
232 poll_frequency = Int(100) # in ms
261 poll_frequency = Int(100) # in ms
233
262
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
263 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 super(LocalProcessLauncher, self).__init__(
264 super(LocalProcessLauncher, self).__init__(
236 work_dir=work_dir, config=config, **kwargs
265 work_dir=work_dir, config=config, **kwargs
237 )
266 )
238 self.process = None
267 self.process = None
239 self.poller = None
268 self.poller = None
240
269
241 def find_args(self):
270 def find_args(self):
242 return self.cmd_and_args
271 return self.cmd_and_args
243
272
244 def start(self):
273 def start(self):
245 if self.state == 'before':
274 if self.state == 'before':
246 self.process = Popen(self.args,
275 self.process = Popen(self.args,
247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 env=os.environ,
277 env=os.environ,
249 cwd=self.work_dir
278 cwd=self.work_dir
250 )
279 )
251 if WINDOWS:
280 if WINDOWS:
252 self.stdout = forward_read_events(self.process.stdout)
281 self.stdout = forward_read_events(self.process.stdout)
253 self.stderr = forward_read_events(self.process.stderr)
282 self.stderr = forward_read_events(self.process.stderr)
254 else:
283 else:
255 self.stdout = self.process.stdout.fileno()
284 self.stdout = self.process.stdout.fileno()
256 self.stderr = self.process.stderr.fileno()
285 self.stderr = self.process.stderr.fileno()
257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
260 self.poller.start()
289 self.poller.start()
261 self.notify_start(self.process.pid)
290 self.notify_start(self.process.pid)
262 else:
291 else:
263 s = 'The process was already started and has state: %r' % self.state
292 s = 'The process was already started and has state: %r' % self.state
264 raise ProcessStateError(s)
293 raise ProcessStateError(s)
265
294
266 def stop(self):
295 def stop(self):
267 return self.interrupt_then_kill()
296 return self.interrupt_then_kill()
268
297
269 def signal(self, sig):
298 def signal(self, sig):
270 if self.state == 'running':
299 if self.state == 'running':
271 if WINDOWS and sig != SIGINT:
300 if WINDOWS and sig != SIGINT:
272 # use Windows tree-kill for better child cleanup
301 # use Windows tree-kill for better child cleanup
273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
274 else:
303 else:
275 self.process.send_signal(sig)
304 self.process.send_signal(sig)
276
305
277 def interrupt_then_kill(self, delay=2.0):
306 def interrupt_then_kill(self, delay=2.0):
278 """Send INT, wait a delay and then send KILL."""
307 """Send INT, wait a delay and then send KILL."""
279 try:
308 try:
280 self.signal(SIGINT)
309 self.signal(SIGINT)
281 except Exception:
310 except Exception:
282 self.log.debug("interrupt failed")
311 self.log.debug("interrupt failed")
283 pass
312 pass
284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
285 self.killer.start()
314 self.killer.start()
286
315
287 # callbacks, etc:
316 # callbacks, etc:
288
317
289 def handle_stdout(self, fd, events):
318 def handle_stdout(self, fd, events):
290 if WINDOWS:
319 if WINDOWS:
291 line = self.stdout.recv()
320 line = self.stdout.recv()
292 else:
321 else:
293 line = self.process.stdout.readline()
322 line = self.process.stdout.readline()
294 # a stopped process will be readable but return empty strings
323 # a stopped process will be readable but return empty strings
295 if line:
324 if line:
296 self.log.info(line[:-1])
325 self.log.info(line[:-1])
297 else:
326 else:
298 self.poll()
327 self.poll()
299
328
300 def handle_stderr(self, fd, events):
329 def handle_stderr(self, fd, events):
301 if WINDOWS:
330 if WINDOWS:
302 line = self.stderr.recv()
331 line = self.stderr.recv()
303 else:
332 else:
304 line = self.process.stderr.readline()
333 line = self.process.stderr.readline()
305 # a stopped process will be readable but return empty strings
334 # a stopped process will be readable but return empty strings
306 if line:
335 if line:
307 self.log.error(line[:-1])
336 self.log.error(line[:-1])
308 else:
337 else:
309 self.poll()
338 self.poll()
310
339
311 def poll(self):
340 def poll(self):
312 status = self.process.poll()
341 status = self.process.poll()
313 if status is not None:
342 if status is not None:
314 self.poller.stop()
343 self.poller.stop()
315 self.loop.remove_handler(self.stdout)
344 self.loop.remove_handler(self.stdout)
316 self.loop.remove_handler(self.stderr)
345 self.loop.remove_handler(self.stderr)
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 return status
347 return status
319
348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 """Launch a controller as a regular external process."""
350 """Launch a controller as a regular external process."""
322
351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 def find_args(self):
352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331
354
332 def start(self, profile_dir):
355 def start(self):
333 """Start the controller by profile_dir."""
356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
338
359
339
360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 """Launch a single engine as a regular externall process."""
362 """Launch a single engine as a regular externall process."""
342
363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 def find_args(self):
364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358
366
359
367
360 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(LocalEngineLauncher):
361 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
362
370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 delay = CFloat(0.1, config=True,
371 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
372 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
373 This can help force the engines to get their ids in order, or limit
371 process flood when starting many engines."""
374 process flood when starting many engines."""
372 )
375 )
373
376
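A minimal sketch of tuning the delay trait above from ipcluster_config.py (the 0.5 value is only an example):

# ipcluster_config.py -- example value only
c = get_config()
c.LocalEngineSetLauncher.delay = 0.5  # wait 0.5s between starting engines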
374 # launcher class
377 # launcher class
375 launcher_class = LocalEngineLauncher
378 launcher_class = LocalEngineLauncher
376
379
377 launchers = Dict()
380 launchers = Dict()
378 stop_data = Dict()
381 stop_data = Dict()
379
382
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 super(LocalEngineSetLauncher, self).__init__(
384 super(LocalEngineSetLauncher, self).__init__(
382 work_dir=work_dir, config=config, **kwargs
385 work_dir=work_dir, config=config, **kwargs
383 )
386 )
384 self.stop_data = {}
387 self.stop_data = {}
385
388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 """Start n engines by profile or profile_dir."""
390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 dlist = []
391 dlist = []
390 for i in range(n):
392 for i in range(n):
391 if i > 0:
393 if i > 0:
392 time.sleep(self.delay)
394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 # Copy the engine args over to each engine launcher.
399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 if i==0:
404 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
406 self.launchers[i] = el
401 dlist.append(d)
407 dlist.append(d)
402 self.notify_start(dlist)
408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
409 return dlist
407
410
408 def find_args(self):
411 def find_args(self):
409 return ['engine set']
412 return ['engine set']
410
413
411 def signal(self, sig):
414 def signal(self, sig):
412 dlist = []
415 dlist = []
413 for el in self.launchers.itervalues():
416 for el in self.launchers.itervalues():
414 d = el.signal(sig)
417 d = el.signal(sig)
415 dlist.append(d)
418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
419 return dlist
418
420
419 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
422 dlist = []
421 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
426 return dlist
426
427
427 def stop(self):
428 def stop(self):
428 return self.interrupt_then_kill()
429 return self.interrupt_then_kill()
429
430
430 def _notice_engine_stopped(self, data):
431 def _notice_engine_stopped(self, data):
431 pid = data['pid']
432 pid = data['pid']
432 for idx,el in self.launchers.iteritems():
433 for idx,el in self.launchers.iteritems():
433 if el.process.pid == pid:
434 if el.process.pid == pid:
434 break
435 break
435 self.launchers.pop(idx)
436 self.launchers.pop(idx)
436 self.stop_data[idx] = data
437 self.stop_data[idx] = data
437 if not self.launchers:
438 if not self.launchers:
438 self.notify_stop(self.stop_data)
439 self.notify_stop(self.stop_data)
439
440
440
441
441 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
442 # MPIExec launchers
443 # MPIExec launchers
443 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
444
445
445
446
446 class MPIExecLauncher(LocalProcessLauncher):
447 class MPIExecLauncher(LocalProcessLauncher):
447 """Launch an external process using mpiexec."""
448 """Launch an external process using mpiexec."""
448
449
449 mpi_cmd = List(['mpiexec'], config=True,
450 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
451 help="The mpiexec command to use in starting the process."
451 )
452 )
452 mpi_args = List([], config=True,
453 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
454 help="The command line arguments to pass to mpiexec."
454 )
455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 help="The program to start via mpiexec.")
457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 help="The command line argument to the program."
459 help="The command line argument to the program."
459 )
460 )
460 n = Int(1)
461 n = Int(1)
461
462
462 def find_args(self):
463 def find_args(self):
463 """Build self.args using all the fields."""
464 """Build self.args using all the fields."""
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 self.program + self.program_args
466 self.program + self.program_args
466
467
467 def start(self, n):
468 def start(self, n):
468 """Start n instances of the program using mpiexec."""
469 """Start n instances of the program using mpiexec."""
469 self.n = n
470 self.n = n
470 return super(MPIExecLauncher, self).start()
471 return super(MPIExecLauncher, self).start()
471
472
472
473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 """Launch a controller using mpiexec."""
475 """Launch a controller using mpiexec."""
475
476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 # alias back to *non-configurable* program[_args] for use in find_args()
477 help="Popen command to launch the Contropper"
478 # this way all Controller/EngineSetLaunchers have the same form, rather
478 )
479 # than *some* having `program_args` and others `controller_args`
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 @property
480 help="Command line arguments to pass to ipcontroller."
481 def program(self):
481 )
482 return self.controller_cmd
482 n = Int(1)
483
483
484 def start(self, profile_dir):
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
487
488 def start(self):
485 """Start the controller by profile_dir."""
489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
490
492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
493
494 class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
495 """Launch engines using mpiexec"""
495
496
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
497 # alias back to *non-configurable* program[_args] for use in find_args()
498 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # than *some* having `program_args` and others `controller_args`
500 @property
501 def program(self):
502 return self.engine_cmd
497
503
498 program = List(ipengine_cmd_argv, config=True,
504 @property
499 help="Popen command for ipengine"
505 def program_args(self):
500 )
506 return self.cluster_args + self.engine_args
501 program_args = List(
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
504 )
505 n = Int(1)
506
507
507 def start(self, n, profile_dir):
508 def start(self, n):
508 """Start n engines by profile or profile_dir."""
509 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 self.n = n
510 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
514
513
515 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
516 # SSH launchers
515 # SSH launchers
517 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
518
517
519 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
520
519
521 class SSHLauncher(LocalProcessLauncher):
520 class SSHLauncher(LocalProcessLauncher):
522 """A minimal launcher for ssh.
521 """A minimal launcher for ssh.
523
522
524 To be useful this will probably have to be extended to use the ``sshx``
523 To be useful this will probably have to be extended to use the ``sshx``
525 idea for environment variables. There could be other things this needs
524 idea for environment variables. There could be other things this needs
526 as well.
525 as well.
527 """
526 """
528
527
529 ssh_cmd = List(['ssh'], config=True,
528 ssh_cmd = List(['ssh'], config=True,
530 help="command for starting ssh")
529 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'],
534 help="Program to launch via ssh")
533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([],
536 help="args to pass to remote program")
535 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
539 user = Unicode('', config=True,
538 user = Unicode('', config=True,
540 help="username for ssh")
539 help="username for ssh")
541 location = Unicode('', config=True,
540 location = Unicode('', config=True,
542 help="user@hostname location for ssh in one setting")
541 help="user@hostname location for ssh in one setting")
543
542
544 def _hostname_changed(self, name, old, new):
543 def _hostname_changed(self, name, old, new):
545 if self.user:
544 if self.user:
546 self.location = u'%s@%s' % (self.user, new)
545 self.location = u'%s@%s' % (self.user, new)
547 else:
546 else:
548 self.location = new
547 self.location = new
549
548
550 def _user_changed(self, name, old, new):
549 def _user_changed(self, name, old, new):
551 self.location = u'%s@%s' % (new, self.hostname)
550 self.location = u'%s@%s' % (new, self.hostname)
552
551
553 def find_args(self):
552 def find_args(self):
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
554 self.program + self.program_args
556
555
557 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
557 if hostname is not None:
560 self.hostname = hostname
558 self.hostname = hostname
561 if user is not None:
559 if user is not None:
562 self.user = user
560 self.user = user
563
561
564 return super(SSHLauncher, self).start()
562 return super(SSHLauncher, self).start()
565
563
566 def signal(self, sig):
564 def signal(self, sig):
567 if self.state == 'running':
565 if self.state == 'running':
568 # send escaped ssh connection-closer
566 # send escaped ssh connection-closer
569 self.process.stdin.write('~.')
567 self.process.stdin.write('~.')
570 self.process.stdin.flush()
568 self.process.stdin.flush()
571
569
572
570
573
571
574 class SSHControllerLauncher(SSHLauncher):
572 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
575
573
576 program = List(ipcontroller_cmd_argv, config=True,
574 # alias back to *non-configurable* program[_args] for use in find_args()
577 help="remote ipcontroller command.")
575 # this way all Controller/EngineSetLaunchers have the same form, rather
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
576 # than *some* having `program_args` and others `controller_args`
579 help="Command line arguments to ipcontroller.")
577 @property
578 def program(self):
579 return self.controller_cmd
580
580
581 @property
582 def program_args(self):
583 return self.cluster_args + self.controller_args
584
585
586 class SSHEngineLauncher(SSHLauncher, EngineMixin):
587
588 # alias back to *non-configurable* program[_args] for use in find_args()
589 # this way all Controller/EngineSetLaunchers have the same form, rather
590 # than *some* having `program_args` and others `controller_args`
591 @property
592 def program(self):
593 return self.engine_cmd
594
595 @property
596 def program_args(self):
597 return self.cluster_args + self.engine_args
581
598
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
586 program_args = List(
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
589 )
590
599
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
600 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
601 launcher_class = SSHEngineLauncher
593 engines = Dict(config=True,
602 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
603 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
604 corresponding to the number of engines to start on that host.""")
596
605
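A hedged sketch of the engines dict used by the start() method below (hostnames and extra args are placeholders); entries may be plain ints or (n, args) pairs:

# ipcluster_config.py -- hostnames below are placeholders
c = get_config()
c.SSHEngineSetLauncher.engines = {
    'host1.example.com' : 2,                              # 2 engines, default engine_args
    'user@host2.example.com' : (4, ['--log-level=20']),   # (n, custom args) pair
}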
597 def start(self, n, profile_dir):
606 def start(self, n):
598 """Start engines by profile or profile_dir.
607 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
608 `n` is ignored, and the `engines` config property is used instead.
600 """
609 """
601
610
602 self.profile_dir = unicode(profile_dir)
603 dlist = []
611 dlist = []
604 for host, n in self.engines.iteritems():
612 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
613 if isinstance(n, (tuple, list)):
606 n, args = n
614 n, args = n
607 else:
615 else:
608 args = copy.deepcopy(self.engine_args)
616 args = copy.deepcopy(self.engine_args)
609
617
610 if '@' in host:
618 if '@' in host:
611 user,host = host.split('@',1)
619 user,host = host.split('@',1)
612 else:
620 else:
613 user=None
621 user=None
614 for i in range(n):
622 for i in range(n):
615 if i > 0:
623 if i > 0:
616 time.sleep(self.delay)
624 time.sleep(self.delay)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
625 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
626 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
627 )
618
628
619 # Copy the engine args over to each engine launcher.
629 # Copy the engine args over to each engine launcher.
620 i
630 el.engine_cmd = self.engine_cmd
621 el.program_args = args
631 el.engine_args = args
622 el.on_stop(self._notice_engine_stopped)
632 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
633 d = el.start(user=user, hostname=host)
624 if i==0:
634 if i==0:
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
635 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 self.launchers[host+str(i)] = el
636 self.launchers[host+str(i)] = el
627 dlist.append(d)
637 dlist.append(d)
628 self.notify_start(dlist)
638 self.notify_start(dlist)
629 return dlist
639 return dlist
630
640
631
641
632
642
633 #-----------------------------------------------------------------------------
643 #-----------------------------------------------------------------------------
634 # Windows HPC Server 2008 scheduler launchers
644 # Windows HPC Server 2008 scheduler launchers
635 #-----------------------------------------------------------------------------
645 #-----------------------------------------------------------------------------
636
646
637
647
638 # This is only used on Windows.
648 # This is only used on Windows.
639 def find_job_cmd():
649 def find_job_cmd():
640 if WINDOWS:
650 if WINDOWS:
641 try:
651 try:
642 return find_cmd('job')
652 return find_cmd('job')
643 except (FindCmdError, ImportError):
653 except (FindCmdError, ImportError):
644 # ImportError will be raised if win32api is not installed
654 # ImportError will be raised if win32api is not installed
645 return 'job'
655 return 'job'
646 else:
656 else:
647 return 'job'
657 return 'job'
648
658
649
659
650 class WindowsHPCLauncher(BaseLauncher):
660 class WindowsHPCLauncher(BaseLauncher):
651
661
652 job_id_regexp = Unicode(r'\d+', config=True,
662 job_id_regexp = Unicode(r'\d+', config=True,
653 help="""A regular expression used to get the job id from the output of the
663 help="""A regular expression used to get the job id from the output of the
654 submit_command. """
664 submit_command. """
655 )
665 )
656 job_file_name = Unicode(u'ipython_job.xml', config=True,
666 job_file_name = Unicode(u'ipython_job.xml', config=True,
657 help="The filename of the instantiated job script.")
667 help="The filename of the instantiated job script.")
658 # The full path to the instantiated job script. This gets made dynamically
668 # The full path to the instantiated job script. This gets made dynamically
659 # by combining the work_dir with the job_file_name.
669 # by combining the work_dir with the job_file_name.
660 job_file = Unicode(u'')
670 job_file = Unicode(u'')
661 scheduler = Unicode('', config=True,
671 scheduler = Unicode('', config=True,
662 help="The hostname of the scheduler to submit the job to.")
672 help="The hostname of the scheduler to submit the job to.")
663 job_cmd = Unicode(find_job_cmd(), config=True,
673 job_cmd = Unicode(find_job_cmd(), config=True,
664 help="The command for submitting jobs.")
674 help="The command for submitting jobs.")
665
675
666 def __init__(self, work_dir=u'.', config=None, **kwargs):
676 def __init__(self, work_dir=u'.', config=None, **kwargs):
667 super(WindowsHPCLauncher, self).__init__(
677 super(WindowsHPCLauncher, self).__init__(
668 work_dir=work_dir, config=config, **kwargs
678 work_dir=work_dir, config=config, **kwargs
669 )
679 )
670
680
671 @property
681 @property
672 def job_file(self):
682 def job_file(self):
673 return os.path.join(self.work_dir, self.job_file_name)
683 return os.path.join(self.work_dir, self.job_file_name)
674
684
675 def write_job_file(self, n):
685 def write_job_file(self, n):
676 raise NotImplementedError("Implement write_job_file in a subclass.")
686 raise NotImplementedError("Implement write_job_file in a subclass.")
677
687
678 def find_args(self):
688 def find_args(self):
679 return [u'job.exe']
689 return [u'job.exe']
680
690
681 def parse_job_id(self, output):
691 def parse_job_id(self, output):
682 """Take the output of the submit command and return the job id."""
692 """Take the output of the submit command and return the job id."""
683 m = re.search(self.job_id_regexp, output)
693 m = re.search(self.job_id_regexp, output)
684 if m is not None:
694 if m is not None:
685 job_id = m.group()
695 job_id = m.group()
686 else:
696 else:
687 raise LauncherError("Job id couldn't be determined: %s" % output)
697 raise LauncherError("Job id couldn't be determined: %s" % output)
688 self.job_id = job_id
698 self.job_id = job_id
689 self.log.info('Job started with job id: %r' % job_id)
699 self.log.info('Job started with job id: %r' % job_id)
690 return job_id
700 return job_id
691
701
692 def start(self, n):
702 def start(self, n):
693 """Start n copies of the process using the Win HPC job scheduler."""
703 """Start n copies of the process using the Win HPC job scheduler."""
694 self.write_job_file(n)
704 self.write_job_file(n)
695 args = [
705 args = [
696 'submit',
706 'submit',
697 '/jobfile:%s' % self.job_file,
707 '/jobfile:%s' % self.job_file,
698 '/scheduler:%s' % self.scheduler
708 '/scheduler:%s' % self.scheduler
699 ]
709 ]
700 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
710 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
701
711
702 output = check_output([self.job_cmd]+args,
712 output = check_output([self.job_cmd]+args,
703 env=os.environ,
713 env=os.environ,
704 cwd=self.work_dir,
714 cwd=self.work_dir,
705 stderr=STDOUT
715 stderr=STDOUT
706 )
716 )
707 job_id = self.parse_job_id(output)
717 job_id = self.parse_job_id(output)
708 self.notify_start(job_id)
718 self.notify_start(job_id)
709 return job_id
719 return job_id
710
720
711 def stop(self):
721 def stop(self):
712 args = [
722 args = [
713 'cancel',
723 'cancel',
714 self.job_id,
724 self.job_id,
715 '/scheduler:%s' % self.scheduler
725 '/scheduler:%s' % self.scheduler
716 ]
726 ]
717 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
727 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
718 try:
728 try:
719 output = check_output([self.job_cmd]+args,
729 output = check_output([self.job_cmd]+args,
720 env=os.environ,
730 env=os.environ,
721 cwd=self.work_dir,
731 cwd=self.work_dir,
722 stderr=STDOUT
732 stderr=STDOUT
723 )
733 )
724 except:
734 except:
725 output = 'The job already appears to be stopped: %r' % self.job_id
735 output = 'The job already appears to be stopped: %r' % self.job_id
726 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
736 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
727 return output
737 return output
728
738
729
739
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
740 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
731
741
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
742 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 help="WinHPC xml job file.")
743 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
744 controller_args = List([], config=False,
735 help="extra args to pass to ipcontroller")
745 help="extra args to pass to ipcontroller")
736
746
737 def write_job_file(self, n):
747 def write_job_file(self, n):
738 job = IPControllerJob(config=self.config)
748 job = IPControllerJob(config=self.config)
739
749
740 t = IPControllerTask(config=self.config)
750 t = IPControllerTask(config=self.config)
741 # The task's work directory is *not* the actual work directory of
751 # The task's work directory is *not* the actual work directory of
742 # the controller. It is used as the base path for the stdout/stderr
752 # the controller. It is used as the base path for the stdout/stderr
743 # files that the scheduler redirects to.
753 # files that the scheduler redirects to.
744 t.work_directory = self.profile_dir
754 t.work_directory = self.profile_dir
745 # Add the profile_dir and related arguments to the task.
755 # Add the profile_dir and related arguments to the task.
746 t.controller_args.extend(self.extra_args)
756 t.controller_args.extend(self.cluster_args)
757 t.controller_args.extend(self.controller_args)
747 job.add_task(t)
758 job.add_task(t)
748
759
749 self.log.info("Writing job description file: %s" % self.job_file)
760 self.log.info("Writing job description file: %s" % self.job_file)
750 job.write(self.job_file)
761 job.write(self.job_file)
751
762
752 @property
763 @property
753 def job_file(self):
764 def job_file(self):
754 return os.path.join(self.profile_dir, self.job_file_name)
765 return os.path.join(self.profile_dir, self.job_file_name)
755
766
756 def start(self, profile_dir):
767 def start(self):
757 """Start the controller by profile_dir."""
768 """Start the controller by profile_dir."""
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
760 return super(WindowsHPCControllerLauncher, self).start(1)
769 return super(WindowsHPCControllerLauncher, self).start(1)
761
770
762
771
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
772 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
764
773
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
774 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 help="jobfile for ipengines job")
775 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
776 engine_args = List([], config=False,
768 help="extra args to pas to ipengine")
777 help="extra args to pas to ipengine")
769
778
770 def write_job_file(self, n):
779 def write_job_file(self, n):
771 job = IPEngineSetJob(config=self.config)
780 job = IPEngineSetJob(config=self.config)
772
781
773 for i in range(n):
782 for i in range(n):
774 t = IPEngineTask(config=self.config)
783 t = IPEngineTask(config=self.config)
775 # The task's work directory is *not* the actual work directory of
784 # The task's work directory is *not* the actual work directory of
776 # the engine. It is used as the base path for the stdout/stderr
785 # the engine. It is used as the base path for the stdout/stderr
777 # files that the scheduler redirects to.
786 # files that the scheduler redirects to.
778 t.work_directory = self.profile_dir
787 t.work_directory = self.profile_dir
779 # Add the profile_dir and related arguments to the task.
788 # Add the profile_dir and related arguments to the task.
780 t.engine_args.extend(self.extra_args)
789 t.engine_args.extend(self.cluster_args)
790 t.engine_args.extend(self.engine_args)
781 job.add_task(t)
791 job.add_task(t)
782
792
783 self.log.info("Writing job description file: %s" % self.job_file)
793 self.log.info("Writing job description file: %s" % self.job_file)
784 job.write(self.job_file)
794 job.write(self.job_file)
785
795
786 @property
796 @property
787 def job_file(self):
797 def job_file(self):
788 return os.path.join(self.profile_dir, self.job_file_name)
798 return os.path.join(self.profile_dir, self.job_file_name)
789
799
790 def start(self, n, profile_dir):
800 def start(self, n):
791 """Start the controller by profile_dir."""
801 """Start the controller by profile_dir."""
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
802 return super(WindowsHPCEngineSetLauncher, self).start(n)
795
803
796
804
797 #-----------------------------------------------------------------------------
805 #-----------------------------------------------------------------------------
798 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
799 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
800
808
809 class BatchClusterAppMixin(ClusterAppMixin):
810 """ClusterApp mixin that updates context dict, rather than args"""
811 context = Dict({'profile_dir':'', 'cluster_id':''})
812 def _profile_dir_changed(self, name, old, new):
813 self.context[name] = new
814 _cluster_id_changed = _profile_dir_changed
815
801 class BatchSystemLauncher(BaseLauncher):
816 class BatchSystemLauncher(BaseLauncher):
802 """Launch an external process using a batch system.
817 """Launch an external process using a batch system.
803
818
804 This class is designed to work with UNIX batch systems like PBS, LSF,
819 This class is designed to work with UNIX batch systems like PBS, LSF,
805 GridEngine, etc. The overall model is that there are different commands
820 GridEngine, etc. The overall model is that there are different commands
806 like qsub, qdel, etc. that handle the starting and stopping of the process.
821 like qsub, qdel, etc. that handle the starting and stopping of the process.
807
822
808 This class also has the notion of a batch script. The ``batch_template``
823 This class also has the notion of a batch script. The ``batch_template``
809 attribute can be set to a string that is a template for the batch script.
824 attribute can be set to a string that is a template for the batch script.
810 This template is instantiated using string formatting. Thus the template can
825 This template is instantiated using string formatting. Thus the template can
811 use {n} for the number of instances. Subclasses can add additional variables
826 use {n} for the number of instances. Subclasses can add additional variables
812 to the template dict.
827 to the template dict.
813 """
828 """
814
829
815 # Subclasses must fill these in. See PBSEngineSet
830 # Subclasses must fill these in. See PBSEngineSet
816 submit_command = List([''], config=True,
831 submit_command = List([''], config=True,
817 help="The name of the command line program used to submit jobs.")
832 help="The name of the command line program used to submit jobs.")
818 delete_command = List([''], config=True,
833 delete_command = List([''], config=True,
819 help="The name of the command line program used to delete jobs.")
834 help="The name of the command line program used to delete jobs.")
820 job_id_regexp = Unicode('', config=True,
835 job_id_regexp = Unicode('', config=True,
821 help="""A regular expression used to get the job id from the output of the
836 help="""A regular expression used to get the job id from the output of the
822 submit_command.""")
837 submit_command.""")
823 batch_template = Unicode('', config=True,
838 batch_template = Unicode('', config=True,
824 help="The string that is the batch script template itself.")
839 help="The string that is the batch script template itself.")
825 batch_template_file = Unicode(u'', config=True,
840 batch_template_file = Unicode(u'', config=True,
826 help="The file that contains the batch template.")
841 help="The file that contains the batch template.")
827 batch_file_name = Unicode(u'batch_script', config=True,
842 batch_file_name = Unicode(u'batch_script', config=True,
828 help="The filename of the instantiated batch script.")
843 help="The filename of the instantiated batch script.")
829 queue = Unicode(u'', config=True,
844 queue = Unicode(u'', config=True,
830 help="The PBS Queue.")
845 help="The PBS Queue.")
831
846
847 def _queue_changed(self, name, old, new):
848 self.context[name] = new
849
850 n = Int(1)
851 _n_changed = _queue_changed
852
832 # not configurable, override in subclasses
853 # not configurable, override in subclasses
833 # PBS Job Array regex
854 # PBS Job Array regex
834 job_array_regexp = Unicode('')
855 job_array_regexp = Unicode('')
835 job_array_template = Unicode('')
856 job_array_template = Unicode('')
836 # PBS Queue regex
857 # PBS Queue regex
837 queue_regexp = Unicode('')
858 queue_regexp = Unicode('')
838 queue_template = Unicode('')
859 queue_template = Unicode('')
839 # The default batch template, override in subclasses
860 # The default batch template, override in subclasses
840 default_template = Unicode('')
861 default_template = Unicode('')
841 # The full path to the instantiated batch script.
862 # The full path to the instantiated batch script.
842 batch_file = Unicode(u'')
863 batch_file = Unicode(u'')
843 # the format dict used with batch_template:
864 # the format dict used with batch_template:
844 context = Dict()
865 context = Dict()
845 # the Formatter instance for rendering the templates:
866 # the Formatter instance for rendering the templates:
846 formatter = Instance(EvalFormatter, (), {})
867 formatter = Instance(EvalFormatter, (), {})
847
868
848
869
849 def find_args(self):
870 def find_args(self):
850 return self.submit_command + [self.batch_file]
871 return self.submit_command + [self.batch_file]
851
872
852 def __init__(self, work_dir=u'.', config=None, **kwargs):
873 def __init__(self, work_dir=u'.', config=None, **kwargs):
853 super(BatchSystemLauncher, self).__init__(
874 super(BatchSystemLauncher, self).__init__(
854 work_dir=work_dir, config=config, **kwargs
875 work_dir=work_dir, config=config, **kwargs
855 )
876 )
856 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
877 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
857
878
858 def parse_job_id(self, output):
879 def parse_job_id(self, output):
859 """Take the output of the submit command and return the job id."""
880 """Take the output of the submit command and return the job id."""
860 m = re.search(self.job_id_regexp, output)
881 m = re.search(self.job_id_regexp, output)
861 if m is not None:
882 if m is not None:
862 job_id = m.group()
883 job_id = m.group()
863 else:
884 else:
864 raise LauncherError("Job id couldn't be determined: %s" % output)
885 raise LauncherError("Job id couldn't be determined: %s" % output)
865 self.job_id = job_id
886 self.job_id = job_id
866 self.log.info('Job submitted with job id: %r' % job_id)
887 self.log.info('Job submitted with job id: %r' % job_id)
867 return job_id
888 return job_id
868
889
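As a quick illustration of parse_job_id and job_id_regexp above (the submit output below is hypothetical):

import re
output = '12345.pbs-server'    # hypothetical qsub output
m = re.search(r'\d+', output)  # PBS/LSF use job_id_regexp = r'\d+'
job_id = m.group()             # -> '12345'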
869 def write_batch_script(self, n):
890 def write_batch_script(self, n):
870 """Instantiate and write the batch script to the work_dir."""
891 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
892 self.n = n
872 self.context['queue'] = self.queue
873 # first priority is batch_template if set
893 # first priority is batch_template if set
874 if self.batch_template_file and not self.batch_template:
894 if self.batch_template_file and not self.batch_template:
875 # second priority is batch_template_file
895 # second priority is batch_template_file
876 with open(self.batch_template_file) as f:
896 with open(self.batch_template_file) as f:
877 self.batch_template = f.read()
897 self.batch_template = f.read()
878 if not self.batch_template:
898 if not self.batch_template:
879 # third (last) priority is default_template
899 # third (last) priority is default_template
880 self.batch_template = self.default_template
900 self.batch_template = self.default_template
881
901
882 # add jobarray or queue lines if they are not already present in the template
902 # add jobarray or queue lines if they are not already present in the template
883 # (they are only inserted when the template does not set them itself)
903 # (they are only inserted when the template does not set them itself)
884 regex = re.compile(self.job_array_regexp)
904 regex = re.compile(self.job_array_regexp)
885 # print regex.search(self.batch_template)
905 # print regex.search(self.batch_template)
886 if not regex.search(self.batch_template):
906 if not regex.search(self.batch_template):
887 self.log.info("adding job array settings to batch script")
907 self.log.info("adding job array settings to batch script")
888 firstline, rest = self.batch_template.split('\n',1)
908 firstline, rest = self.batch_template.split('\n',1)
889 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
909 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
890
910
891 regex = re.compile(self.queue_regexp)
911 regex = re.compile(self.queue_regexp)
892 # print regex.search(self.batch_template)
912 # print regex.search(self.batch_template)
893 if self.queue and not regex.search(self.batch_template):
913 if self.queue and not regex.search(self.batch_template):
894 self.log.info("adding PBS queue settings to batch script")
914 self.log.info("adding PBS queue settings to batch script")
895 firstline, rest = self.batch_template.split('\n',1)
915 firstline, rest = self.batch_template.split('\n',1)
896 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
916 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
897
917
898 script_as_string = self.formatter.format(self.batch_template, **self.context)
918 script_as_string = self.formatter.format(self.batch_template, **self.context)
899 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
919 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
900
920
901 with open(self.batch_file, 'w') as f:
921 with open(self.batch_file, 'w') as f:
902 f.write(script_as_string)
922 f.write(script_as_string)
903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
923 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904
924
905 def start(self, n, profile_dir):
925 def start(self, n):
906 """Start n copies of the process using a batch system."""
926 """Start n copies of the process using a batch system."""
907 # Here we save profile_dir in the context so they
927 # Here we save profile_dir in the context so they
908 # can be used in the batch script template as {profile_dir}
928 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
911 self.write_batch_script(n)
929 self.write_batch_script(n)
912 output = check_output(self.args, env=os.environ)
930 output = check_output(self.args, env=os.environ)
913
931
914 job_id = self.parse_job_id(output)
932 job_id = self.parse_job_id(output)
915 self.notify_start(job_id)
933 self.notify_start(job_id)
916 return job_id
934 return job_id
917
935
918 def stop(self):
936 def stop(self):
919 output = check_output(self.delete_command+[self.job_id], env=os.environ)
937 output = check_output(self.delete_command+[self.job_id], env=os.environ)
920 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
938 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
921 return output
939 return output
922
940
923
941
924 class PBSLauncher(BatchSystemLauncher):
942 class PBSLauncher(BatchSystemLauncher):
925 """A BatchSystemLauncher subclass for PBS."""
943 """A BatchSystemLauncher subclass for PBS."""
926
944
927 submit_command = List(['qsub'], config=True,
945 submit_command = List(['qsub'], config=True,
928 help="The PBS submit command ['qsub']")
946 help="The PBS submit command ['qsub']")
929 delete_command = List(['qdel'], config=True,
947 delete_command = List(['qdel'], config=True,
930 help="The PBS delete command ['qsub']")
948 help="The PBS delete command ['qsub']")
931 job_id_regexp = Unicode(r'\d+', config=True,
949 job_id_regexp = Unicode(r'\d+', config=True,
932 help="Regular expresion for identifying the job ID [r'\d+']")
950 help="Regular expresion for identifying the job ID [r'\d+']")
933
951
934 batch_file = Unicode(u'')
952 batch_file = Unicode(u'')
935 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
953 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
936 job_array_template = Unicode('#PBS -t 1-{n}')
954 job_array_template = Unicode('#PBS -t 1-{n}')
937 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
955 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
938 queue_template = Unicode('#PBS -q {queue}')
956 queue_template = Unicode('#PBS -q {queue}')
939
957
940
958
941 class PBSControllerLauncher(PBSLauncher):
959 class PBSControllerLauncher(BatchClusterAppMixin, PBSLauncher):
942 """Launch a controller using PBS."""
960 """Launch a controller using PBS."""
943
961
944 batch_file_name = Unicode(u'pbs_controller', config=True,
962 batch_file_name = Unicode(u'pbs_controller', config=True,
945 help="batch file name for the controller job.")
963 help="batch file name for the controller job.")
946 default_template= Unicode("""#!/bin/sh
964 default_template= Unicode("""#!/bin/sh
947 #PBS -V
965 #PBS -V
948 #PBS -N ipcontroller
966 #PBS -N ipcontroller
949 %s --log-to-file --profile-dir={profile_dir}
967 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
950 """%(' '.join(ipcontroller_cmd_argv)))
968 """%(' '.join(ipcontroller_cmd_argv)))
951
969
952 def start(self, profile_dir):
970
971 def start(self):
953 """Start the controller by profile or profile_dir."""
972 """Start the controller by profile or profile_dir."""
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
973 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
974 return super(PBSControllerLauncher, self).start(1)
956
975
957
976
958 class PBSEngineSetLauncher(PBSLauncher):
977 class PBSEngineSetLauncher(BatchClusterAppMixin, PBSLauncher):
959 """Launch Engines using PBS"""
978 """Launch Engines using PBS"""
960 batch_file_name = Unicode(u'pbs_engines', config=True,
979 batch_file_name = Unicode(u'pbs_engines', config=True,
961 help="batch file name for the engine(s) job.")
980 help="batch file name for the engine(s) job.")
962 default_template= Unicode(u"""#!/bin/sh
981 default_template= Unicode(u"""#!/bin/sh
963 #PBS -V
982 #PBS -V
964 #PBS -N ipengine
983 #PBS -N ipengine
965 %s --profile-dir={profile_dir}
984 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
966 """%(' '.join(ipengine_cmd_argv)))
985 """%(' '.join(ipengine_cmd_argv)))
967
986
968 def start(self, n, profile_dir):
987 def start(self, n):
969 """Start n engines by profile or profile_dir."""
988 """Start n engines by profile or profile_dir."""
970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
989 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
990 return super(PBSEngineSetLauncher, self).start(n)
972
991
973 #SGE is very similar to PBS
992 #SGE is very similar to PBS
974
993
975 class SGELauncher(PBSLauncher):
994 class SGELauncher(PBSLauncher):
976 """Sun GridEngine is a PBS clone with slightly different syntax"""
995 """Sun GridEngine is a PBS clone with slightly different syntax"""
977 job_array_regexp = Unicode('#\$\W+\-t')
996 job_array_regexp = Unicode('#\$\W+\-t')
978 job_array_template = Unicode('#$ -t 1-{n}')
997 job_array_template = Unicode('#$ -t 1-{n}')
979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
998 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 queue_template = Unicode('#$ -q {queue}')
999 queue_template = Unicode('#$ -q {queue}')
981
1000
982 class SGEControllerLauncher(SGELauncher):
1001 class SGEControllerLauncher(BatchClusterAppMixin, SGELauncher):
983 """Launch a controller using SGE."""
1002 """Launch a controller using SGE."""
984
1003
985 batch_file_name = Unicode(u'sge_controller', config=True,
1004 batch_file_name = Unicode(u'sge_controller', config=True,
986 help="batch file name for the ipontroller job.")
1005 help="batch file name for the ipontroller job.")
987 default_template= Unicode(u"""#$ -V
1006 default_template= Unicode(u"""#$ -V
988 #$ -S /bin/sh
1007 #$ -S /bin/sh
989 #$ -N ipcontroller
1008 #$ -N ipcontroller
990 %s --log-to-file --profile-dir={profile_dir}
1009 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
991 """%(' '.join(ipcontroller_cmd_argv)))
1010 """%(' '.join(ipcontroller_cmd_argv)))
992
1011
993 def start(self, profile_dir):
1012 def start(self):
994 """Start the controller by profile or profile_dir."""
1013 """Start the controller by profile or profile_dir."""
995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1014 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
1015 return super(SGEControllerLauncher, self).start(1)
997
1016
998 class SGEEngineSetLauncher(SGELauncher):
1017 class SGEEngineSetLauncher(BatchClusterAppMixin, SGELauncher):
999 """Launch Engines with SGE"""
1018 """Launch Engines with SGE"""
1000 batch_file_name = Unicode(u'sge_engines', config=True,
1019 batch_file_name = Unicode(u'sge_engines', config=True,
1001 help="batch file name for the engine(s) job.")
1020 help="batch file name for the engine(s) job.")
1002 default_template = Unicode("""#$ -V
1021 default_template = Unicode("""#$ -V
1003 #$ -S /bin/sh
1022 #$ -S /bin/sh
1004 #$ -N ipengine
1023 #$ -N ipengine
1005 %s --profile-dir={profile_dir}
1024 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1006 """%(' '.join(ipengine_cmd_argv)))
1025 """%(' '.join(ipengine_cmd_argv)))
1007
1026
1008 def start(self, n, profile_dir):
1027 def start(self, n):
1009 """Start n engines by profile or profile_dir."""
1028 """Start n engines by profile or profile_dir."""
1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1029 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1030 return super(SGEEngineSetLauncher, self).start(n)
1012
1031
1013
1032
1014 # LSF launchers
1033 # LSF launchers
1015
1034
1016 class LSFLauncher(BatchSystemLauncher):
1035 class LSFLauncher(BatchSystemLauncher):
1017 """A BatchSystemLauncher subclass for LSF."""
1036 """A BatchSystemLauncher subclass for LSF."""
1018
1037
1019 submit_command = List(['bsub'], config=True,
1038 submit_command = List(['bsub'], config=True,
1020 help="The PBS submit command ['bsub']")
1039 help="The PBS submit command ['bsub']")
1021 delete_command = List(['bkill'], config=True,
1040 delete_command = List(['bkill'], config=True,
1022 help="The PBS delete command ['bkill']")
1041 help="The PBS delete command ['bkill']")
1023 job_id_regexp = Unicode(r'\d+', config=True,
1042 job_id_regexp = Unicode(r'\d+', config=True,
1024 help="Regular expresion for identifying the job ID [r'\d+']")
1043 help="Regular expresion for identifying the job ID [r'\d+']")
1025
1044
1026 batch_file = Unicode(u'')
1045 batch_file = Unicode(u'')
1027 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1046 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1028 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1047 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1029 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1048 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1030 queue_template = Unicode('#BSUB -q {queue}')
1049 queue_template = Unicode('#BSUB -q {queue}')
1031
1050
1032 def start(self, n, profile_dir):
1051 def start(self, n):
1033 """Start n copies of the process using LSF batch system.
1052 """Start n copies of the process using LSF batch system.
1034 This can't inherit from the base class because bsub expects
1053 This can't inherit from the base class because bsub expects
1035 to be piped a shell script in order to honor the #BSUB directives:
1054 to be piped a shell script in order to honor the #BSUB directives:
1036 bsub < script
1055 bsub < script
1037 """
1056 """
1038 # Here we save profile_dir in the context so they
1057 # Here we save profile_dir in the context so they
1039 # can be used in the batch script template as {profile_dir}
1058 # can be used in the batch script template as {profile_dir}
1040 self.context['profile_dir'] = profile_dir
1041 self.profile_dir = unicode(profile_dir)
1042 self.write_batch_script(n)
1059 self.write_batch_script(n)
1043 #output = check_output(self.args, env=os.environ)
1060 #output = check_output(self.args, env=os.environ)
1044 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1061 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1045 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1062 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1046 output,err = p.communicate()
1063 output,err = p.communicate()
1047 job_id = self.parse_job_id(output)
1064 job_id = self.parse_job_id(output)
1048 self.notify_start(job_id)
1065 self.notify_start(job_id)
1049 return job_id
1066 return job_id
1050
1067
1051
1068
1052 class LSFControllerLauncher(LSFLauncher):
1069 class LSFControllerLauncher(BatchClusterAppMixin, LSFLauncher):
1053 """Launch a controller using LSF."""
1070 """Launch a controller using LSF."""
1054
1071
1055 batch_file_name = Unicode(u'lsf_controller', config=True,
1072 batch_file_name = Unicode(u'lsf_controller', config=True,
1056 help="batch file name for the controller job.")
1073 help="batch file name for the controller job.")
1057 default_template= Unicode("""#!/bin/sh
1074 default_template= Unicode("""#!/bin/sh
1058 #BSUB -J ipcontroller
1075 #BSUB -J ipcontroller
1059 #BSUB -oo ipcontroller.o.%%J
1076 #BSUB -oo ipcontroller.o.%%J
1060 #BSUB -eo ipcontroller.e.%%J
1077 #BSUB -eo ipcontroller.e.%%J
1061 %s --log-to-file --profile-dir={profile_dir}
1078 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1062 """%(' '.join(ipcontroller_cmd_argv)))
1079 """%(' '.join(ipcontroller_cmd_argv)))
1063
1080
1064 def start(self, profile_dir):
1081 def start(self):
1065 """Start the controller by profile or profile_dir."""
1082 """Start the controller by profile or profile_dir."""
1066 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1083 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1084 return super(LSFControllerLauncher, self).start(1)
1068
1085
1069
1086
1070 class LSFEngineSetLauncher(LSFLauncher):
1087 class LSFEngineSetLauncher(BatchClusterAppMixin, LSFLauncher):
1071 """Launch Engines using LSF"""
1088 """Launch Engines using LSF"""
1072 batch_file_name = Unicode(u'lsf_engines', config=True,
1089 batch_file_name = Unicode(u'lsf_engines', config=True,
1073 help="batch file name for the engine(s) job.")
1090 help="batch file name for the engine(s) job.")
1074 default_template= Unicode(u"""#!/bin/sh
1091 default_template= Unicode(u"""#!/bin/sh
1075 #BSUB -oo ipengine.o.%%J
1092 #BSUB -oo ipengine.o.%%J
1076 #BSUB -eo ipengine.e.%%J
1093 #BSUB -eo ipengine.e.%%J
1077 %s --profile-dir={profile_dir}
1094 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1078 """%(' '.join(ipengine_cmd_argv)))
1095 """%(' '.join(ipengine_cmd_argv)))
1079
1096
1080 def start(self, n, profile_dir):
1097 def start(self, n):
1081 """Start n engines by profile or profile_dir."""
1098 """Start n engines by profile or profile_dir."""
1082 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1099 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1100 return super(LSFEngineSetLauncher, self).start(n)
1084
1101
1085
1102
1086 #-----------------------------------------------------------------------------
1103 #-----------------------------------------------------------------------------
1087 # A launcher for ipcluster itself!
1104 # A launcher for ipcluster itself!
1088 #-----------------------------------------------------------------------------
1105 #-----------------------------------------------------------------------------
1089
1106
1090
1107
1091 class IPClusterLauncher(LocalProcessLauncher):
1108 class IPClusterLauncher(LocalProcessLauncher):
1092 """Launch the ipcluster program in an external process."""
1109 """Launch the ipcluster program in an external process."""
1093
1110
1094 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1111 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1095 help="Popen command for ipcluster")
1112 help="Popen command for ipcluster")
1096 ipcluster_args = List(
1113 ipcluster_args = List(
1097 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1114 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1098 help="Command line arguments to pass to ipcluster.")
1115 help="Command line arguments to pass to ipcluster.")
1099 ipcluster_subcommand = Unicode('start')
1116 ipcluster_subcommand = Unicode('start')
1100 ipcluster_n = Int(2)
1117 ipcluster_n = Int(2)
1101
1118
1102 def find_args(self):
1119 def find_args(self):
1103 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1120 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1104 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1121 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1105
1122
1106 def start(self):
1123 def start(self):
1107 self.log.info("Starting ipcluster: %r" % self.args)
1124 self.log.info("Starting ipcluster: %r" % self.args)
1108 return super(IPClusterLauncher, self).start()
1125 return super(IPClusterLauncher, self).start()
1109
1126
1110 #-----------------------------------------------------------------------------
1127 #-----------------------------------------------------------------------------
1111 # Collections of launchers
1128 # Collections of launchers
1112 #-----------------------------------------------------------------------------
1129 #-----------------------------------------------------------------------------
1113
1130
1114 local_launchers = [
1131 local_launchers = [
1115 LocalControllerLauncher,
1132 LocalControllerLauncher,
1116 LocalEngineLauncher,
1133 LocalEngineLauncher,
1117 LocalEngineSetLauncher,
1134 LocalEngineSetLauncher,
1118 ]
1135 ]
1119 mpi_launchers = [
1136 mpi_launchers = [
1120 MPIExecLauncher,
1137 MPIExecLauncher,
1121 MPIExecControllerLauncher,
1138 MPIExecControllerLauncher,
1122 MPIExecEngineSetLauncher,
1139 MPIExecEngineSetLauncher,
1123 ]
1140 ]
1124 ssh_launchers = [
1141 ssh_launchers = [
1125 SSHLauncher,
1142 SSHLauncher,
1126 SSHControllerLauncher,
1143 SSHControllerLauncher,
1127 SSHEngineLauncher,
1144 SSHEngineLauncher,
1128 SSHEngineSetLauncher,
1145 SSHEngineSetLauncher,
1129 ]
1146 ]
1130 winhpc_launchers = [
1147 winhpc_launchers = [
1131 WindowsHPCLauncher,
1148 WindowsHPCLauncher,
1132 WindowsHPCControllerLauncher,
1149 WindowsHPCControllerLauncher,
1133 WindowsHPCEngineSetLauncher,
1150 WindowsHPCEngineSetLauncher,
1134 ]
1151 ]
1135 pbs_launchers = [
1152 pbs_launchers = [
1136 PBSLauncher,
1153 PBSLauncher,
1137 PBSControllerLauncher,
1154 PBSControllerLauncher,
1138 PBSEngineSetLauncher,
1155 PBSEngineSetLauncher,
1139 ]
1156 ]
1140 sge_launchers = [
1157 sge_launchers = [
1141 SGELauncher,
1158 SGELauncher,
1142 SGEControllerLauncher,
1159 SGEControllerLauncher,
1143 SGEEngineSetLauncher,
1160 SGEEngineSetLauncher,
1144 ]
1161 ]
1145 lsf_launchers = [
1162 lsf_launchers = [
1146 LSFLauncher,
1163 LSFLauncher,
1147 LSFControllerLauncher,
1164 LSFControllerLauncher,
1148 LSFEngineSetLauncher,
1165 LSFEngineSetLauncher,
1149 ]
1166 ]
1150 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1167 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1151 + pbs_launchers + sge_launchers + lsf_launchers
1168 + pbs_launchers + sge_launchers + lsf_launchers
1152
1169