##// END OF EJS Templates
Tidied up queue configuration (no support in Condor)
James Booth -
Show More
@@ -1,1463 +1,1462 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7, define check_output with Popen
    def check_output(*args, **kwargs):
        """Run a command and return whatever it wrote to stdout.

        Fallback for interpreters whose ``subprocess`` module has no
        ``check_output``.  Positional and keyword arguments are forwarded
        to ``Popen``; any ``stdout`` keyword is overridden with ``PIPE``.

        Raises
        ------
        CalledProcessError
            If the command exits with a non-zero status, matching the
            behaviour of the standard-library ``check_output``.
        """
        from subprocess import CalledProcessError

        kwargs['stdout'] = PIPE
        p = Popen(*args, **kwargs)
        out, _ = p.communicate()
        retcode = p.returncode
        if retcode:
            # mirror the stdlib: report the command that failed
            failed = kwargs.get('args')
            if failed is None:
                failed = args[0]
            raise CalledProcessError(retcode, failed)
        return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67
67
68 from .win32support import forward_read_events
68 from .win32support import forward_read_events
69
69
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71
71
# True when the interpreter reports the 'nt' OS name (i.e. Windows).
WINDOWS = os.name == 'nt'

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Template for the ``python -c`` one-liner; ``%s`` is filled with the name of
# the app module under IPython.parallel.apps.
cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"

# Each argv list runs the current interpreter with the one-liner above.
ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]

ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]

ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85
85
86 # HTCondor frustratingly destroys sys.executable when launching remote processes
86 # HTCondor frustratingly destroys sys.executable when launching remote processes
87 # thus Python will default back to the system module search paths which is
87 # thus Python will default back to the system module search paths which is
88 # ridiculously fragile (not to mention destructive for virtualenvs).
88 # ridiculously fragile (not to mention destructive for virtualenvs).
89 # however, if we use the ip{cluster, engine, controller} scripts as our
89 # however, if we use the ip{cluster, engine, controller} scripts as our
90 # executable we circumvent this - the mechanism of shebanged scripts means that
90 # executable we circumvent this - the mechanism of shebanged scripts means that
91 # the python binary will be launched with argv[0] set correctly.
91 # the python binary will be launched with argv[0] set correctly.
92 # This does mean that for HTCondor we require that:
92 # This does mean that for HTCondor we require that:
93 # a. The python interpreter you are using is in a folder next to the ipengine,
93 # a. The python interpreter you are using is in a folder next to the ipengine,
94 # ipcluster and ipcontroller scripts
94 # ipcluster and ipcontroller scripts
95 # b. I have no idea what the consequences are for Windows.
95 # b. I have no idea what the consequences are for Windows.
# Directory that holds the running interpreter; the ip* entry-point scripts
# are looked up right next to it.
bin_dir = os.path.dirname(sys.executable)

# NOTE: despite the ``_argv`` suffix these are plain path strings, not lists.
condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')

condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')

condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
103
103
104 #-----------------------------------------------------------------------------
104 #-----------------------------------------------------------------------------
105 # Base launchers and errors
105 # Base launchers and errors
106 #-----------------------------------------------------------------------------
106 #-----------------------------------------------------------------------------
107
107
class LauncherError(Exception):
    """Base class for the errors defined by this module."""


class ProcessStateError(LauncherError):
    """Raised when an operation is not valid for the launcher's current state."""


class UnknownStatus(LauncherError):
    """Launcher error for a status that cannot be determined."""
118
118
119
119
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # data handed to notify_start() / notify_stop() respectively
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Default for ``loop``: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise launcher state; the process itself is not started here."""
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before'  # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher state is 'running'."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.

        If the launcher is already in the 'after' state, ``f`` is called
        immediately with the stored stop_data and its result is returned.
        """
        if self.state == 'after':
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'.  It is
        a pass-through so it can be used as a callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'.  Call
        this to trigger callbacks registered via :meth:`on_stop`.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # drain the callbacks, most recently registered first
        while self.stop_callbacks:
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
231
231
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line arguments carrying ``profile_dir`` and ``cluster_id``."""
        return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
240
240
class ControllerMixin(ClusterAppMixin):
    """MixIn holding the configurable command and arguments for ipcontroller."""
    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file', '--log-level=%i' % logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")
247
247
class EngineMixin(ClusterAppMixin):
    """MixIn holding the configurable command and arguments for ipengine."""
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file', '--log-level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
255
255
256
256
257 #-----------------------------------------------------------------------------
257 #-----------------------------------------------------------------------------
258 # Local process launchers
258 # Local process launchers
259 #-----------------------------------------------------------------------------
259 #-----------------------------------------------------------------------------
260
260
261
261
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Integer(100)  # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the launcher; no process or poller exists yet."""
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return ``cmd_and_args`` as the argument list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output pipes into the event loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the 'before' state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        # periodically check whether the child has exited
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; a no-op unless the state is 'running'."""
        if self.state != 'running':
            return
        if WINDOWS and sig != SIGINT:
            # use Windows tree-kill for better child cleanup
            check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
        else:
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        ``delay`` is given in seconds.  A failure to deliver the interrupt
        is logged and does not prevent the delayed KILL from being scheduled.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _log_stream_line(self, forwarder, pipe):
        """Read one line from a child output stream and log it at debug level.

        On Windows the line is received from ``forwarder``; elsewhere it is
        read from ``pipe``.  An empty read triggers :meth:`poll`.
        """
        if WINDOWS:
            line = forwarder.recv()
        else:
            line = pipe.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """Event-loop handler for the child's stdout."""
        self._log_stream_line(self.stdout, self.process.stdout)

    def handle_stderr(self, fd, events):
        """Event-loop handler for the child's stderr."""
        self._log_stream_line(self.stderr, self.process.stderr)

    def poll(self):
        """Check the child's exit status, cleaning up once it has exited.

        Returns the value of ``Popen.poll()``: None while the process is
        still running, otherwise its exit code.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
360
360
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Controller command, then cluster args, then controller args."""
        return self.controller_cmd + self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
370
370
371
371
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Engine command, then cluster args, then engine args."""
        return self.engine_cmd + self.cluster_args + self.engine_args
377
377
378
378
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # index -> launcher for engines that have not yet stopped
    launchers = Dict()
    # index -> stop data for engines that have stopped
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the set with empty stop data."""
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Returns the list of values returned by each engine launcher's
        ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Placeholder args; the set itself is not a single process."""
        return ['engine set']

    def signal(self, sig):
        """Forward ``sig`` to every engine launcher still tracked."""
        # iterate over a snapshot so the dict may change underneath us
        return [el.signal(sig) for el in list(self.launchers.values())]

    def interrupt_then_kill(self, delay=1.0):
        """Call ``interrupt_then_kill(delay)`` on every tracked engine launcher."""
        return [el.interrupt_then_kill(delay) for el in list(self.launchers.values())]

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record one engine's stop data; notify once all engines have stopped.

        ``data`` must contain the stopped engine's ``'pid'``.  If no tracked
        launcher has that pid, nothing is removed and a warning is logged.
        """
        pid = data['pid']
        for idx, el in list(self.launchers.items()):
            if el.process.pid == pid:
                break
        else:
            # no match: do not pop an unrelated launcher
            self.log.warning("No engine launcher found for stopped pid %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
449
449
450
450
451 #-----------------------------------------------------------------------------
451 #-----------------------------------------------------------------------------
452 # MPI launchers
452 # MPI launchers
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454
454
455
455
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of instances, passed to mpiexec as ``-n``
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Merge config given under the old MPIExec* names, then initialise.

        Settings found under a deprecated ``MPIExec*`` section are copied
        into the matching ``MPI*`` section and a deprecation warning is
        logged.
        """
        # deprecation for old MPIExec names:
        # ``config`` may be absent or explicitly None
        config = kwargs.get('config') or {}
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warning("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
493
493
494
494
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The program to run: ``controller_cmd``."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Program arguments: cluster args followed by controller args."""
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super(MPIControllerLauncher, self).start(1)
512
512
513
513
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # `program` and `program_args` are read-only aliases (not configurable
    # traits) consumed by find_args().  Exposing them under these names keeps
    # every Controller/EngineSet launcher uniform, instead of some offering
    # `program_args` and others `engine_args`.
    @property
    def program(self):
        """The engine command, as used by find_args()."""
        command = self.engine_cmd
        return command

    @property
    def program_args(self):
        """Cluster arguments followed by the engine arguments."""
        combined = self.cluster_args + self.engine_args
        return combined

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        # The count is recorded here and again by the parent start().
        self.n = n
        parent = super(MPIEngineSetLauncher, self)
        return parent.start(n)
532
532
533 # deprecated MPIExec names
533 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin providing the deprecation warning shared by the MPIExec* classes."""

    def warn(self):
        """Log that this class name is deprecated, naming its MPI* replacement."""
        old = type(self).__name__
        replacement = old.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", old, replacement)
539
539
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPILauncher, then log the deprecation warning."""
        parent = super(MPIExecLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
545
545
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPIControllerLauncher, then log the deprecation warning."""
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
551
551
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPIEngineSetLauncher, then log the deprecation warning."""
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
557
557
558
558
559 #-----------------------------------------------------------------------------
559 #-----------------------------------------------------------------------------
560 # SSH launchers
560 # SSH launchers
561 #-----------------------------------------------------------------------------
561 #-----------------------------------------------------------------------------
562
562
563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
564
564
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.

    Files listed in `to_send` are copied with scp before the remote program
    is started, and files listed in `to_fetch` are copied back afterwards.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Recompute `location` when the hostname changes."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Recompute `location` when the user changes."""
        # Mirror _hostname_changed: an empty user must give a bare hostname,
        # not the invalid location "@hostname".
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            self.location = self.hostname

    def find_args(self):
        """Build the ssh command line, shell-quoting the remote program and its args."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        remote = "%s:%s" % (self.location, remote)
        # wait up to 10s for the local file to exist
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s", local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        # The copy is attempted even if the file was never seen to exist.
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        `hostname` and `user`, when given, override the configured values
        before anything is launched.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        `sig` itself is not forwarded; the ssh escape sequence is written to
        the process's stdin instead.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
663
663
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that passes a remote profile directory to the program."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        """Derive remote_profile_dir from the new profile_dir if it is still unset."""
        if self.remote_profile_dir:
            return
        # trigger remote_profile_dir_default logic again,
        # in case it was already triggered before profile_dir was set
        self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        prefix = get_home_dir()
        if not prefix.endswith('/'):
            prefix = prefix + '/'

        if not path.startswith(prefix):
            return path
        return path[len(prefix):]

    def _remote_profile_dir_default(self):
        """Default to the local profile_dir with any leading home directory removed."""
        local_dir = self.profile_dir
        return self._strip_home(local_dir)

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id."""
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line arguments selecting the remote profile directory."""
        return ['--profile-dir', self.remote_profile_dir]
700
700
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # `program` and `program_args` are read-only aliases (not configurable
    # traits) consumed by find_args().  Exposing them under these names keeps
    # every Controller/EngineSet launcher uniform, instead of some offering
    # `program_args` and others `controller_args`.

    def _controller_cmd_default(self):
        """Default controller command."""
        return ['ipcontroller']

    @property
    def program(self):
        """The controller command, as used by find_args()."""
        command = self.controller_cmd
        return command

    @property
    def program_args(self):
        """Cluster arguments followed by the controller arguments."""
        combined = self.cluster_args + self.controller_args
        return combined

    def _to_fetch_default(self):
        """Default (remote, local) pairs: the two connection files under security/."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            local_path = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote_path, local_path))
        return pairs
724
724
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # `program` and `program_args` are read-only aliases (not configurable
    # traits) consumed by find_args().  Exposing them under these names keeps
    # every Controller/EngineSet launcher uniform, instead of some offering
    # `program_args` and others `engine_args`.

    def _engine_cmd_default(self):
        """Default engine command."""
        return ['ipengine']

    @property
    def program(self):
        """The engine command, as used by find_args()."""
        command = self.engine_cmd
        return command

    @property
    def program_args(self):
        """Cluster arguments followed by the engine arguments."""
        combined = self.cluster_args + self.engine_args
        return combined

    def _to_send_default(self):
        """Default (local, remote) pairs: the two connection files under security/."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs
748
748
749
749
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch sets of engines over ssh, as described by the `engines` dict.

    Each value in `engines` is either an engine count, or a ``(count, args)``
    pair giving per-host engine arguments.  A key may be written as
    ``user@host``.
    """
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        """Default engine command."""
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        # .values() rather than .itervalues(): works on Python 2 and 3 alike
        for spec in self.engines.values():
            if isinstance(spec, (tuple, list)):
                spec, _args = spec
            count += spec
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Returns the list of values returned by each engine launcher's
        ``start()``; the same list is passed to ``notify_start``.
        """

        dlist = []
        # .items() rather than .iteritems(): works on Python 2 and 3 alike.
        # The per-host count gets its own name so it does not shadow `n`.
        for host, spec in self.engines.items():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(count):
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                         profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
804
804
805
805
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """The ipcluster command followed by its ``engines`` subcommand."""
        subcommand = ['engines']
        return self.ipcluster_cmd + subcommand

    @property
    def program_args(self):
        """Engine count and remote profile directory arguments."""
        count_args = ['-n', str(self.n)]
        profile_args = ['--profile-dir', self.remote_profile_dir]
        return count_args + profile_args

    def _to_send_default(self):
        """Default (local, remote) pairs: the two connection files under security/."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs

    def start(self, n):
        """Record the engine count `n` and launch through the parent class."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
834
834
835
835
836 #-----------------------------------------------------------------------------
836 #-----------------------------------------------------------------------------
837 # Windows HPC Server 2008 scheduler launchers
837 # Windows HPC Server 2008 scheduler launchers
838 #-----------------------------------------------------------------------------
838 #-----------------------------------------------------------------------------
839
839
840
840
841 # This is only used on Windows.
841 # This is only used on Windows.
def find_job_cmd():
    """Return the job command: the located ``job`` command on Windows, else ``'job'``.

    On Windows, falls back to the bare name ``'job'`` when the lookup fails.
    """
    if not WINDOWS:
        return 'job'
    try:
        located = find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
    return located
851
851
852
852
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs with the Win HPC job command.

    Subclasses must implement `write_job_file` to produce the job description
    that `start` submits.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Pass `work_dir`, `config` and any extra keywords to the parent class."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Path of the job file inside `work_dir`."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for `n` processes; must be overridden."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if `job_id_regexp` does not match `output`.
        """
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and notify listeners; return the cancel output.

        Stopping is best-effort: if the cancel command fails, a message saying
        the job appears to be stopped already is used as the output.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # ``Exception`` rather than a bare ``except:`` so that
            # KeyboardInterrupt and SystemExit still propagate.
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
933
933
934
934
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a controller as a Win HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file holding one controller task (`n` is not used)."""
        job = IPControllerJob(config=self.config)

        task = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir
        # Cluster arguments first, then the extra controller arguments.
        for extra in (self.cluster_args, self.controller_args):
            task.controller_args.extend(extra)
        job.add_task(task)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file inside the profile directory."""
        directory = self.profile_dir
        return os.path.join(directory, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
965
965
966
966
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as a single Win HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a job file holding `n` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Cluster arguments first, then the extra engine arguments.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file inside the profile directory."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
998
998
999
999
1000 #-----------------------------------------------------------------------------
1000 #-----------------------------------------------------------------------------
1001 # Batch (PBS) system launchers
1001 # Batch (PBS) system launchers
1002 #-----------------------------------------------------------------------------
1002 #-----------------------------------------------------------------------------
1003
1003
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        # Mirror the new trait value into the template context under the
        # trait's own name.
        self.context.update({name: new})

    # cluster_id is mirrored into the context in exactly the same way.
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # Seed the context with the empty default and hand the same value back.
        default = ''
        self.context['profile_dir'] = default
        return default

    def _cluster_id_default(self):
        # Seed the context with the empty default and hand the same value back.
        default = ''
        self.context['cluster_id'] = default
        return default
1016
1016
1017
1017
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # Keep the template context in sync with the trait of the same name.
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """Return the submit command followed by the path of the batch script."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # The instantiated script always lives directly inside work_dir.
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is the ``job_id_regexp_group`` group of the first
        ``job_id_regexp`` match in ``output``; it is also stored on
        ``self.job_id``.  Raises LauncherError when nothing matches.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group(self.job_id_regexp_group)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir.

        The template is rendered with ``self.context`` and written to
        ``self.batch_file``, which is then made readable, writable and
        executable by its owner only.
        """
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to the default template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_after_first_line(self, line):
        """Insert ``line`` as the second line of ``self.batch_template``."""
        # partition() never raises, unlike unpacking split('\n', 1): a
        # one-line template simply gets ``line`` added after it.
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.

        Nothing is inserted when no queue is configured or when the template
        already contains a line matching ``queue_regexp``.
        """
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            self._insert_after_first_line(self.queue_template)

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.

        Nothing is inserted when the template already contains a line
        matching ``job_array_regexp``.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            self._insert_after_first_line(self.job_array_template)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command on it and returns
        the job id parsed from the command's output.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Rendering the script here makes the context values (profile_dir,
        # n, ...) available to the template as {profile_dir}, {n}, ...
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on the stored job id and return its output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1159
1159
1160
1160
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # The help text names the actual default, qdel.
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Raw strings keep the regex backslashes literal; the pattern text is
    # the same as with the plain literals.
    # Matches an existing "#PBS -t ..." job array directive.
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # Matches an existing "#PBS -q ..." queue directive.
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1176
1176
1177
1177
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipcontroller command; the {…} fields are left for the batch formatter.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv)))


    def start(self):
        """Submit a single controller job (n=1) via the parent ``start``."""
        return super(PBSControllerLauncher, self).start(1)
1193
1193
1194
1194
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipengine command; the {…} fields are left for the batch formatter.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv)))

    def start(self, n):
        """Submit a job for ``n`` engines via the parent ``start``."""
        return super(PBSEngineSetLauncher, self).start(n)
1208
1208
1209
1209
1210 #SGE is very similar to PBS
1210 #SGE is very similar to PBS
1211
1211
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE directives start with "#$" instead of "#PBS".
    # Matches an existing "#$ -t" job array directive.
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    # Matches an existing "#$ -q ..." queue directive.
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1218
1218
1219
1219
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipontroller job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipcontroller command; the {…} fields are left for the batch formatter.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv)))

    def start(self):
        """Submit a single controller job (n=1) via the parent ``start``."""
        return super(SGEControllerLauncher, self).start(1)
1234
1234
1235
1235
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipengine command; the {…} fields are left for the batch formatter.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv)))

    def start(self, n):
        """Submit a job for ``n`` engines via the parent ``start``."""
        return super(SGEEngineSetLauncher, self).start(n)
1249
1249
1250
1250
1251 # LSF launchers
1251 # LSF launchers
1252
1252
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # NOTE(review): this pattern allows no whitespace between "-J" and the
    # job name, so it looks like it cannot match a line shaped like
    # job_array_template below -- confirm the intended pattern.
    job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This cant inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives :
        bsub < script

        Returns the job id parsed from the submit command's output.
        """
        # Rendering the script here makes the context values (profile_dir,
        # n, ...) available to the template as {profile_dir}, {n}, ...
        self.write_batch_script(n)
        # The last element of self.args is redirected as the script and
        # everything before it is the submit command (assumes self.args
        # comes from find_args(), i.e. submit_command + [batch_file]).
        # Using args[0]/args[1] dropped extra submit_command elements and
        # redirected the wrong token whenever submit_command had flags.
        submit_cmd = ' '.join(self.args[:-1])
        piped_cmd = submit_cmd + '<"' + self.args[-1] + '"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1286
1286
1287
1287
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipcontroller command; %%J collapses to a literal %J in that step, and
    # the {…} fields are left for the batch formatter.
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv)))

    def start(self):
        """Submit a single controller job (n=1) via the parent ``start``."""
        return super(LSFControllerLauncher, self).start(1)
1303
1303
1304
1304
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %s is replaced once, at class-definition time, by the shell-quoted
    # ipengine command; %%J collapses to a literal %J in that step, and
    # the {…} fields are left for the batch formatter.
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv)))

    def start(self, n):
        """Submit a job for ``n`` engines via the parent ``start``."""
        return super(LSFEngineSetLauncher, self).start(n)
1318
1318
1319
1319
# Condor requires that we launch the ipengine/ipcontroller scripts rather
# than the python instance, but otherwise it is very similar to PBS
1322
1322
class CondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for Condor."""

    submit_command = List(['condor_submit'], config=True,
        help="The Condor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The Condor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help=r"Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    # Detects an existing "queue" statement followed by a "$" in the template.
    job_array_regexp = CRegExp(r'queue\W+\$')
    job_array_template = Unicode('queue {n}')

    def _insert_queue_in_script(self):
        """AFAIK, Condor doesn't have a concept of multiple queues that can be
        specified in the script..
        """
        # Deliberately a no-op: nothing is inserted for Condor.

    def _insert_job_array_in_script(self):
        """Append the job array line unless the template already has one."""
        if self.job_array_regexp.search(self.batch_template):
            return
        self.log.debug("adding job array settings to batch script")
        # Condor requires that the job array goes at the bottom of the script
        pieces = [self.batch_template, self.job_array_template]
        self.batch_template = '\n'.join(pieces)
1353
1355
1354
class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch a controller using Condor."""

    batch_file_name = Unicode(u'condor_controller', config=True,
        help="batch file name for the controller job.")
    # Default submit description.  %s is filled in once, at class-definition
    # time, with condor_ipcontroller_cmd_argv; the {profile_dir} and
    # {cluster_id} fields are left for the batch formatter.
    # NOTE(review): unlike the engine template, the arguments value here is
    # not wrapped in double quotes -- confirm the single quotes are handled
    # as intended by condor_submit.
    default_template = Unicode(r"""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""" % condor_ipcontroller_cmd_argv)

    def start(self):
        """Submit a single controller job (n=1) via the parent ``start``."""
        return super(CondorControllerLauncher, self).start(1)
1372
1371
1373
1372
class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
    """Launch Engines using Condor"""
    batch_file_name = Unicode(u'condor_engines', config=True,
        help="batch file name for the engine(s) job.")
    # Default submit description.  %s is filled in once, at class-definition
    # time, with condor_ipengine_cmd_argv; the {profile_dir} and
    # {cluster_id} fields are left for the batch formatter.
    default_template = Unicode("""
universe = vanilla
executable = %s
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""" % condor_ipengine_cmd_argv)

    def start(self, n):
        """Submit a job for ``n`` engines via the parent ``start``."""
        return super(CondorEngineSetLauncher, self).start(n)
1389
1388
1390
1389
1391 #-----------------------------------------------------------------------------
1390 #-----------------------------------------------------------------------------
1392 # A launcher for ipcluster itself!
1391 # A launcher for ipcluster itself!
1393 #-----------------------------------------------------------------------------
1392 #-----------------------------------------------------------------------------
1394
1393
1395
1394
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program as an external process."""

    # Base command (argv list) used to invoke ipcluster.
    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster"
    )
    # Extra command-line arguments appended after the generated ones.
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster."
    )
    # Subcommand placed right after the base command.
    ipcluster_subcommand = Unicode('start')
    # Value substituted into the ``--profile=`` argument.
    profile = Unicode('default')
    # Value substituted into the ``--n=`` argument.
    n = Integer(2)

    def find_args(self):
        """Assemble the full argument list for the ipcluster process.

        Order: base command, subcommand, ``--n`` / ``--profile`` flags,
        then the extra ``ipcluster_args``.
        """
        argv = self.ipcluster_cmd + [self.ipcluster_subcommand]
        argv = argv + ['--n=%i' % self.n, '--profile=%s' % self.profile]
        return argv + self.ipcluster_args

    def start(self):
        """Start the process via the parent class and return its result."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1415
1414
1416 #-----------------------------------------------------------------------------
1415 #-----------------------------------------------------------------------------
1417 # Collections of launchers
1416 # Collections of launchers
1418 #-----------------------------------------------------------------------------
1417 #-----------------------------------------------------------------------------
1419
1418
# Launcher classes grouped by backend; each group is a plain list of classes.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]

mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]

ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]

winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]

pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]

sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]

lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]

condor_launchers = [
    CondorLauncher,
    CondorControllerLauncher,
    CondorEngineSetLauncher,
]

# Every group above, concatenated in declaration order.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
    + condor_launchers
)
General Comments 0
You need to be logged in to leave comments. Login now