##// END OF EJS Templates
removed noop start methods on EngineSetLaunchers
James Booth -
Show More
@@ -1,1460 +1,1443 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
46 try:
46 try:
47 from subprocess import check_output
47 from subprocess import check_output
48 except ImportError:
48 except ImportError:
49 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
50 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
51 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
52 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
53 out,err = p.communicate()
53 out,err = p.communicate()
54 return out
54 return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67
67
68 from .win32support import forward_read_events
68 from .win32support import forward_read_events
69
69
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71
71
72 WINDOWS = os.name == 'nt'
72 WINDOWS = os.name == 'nt'
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # Paths to the kernel apps
75 # Paths to the kernel apps
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
79
79
80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
81
81
82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
83
83
84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85
85
86 # HTCondor frustratingly destroys sys.executable when launching remote processes
86 # HTCondor frustratingly destroys sys.executable when launching remote processes
87 # thus Python will default back to the system module search paths which is
87 # thus Python will default back to the system module search paths which is
88 # ridiculously fragile (not to mention destructive for virutalenvs).
88 # ridiculously fragile (not to mention destructive for virutalenvs).
89 # however, if we use the ip{cluster, engine, controller} scripts as our
89 # however, if we use the ip{cluster, engine, controller} scripts as our
90 # executable we circumvent this - the mechanism of shebanged scripts means that
90 # executable we circumvent this - the mechanism of shebanged scripts means that
91 # the python binary will be launched with argv[0] set correctly.
91 # the python binary will be launched with argv[0] set correctly.
92 # This does mean that for HTCondor we require that:
92 # This does mean that for HTCondor we require that:
93 # a. The python interpreter you are using is in a folder next to the ipengine,
93 # a. The python interpreter you are using is in a folder next to the ipengine,
94 # ipcluster and ipcontroller scripts
94 # ipcluster and ipcontroller scripts
95 # b. I have no idea what the consequences are for Windows.
95 # b. I have no idea what the consequences are for Windows.
96 bin_dir = os.path.dirname(sys.executable)
96 bin_dir = os.path.dirname(sys.executable)
97
97
98 condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')
98 condor_ipcluster_cmd_argv = os.path.join(bin_dir, 'ipcluster')
99
99
100 condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')
100 condor_ipengine_cmd_argv = os.path.join(bin_dir, 'ipengine')
101
101
102 condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
102 condor_ipcontroller_cmd_argv = os.path.join(bin_dir, 'ipcontroller')
103
103
104 #-----------------------------------------------------------------------------
104 #-----------------------------------------------------------------------------
105 # Base launchers and errors
105 # Base launchers and errors
106 #-----------------------------------------------------------------------------
106 #-----------------------------------------------------------------------------
107
107
108 class LauncherError(Exception):
108 class LauncherError(Exception):
109 pass
109 pass
110
110
111
111
112 class ProcessStateError(LauncherError):
112 class ProcessStateError(LauncherError):
113 pass
113 pass
114
114
115
115
116 class UnknownStatus(LauncherError):
116 class UnknownStatus(LauncherError):
117 pass
117 pass
118
118
119
119
120 class BaseLauncher(LoggingConfigurable):
120 class BaseLauncher(LoggingConfigurable):
121 """An asbtraction for starting, stopping and signaling a process."""
121 """An asbtraction for starting, stopping and signaling a process."""
122
122
123 # In all of the launchers, the work_dir is where child processes will be
123 # In all of the launchers, the work_dir is where child processes will be
124 # run. This will usually be the profile_dir, but may not be. any work_dir
124 # run. This will usually be the profile_dir, but may not be. any work_dir
125 # passed into the __init__ method will override the config value.
125 # passed into the __init__ method will override the config value.
126 # This should not be used to set the work_dir for the actual engine
126 # This should not be used to set the work_dir for the actual engine
127 # and controller. Instead, use their own config files or the
127 # and controller. Instead, use their own config files or the
128 # controller_args, engine_args attributes of the launchers to add
128 # controller_args, engine_args attributes of the launchers to add
129 # the work_dir option.
129 # the work_dir option.
130 work_dir = Unicode(u'.')
130 work_dir = Unicode(u'.')
131 loop = Instance('zmq.eventloop.ioloop.IOLoop')
131 loop = Instance('zmq.eventloop.ioloop.IOLoop')
132
132
133 start_data = Any()
133 start_data = Any()
134 stop_data = Any()
134 stop_data = Any()
135
135
136 def _loop_default(self):
136 def _loop_default(self):
137 return ioloop.IOLoop.instance()
137 return ioloop.IOLoop.instance()
138
138
139 def __init__(self, work_dir=u'.', config=None, **kwargs):
139 def __init__(self, work_dir=u'.', config=None, **kwargs):
140 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
140 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
141 self.state = 'before' # can be before, running, after
141 self.state = 'before' # can be before, running, after
142 self.stop_callbacks = []
142 self.stop_callbacks = []
143 self.start_data = None
143 self.start_data = None
144 self.stop_data = None
144 self.stop_data = None
145
145
146 @property
146 @property
147 def args(self):
147 def args(self):
148 """A list of cmd and args that will be used to start the process.
148 """A list of cmd and args that will be used to start the process.
149
149
150 This is what is passed to :func:`spawnProcess` and the first element
150 This is what is passed to :func:`spawnProcess` and the first element
151 will be the process name.
151 will be the process name.
152 """
152 """
153 return self.find_args()
153 return self.find_args()
154
154
155 def find_args(self):
155 def find_args(self):
156 """The ``.args`` property calls this to find the args list.
156 """The ``.args`` property calls this to find the args list.
157
157
158 Subcommand should implement this to construct the cmd and args.
158 Subcommand should implement this to construct the cmd and args.
159 """
159 """
160 raise NotImplementedError('find_args must be implemented in a subclass')
160 raise NotImplementedError('find_args must be implemented in a subclass')
161
161
162 @property
162 @property
163 def arg_str(self):
163 def arg_str(self):
164 """The string form of the program arguments."""
164 """The string form of the program arguments."""
165 return ' '.join(self.args)
165 return ' '.join(self.args)
166
166
167 @property
167 @property
168 def running(self):
168 def running(self):
169 """Am I running."""
169 """Am I running."""
170 if self.state == 'running':
170 if self.state == 'running':
171 return True
171 return True
172 else:
172 else:
173 return False
173 return False
174
174
175 def start(self):
175 def start(self):
176 """Start the process."""
176 """Start the process."""
177 raise NotImplementedError('start must be implemented in a subclass')
177 raise NotImplementedError('start must be implemented in a subclass')
178
178
179 def stop(self):
179 def stop(self):
180 """Stop the process and notify observers of stopping.
180 """Stop the process and notify observers of stopping.
181
181
182 This method will return None immediately.
182 This method will return None immediately.
183 To observe the actual process stopping, see :meth:`on_stop`.
183 To observe the actual process stopping, see :meth:`on_stop`.
184 """
184 """
185 raise NotImplementedError('stop must be implemented in a subclass')
185 raise NotImplementedError('stop must be implemented in a subclass')
186
186
187 def on_stop(self, f):
187 def on_stop(self, f):
188 """Register a callback to be called with this Launcher's stop_data
188 """Register a callback to be called with this Launcher's stop_data
189 when the process actually finishes.
189 when the process actually finishes.
190 """
190 """
191 if self.state=='after':
191 if self.state=='after':
192 return f(self.stop_data)
192 return f(self.stop_data)
193 else:
193 else:
194 self.stop_callbacks.append(f)
194 self.stop_callbacks.append(f)
195
195
196 def notify_start(self, data):
196 def notify_start(self, data):
197 """Call this to trigger startup actions.
197 """Call this to trigger startup actions.
198
198
199 This logs the process startup and sets the state to 'running'. It is
199 This logs the process startup and sets the state to 'running'. It is
200 a pass-through so it can be used as a callback.
200 a pass-through so it can be used as a callback.
201 """
201 """
202
202
203 self.log.debug('Process %r started: %r', self.args[0], data)
203 self.log.debug('Process %r started: %r', self.args[0], data)
204 self.start_data = data
204 self.start_data = data
205 self.state = 'running'
205 self.state = 'running'
206 return data
206 return data
207
207
208 def notify_stop(self, data):
208 def notify_stop(self, data):
209 """Call this to trigger process stop actions.
209 """Call this to trigger process stop actions.
210
210
211 This logs the process stopping and sets the state to 'after'. Call
211 This logs the process stopping and sets the state to 'after'. Call
212 this to trigger callbacks registered via :meth:`on_stop`."""
212 this to trigger callbacks registered via :meth:`on_stop`."""
213
213
214 self.log.debug('Process %r stopped: %r', self.args[0], data)
214 self.log.debug('Process %r stopped: %r', self.args[0], data)
215 self.stop_data = data
215 self.stop_data = data
216 self.state = 'after'
216 self.state = 'after'
217 for i in range(len(self.stop_callbacks)):
217 for i in range(len(self.stop_callbacks)):
218 d = self.stop_callbacks.pop()
218 d = self.stop_callbacks.pop()
219 d(data)
219 d(data)
220 return data
220 return data
221
221
222 def signal(self, sig):
222 def signal(self, sig):
223 """Signal the process.
223 """Signal the process.
224
224
225 Parameters
225 Parameters
226 ----------
226 ----------
227 sig : str or int
227 sig : str or int
228 'KILL', 'INT', etc., or any signal number
228 'KILL', 'INT', etc., or any signal number
229 """
229 """
230 raise NotImplementedError('signal must be implemented in a subclass')
230 raise NotImplementedError('signal must be implemented in a subclass')
231
231
232 class ClusterAppMixin(HasTraits):
232 class ClusterAppMixin(HasTraits):
233 """MixIn for cluster args as traits"""
233 """MixIn for cluster args as traits"""
234 profile_dir=Unicode('')
234 profile_dir=Unicode('')
235 cluster_id=Unicode('')
235 cluster_id=Unicode('')
236
236
237 @property
237 @property
238 def cluster_args(self):
238 def cluster_args(self):
239 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
239 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
240
240
241 class ControllerMixin(ClusterAppMixin):
241 class ControllerMixin(ClusterAppMixin):
242 controller_cmd = List(ipcontroller_cmd_argv, config=True,
242 controller_cmd = List(ipcontroller_cmd_argv, config=True,
243 help="""Popen command to launch ipcontroller.""")
243 help="""Popen command to launch ipcontroller.""")
244 # Command line arguments to ipcontroller.
244 # Command line arguments to ipcontroller.
245 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
245 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
246 help="""command-line args to pass to ipcontroller""")
246 help="""command-line args to pass to ipcontroller""")
247
247
248 class EngineMixin(ClusterAppMixin):
248 class EngineMixin(ClusterAppMixin):
249 engine_cmd = List(ipengine_cmd_argv, config=True,
249 engine_cmd = List(ipengine_cmd_argv, config=True,
250 help="""command to launch the Engine.""")
250 help="""command to launch the Engine.""")
251 # Command line arguments for ipengine.
251 # Command line arguments for ipengine.
252 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
252 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
253 help="command-line arguments to pass to ipengine"
253 help="command-line arguments to pass to ipengine"
254 )
254 )
255
255
256
256
257 #-----------------------------------------------------------------------------
257 #-----------------------------------------------------------------------------
258 # Local process launchers
258 # Local process launchers
259 #-----------------------------------------------------------------------------
259 #-----------------------------------------------------------------------------
260
260
261
261
262 class LocalProcessLauncher(BaseLauncher):
262 class LocalProcessLauncher(BaseLauncher):
263 """Start and stop an external process in an asynchronous manner.
263 """Start and stop an external process in an asynchronous manner.
264
264
265 This will launch the external process with a working directory of
265 This will launch the external process with a working directory of
266 ``self.work_dir``.
266 ``self.work_dir``.
267 """
267 """
268
268
269 # This is used to to construct self.args, which is passed to
269 # This is used to to construct self.args, which is passed to
270 # spawnProcess.
270 # spawnProcess.
271 cmd_and_args = List([])
271 cmd_and_args = List([])
272 poll_frequency = Integer(100) # in ms
272 poll_frequency = Integer(100) # in ms
273
273
274 def __init__(self, work_dir=u'.', config=None, **kwargs):
274 def __init__(self, work_dir=u'.', config=None, **kwargs):
275 super(LocalProcessLauncher, self).__init__(
275 super(LocalProcessLauncher, self).__init__(
276 work_dir=work_dir, config=config, **kwargs
276 work_dir=work_dir, config=config, **kwargs
277 )
277 )
278 self.process = None
278 self.process = None
279 self.poller = None
279 self.poller = None
280
280
281 def find_args(self):
281 def find_args(self):
282 return self.cmd_and_args
282 return self.cmd_and_args
283
283
284 def start(self):
284 def start(self):
285 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
285 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
286 if self.state == 'before':
286 if self.state == 'before':
287 self.process = Popen(self.args,
287 self.process = Popen(self.args,
288 stdout=PIPE,stderr=PIPE,stdin=PIPE,
288 stdout=PIPE,stderr=PIPE,stdin=PIPE,
289 env=os.environ,
289 env=os.environ,
290 cwd=self.work_dir
290 cwd=self.work_dir
291 )
291 )
292 if WINDOWS:
292 if WINDOWS:
293 self.stdout = forward_read_events(self.process.stdout)
293 self.stdout = forward_read_events(self.process.stdout)
294 self.stderr = forward_read_events(self.process.stderr)
294 self.stderr = forward_read_events(self.process.stderr)
295 else:
295 else:
296 self.stdout = self.process.stdout.fileno()
296 self.stdout = self.process.stdout.fileno()
297 self.stderr = self.process.stderr.fileno()
297 self.stderr = self.process.stderr.fileno()
298 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
298 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
299 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
299 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
300 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
300 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
301 self.poller.start()
301 self.poller.start()
302 self.notify_start(self.process.pid)
302 self.notify_start(self.process.pid)
303 else:
303 else:
304 s = 'The process was already started and has state: %r' % self.state
304 s = 'The process was already started and has state: %r' % self.state
305 raise ProcessStateError(s)
305 raise ProcessStateError(s)
306
306
307 def stop(self):
307 def stop(self):
308 return self.interrupt_then_kill()
308 return self.interrupt_then_kill()
309
309
310 def signal(self, sig):
310 def signal(self, sig):
311 if self.state == 'running':
311 if self.state == 'running':
312 if WINDOWS and sig != SIGINT:
312 if WINDOWS and sig != SIGINT:
313 # use Windows tree-kill for better child cleanup
313 # use Windows tree-kill for better child cleanup
314 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
314 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
315 else:
315 else:
316 self.process.send_signal(sig)
316 self.process.send_signal(sig)
317
317
318 def interrupt_then_kill(self, delay=2.0):
318 def interrupt_then_kill(self, delay=2.0):
319 """Send INT, wait a delay and then send KILL."""
319 """Send INT, wait a delay and then send KILL."""
320 try:
320 try:
321 self.signal(SIGINT)
321 self.signal(SIGINT)
322 except Exception:
322 except Exception:
323 self.log.debug("interrupt failed")
323 self.log.debug("interrupt failed")
324 pass
324 pass
325 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
325 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
326 self.killer.start()
326 self.killer.start()
327
327
328 # callbacks, etc:
328 # callbacks, etc:
329
329
330 def handle_stdout(self, fd, events):
330 def handle_stdout(self, fd, events):
331 if WINDOWS:
331 if WINDOWS:
332 line = self.stdout.recv()
332 line = self.stdout.recv()
333 else:
333 else:
334 line = self.process.stdout.readline()
334 line = self.process.stdout.readline()
335 # a stopped process will be readable but return empty strings
335 # a stopped process will be readable but return empty strings
336 if line:
336 if line:
337 self.log.debug(line[:-1])
337 self.log.debug(line[:-1])
338 else:
338 else:
339 self.poll()
339 self.poll()
340
340
341 def handle_stderr(self, fd, events):
341 def handle_stderr(self, fd, events):
342 if WINDOWS:
342 if WINDOWS:
343 line = self.stderr.recv()
343 line = self.stderr.recv()
344 else:
344 else:
345 line = self.process.stderr.readline()
345 line = self.process.stderr.readline()
346 # a stopped process will be readable but return empty strings
346 # a stopped process will be readable but return empty strings
347 if line:
347 if line:
348 self.log.debug(line[:-1])
348 self.log.debug(line[:-1])
349 else:
349 else:
350 self.poll()
350 self.poll()
351
351
352 def poll(self):
352 def poll(self):
353 status = self.process.poll()
353 status = self.process.poll()
354 if status is not None:
354 if status is not None:
355 self.poller.stop()
355 self.poller.stop()
356 self.loop.remove_handler(self.stdout)
356 self.loop.remove_handler(self.stdout)
357 self.loop.remove_handler(self.stderr)
357 self.loop.remove_handler(self.stderr)
358 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
358 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
359 return status
359 return status
360
360
361 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
361 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
362 """Launch a controller as a regular external process."""
362 """Launch a controller as a regular external process."""
363
363
364 def find_args(self):
364 def find_args(self):
365 return self.controller_cmd + self.cluster_args + self.controller_args
365 return self.controller_cmd + self.cluster_args + self.controller_args
366
366
367 def start(self):
367 def start(self):
368 """Start the controller by profile_dir."""
368 """Start the controller by profile_dir."""
369 return super(LocalControllerLauncher, self).start()
369 return super(LocalControllerLauncher, self).start()
370
370
371
371
372 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
372 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
373 """Launch a single engine as a regular externall process."""
373 """Launch a single engine as a regular externall process."""
374
374
375 def find_args(self):
375 def find_args(self):
376 return self.engine_cmd + self.cluster_args + self.engine_args
376 return self.engine_cmd + self.cluster_args + self.engine_args
377
377
378
378
379 class LocalEngineSetLauncher(LocalEngineLauncher):
379 class LocalEngineSetLauncher(LocalEngineLauncher):
380 """Launch a set of engines as regular external processes."""
380 """Launch a set of engines as regular external processes."""
381
381
382 delay = CFloat(0.1, config=True,
382 delay = CFloat(0.1, config=True,
383 help="""delay (in seconds) between starting each engine after the first.
383 help="""delay (in seconds) between starting each engine after the first.
384 This can help force the engines to get their ids in order, or limit
384 This can help force the engines to get their ids in order, or limit
385 process flood when starting many engines."""
385 process flood when starting many engines."""
386 )
386 )
387
387
388 # launcher class
388 # launcher class
389 launcher_class = LocalEngineLauncher
389 launcher_class = LocalEngineLauncher
390
390
391 launchers = Dict()
391 launchers = Dict()
392 stop_data = Dict()
392 stop_data = Dict()
393
393
394 def __init__(self, work_dir=u'.', config=None, **kwargs):
394 def __init__(self, work_dir=u'.', config=None, **kwargs):
395 super(LocalEngineSetLauncher, self).__init__(
395 super(LocalEngineSetLauncher, self).__init__(
396 work_dir=work_dir, config=config, **kwargs
396 work_dir=work_dir, config=config, **kwargs
397 )
397 )
398 self.stop_data = {}
398 self.stop_data = {}
399
399
400 def start(self, n):
400 def start(self, n):
401 """Start n engines by profile or profile_dir."""
401 """Start n engines by profile or profile_dir."""
402 dlist = []
402 dlist = []
403 for i in range(n):
403 for i in range(n):
404 if i > 0:
404 if i > 0:
405 time.sleep(self.delay)
405 time.sleep(self.delay)
406 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
406 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
407 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
407 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
408 )
408 )
409
409
410 # Copy the engine args over to each engine launcher.
410 # Copy the engine args over to each engine launcher.
411 el.engine_cmd = copy.deepcopy(self.engine_cmd)
411 el.engine_cmd = copy.deepcopy(self.engine_cmd)
412 el.engine_args = copy.deepcopy(self.engine_args)
412 el.engine_args = copy.deepcopy(self.engine_args)
413 el.on_stop(self._notice_engine_stopped)
413 el.on_stop(self._notice_engine_stopped)
414 d = el.start()
414 d = el.start()
415 self.launchers[i] = el
415 self.launchers[i] = el
416 dlist.append(d)
416 dlist.append(d)
417 self.notify_start(dlist)
417 self.notify_start(dlist)
418 return dlist
418 return dlist
419
419
420 def find_args(self):
420 def find_args(self):
421 return ['engine set']
421 return ['engine set']
422
422
423 def signal(self, sig):
423 def signal(self, sig):
424 dlist = []
424 dlist = []
425 for el in self.launchers.itervalues():
425 for el in self.launchers.itervalues():
426 d = el.signal(sig)
426 d = el.signal(sig)
427 dlist.append(d)
427 dlist.append(d)
428 return dlist
428 return dlist
429
429
430 def interrupt_then_kill(self, delay=1.0):
430 def interrupt_then_kill(self, delay=1.0):
431 dlist = []
431 dlist = []
432 for el in self.launchers.itervalues():
432 for el in self.launchers.itervalues():
433 d = el.interrupt_then_kill(delay)
433 d = el.interrupt_then_kill(delay)
434 dlist.append(d)
434 dlist.append(d)
435 return dlist
435 return dlist
436
436
437 def stop(self):
437 def stop(self):
438 return self.interrupt_then_kill()
438 return self.interrupt_then_kill()
439
439
440 def _notice_engine_stopped(self, data):
440 def _notice_engine_stopped(self, data):
441 pid = data['pid']
441 pid = data['pid']
442 for idx,el in self.launchers.iteritems():
442 for idx,el in self.launchers.iteritems():
443 if el.process.pid == pid:
443 if el.process.pid == pid:
444 break
444 break
445 self.launchers.pop(idx)
445 self.launchers.pop(idx)
446 self.stop_data[idx] = data
446 self.stop_data[idx] = data
447 if not self.launchers:
447 if not self.launchers:
448 self.notify_stop(self.stop_data)
448 self.notify_stop(self.stop_data)
449
449
450
450
451 #-----------------------------------------------------------------------------
451 #-----------------------------------------------------------------------------
452 # MPI launchers
452 # MPI launchers
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454
454
455
455
456 class MPILauncher(LocalProcessLauncher):
456 class MPILauncher(LocalProcessLauncher):
457 """Launch an external process using mpiexec."""
457 """Launch an external process using mpiexec."""
458
458
459 mpi_cmd = List(['mpiexec'], config=True,
459 mpi_cmd = List(['mpiexec'], config=True,
460 help="The mpiexec command to use in starting the process."
460 help="The mpiexec command to use in starting the process."
461 )
461 )
462 mpi_args = List([], config=True,
462 mpi_args = List([], config=True,
463 help="The command line arguments to pass to mpiexec."
463 help="The command line arguments to pass to mpiexec."
464 )
464 )
465 program = List(['date'],
465 program = List(['date'],
466 help="The program to start via mpiexec.")
466 help="The program to start via mpiexec.")
467 program_args = List([],
467 program_args = List([],
468 help="The command line argument to the program."
468 help="The command line argument to the program."
469 )
469 )
470 n = Integer(1)
470 n = Integer(1)
471
471
472 def __init__(self, *args, **kwargs):
472 def __init__(self, *args, **kwargs):
473 # deprecation for old MPIExec names:
473 # deprecation for old MPIExec names:
474 config = kwargs.get('config', {})
474 config = kwargs.get('config', {})
475 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
475 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
476 deprecated = config.get(oldname)
476 deprecated = config.get(oldname)
477 if deprecated:
477 if deprecated:
478 newname = oldname.replace('MPIExec', 'MPI')
478 newname = oldname.replace('MPIExec', 'MPI')
479 config[newname].update(deprecated)
479 config[newname].update(deprecated)
480 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
480 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
481
481
482 super(MPILauncher, self).__init__(*args, **kwargs)
482 super(MPILauncher, self).__init__(*args, **kwargs)
483
483
484 def find_args(self):
484 def find_args(self):
485 """Build self.args using all the fields."""
485 """Build self.args using all the fields."""
486 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
486 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
487 self.program + self.program_args
487 self.program + self.program_args
488
488
489 def start(self, n):
489 def start(self, n):
490 """Start n instances of the program using mpiexec."""
490 """Start n instances of the program using mpiexec."""
491 self.n = n
491 self.n = n
492 return super(MPILauncher, self).start()
492 return super(MPILauncher, self).start()
493
493
494
494
495 class MPIControllerLauncher(MPILauncher, ControllerMixin):
495 class MPIControllerLauncher(MPILauncher, ControllerMixin):
496 """Launch a controller using mpiexec."""
496 """Launch a controller using mpiexec."""
497
497
498 # alias back to *non-configurable* program[_args] for use in find_args()
498 # alias back to *non-configurable* program[_args] for use in find_args()
499 # this way all Controller/EngineSetLaunchers have the same form, rather
499 # this way all Controller/EngineSetLaunchers have the same form, rather
500 # than *some* having `program_args` and others `controller_args`
500 # than *some* having `program_args` and others `controller_args`
501 @property
501 @property
502 def program(self):
502 def program(self):
503 return self.controller_cmd
503 return self.controller_cmd
504
504
505 @property
505 @property
506 def program_args(self):
506 def program_args(self):
507 return self.cluster_args + self.controller_args
507 return self.cluster_args + self.controller_args
508
508
509 def start(self):
509 def start(self):
510 """Start the controller by profile_dir."""
510 """Start the controller by profile_dir."""
511 return super(MPIControllerLauncher, self).start(1)
511 return super(MPIControllerLauncher, self).start(1)
512
512
513
513
514 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
514 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
515 """Launch engines using mpiexec"""
515 """Launch engines using mpiexec"""
516
516
517 # alias back to *non-configurable* program[_args] for use in find_args()
517 # alias back to *non-configurable* program[_args] for use in find_args()
518 # this way all Controller/EngineSetLaunchers have the same form, rather
518 # this way all Controller/EngineSetLaunchers have the same form, rather
519 # than *some* having `program_args` and others `controller_args`
519 # than *some* having `program_args` and others `controller_args`
520 @property
520 @property
521 def program(self):
521 def program(self):
522 return self.engine_cmd
522 return self.engine_cmd
523
523
524 @property
524 @property
525 def program_args(self):
525 def program_args(self):
526 return self.cluster_args + self.engine_args
526 return self.cluster_args + self.engine_args
527
527
528 def start(self, n):
528 def start(self, n):
529 """Start n engines by profile or profile_dir."""
529 """Start n engines by profile or profile_dir."""
530 self.n = n
530 self.n = n
531 return super(MPIEngineSetLauncher, self).start(n)
531 return super(MPIEngineSetLauncher, self).start(n)
532
532
533 # deprecated MPIExec names
533 # deprecated MPIExec names
534 class DeprecatedMPILauncher(object):
534 class DeprecatedMPILauncher(object):
535 def warn(self):
535 def warn(self):
536 oldname = self.__class__.__name__
536 oldname = self.__class__.__name__
537 newname = oldname.replace('MPIExec', 'MPI')
537 newname = oldname.replace('MPIExec', 'MPI')
538 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
538 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
539
539
540 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
540 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
541 """Deprecated, use MPILauncher"""
541 """Deprecated, use MPILauncher"""
542 def __init__(self, *args, **kwargs):
542 def __init__(self, *args, **kwargs):
543 super(MPIExecLauncher, self).__init__(*args, **kwargs)
543 super(MPIExecLauncher, self).__init__(*args, **kwargs)
544 self.warn()
544 self.warn()
545
545
546 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
546 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
547 """Deprecated, use MPIControllerLauncher"""
547 """Deprecated, use MPIControllerLauncher"""
548 def __init__(self, *args, **kwargs):
548 def __init__(self, *args, **kwargs):
549 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
549 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
550 self.warn()
550 self.warn()
551
551
552 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
552 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
553 """Deprecated, use MPIEngineSetLauncher"""
553 """Deprecated, use MPIEngineSetLauncher"""
554 def __init__(self, *args, **kwargs):
554 def __init__(self, *args, **kwargs):
555 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
555 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
556 self.warn()
556 self.warn()
557
557
558
558
559 #-----------------------------------------------------------------------------
559 #-----------------------------------------------------------------------------
560 # SSH launchers
560 # SSH launchers
561 #-----------------------------------------------------------------------------
561 #-----------------------------------------------------------------------------
562
562
563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
563 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
564
564
565 class SSHLauncher(LocalProcessLauncher):
565 class SSHLauncher(LocalProcessLauncher):
566 """A minimal launcher for ssh.
566 """A minimal launcher for ssh.
567
567
568 To be useful this will probably have to be extended to use the ``sshx``
568 To be useful this will probably have to be extended to use the ``sshx``
569 idea for environment variables. There could be other things this needs
569 idea for environment variables. There could be other things this needs
570 as well.
570 as well.
571 """
571 """
572
572
573 ssh_cmd = List(['ssh'], config=True,
573 ssh_cmd = List(['ssh'], config=True,
574 help="command for starting ssh")
574 help="command for starting ssh")
575 ssh_args = List(['-tt'], config=True,
575 ssh_args = List(['-tt'], config=True,
576 help="args to pass to ssh")
576 help="args to pass to ssh")
577 scp_cmd = List(['scp'], config=True,
577 scp_cmd = List(['scp'], config=True,
578 help="command for sending files")
578 help="command for sending files")
579 program = List(['date'],
579 program = List(['date'],
580 help="Program to launch via ssh")
580 help="Program to launch via ssh")
581 program_args = List([],
581 program_args = List([],
582 help="args to pass to remote program")
582 help="args to pass to remote program")
583 hostname = Unicode('', config=True,
583 hostname = Unicode('', config=True,
584 help="hostname on which to launch the program")
584 help="hostname on which to launch the program")
585 user = Unicode('', config=True,
585 user = Unicode('', config=True,
586 help="username for ssh")
586 help="username for ssh")
587 location = Unicode('', config=True,
587 location = Unicode('', config=True,
588 help="user@hostname location for ssh in one setting")
588 help="user@hostname location for ssh in one setting")
589 to_fetch = List([], config=True,
589 to_fetch = List([], config=True,
590 help="List of (remote, local) files to fetch after starting")
590 help="List of (remote, local) files to fetch after starting")
591 to_send = List([], config=True,
591 to_send = List([], config=True,
592 help="List of (local, remote) files to send before starting")
592 help="List of (local, remote) files to send before starting")
593
593
594 def _hostname_changed(self, name, old, new):
594 def _hostname_changed(self, name, old, new):
595 if self.user:
595 if self.user:
596 self.location = u'%s@%s' % (self.user, new)
596 self.location = u'%s@%s' % (self.user, new)
597 else:
597 else:
598 self.location = new
598 self.location = new
599
599
600 def _user_changed(self, name, old, new):
600 def _user_changed(self, name, old, new):
601 self.location = u'%s@%s' % (new, self.hostname)
601 self.location = u'%s@%s' % (new, self.hostname)
602
602
603 def find_args(self):
603 def find_args(self):
604 return self.ssh_cmd + self.ssh_args + [self.location] + \
604 return self.ssh_cmd + self.ssh_args + [self.location] + \
605 list(map(pipes.quote, self.program + self.program_args))
605 list(map(pipes.quote, self.program + self.program_args))
606
606
607 def _send_file(self, local, remote):
607 def _send_file(self, local, remote):
608 """send a single file"""
608 """send a single file"""
609 remote = "%s:%s" % (self.location, remote)
609 remote = "%s:%s" % (self.location, remote)
610 for i in range(10):
610 for i in range(10):
611 if not os.path.exists(local):
611 if not os.path.exists(local):
612 self.log.debug("waiting for %s" % local)
612 self.log.debug("waiting for %s" % local)
613 time.sleep(1)
613 time.sleep(1)
614 else:
614 else:
615 break
615 break
616 self.log.info("sending %s to %s", local, remote)
616 self.log.info("sending %s to %s", local, remote)
617 check_output(self.scp_cmd + [local, remote])
617 check_output(self.scp_cmd + [local, remote])
618
618
619 def send_files(self):
619 def send_files(self):
620 """send our files (called before start)"""
620 """send our files (called before start)"""
621 if not self.to_send:
621 if not self.to_send:
622 return
622 return
623 for local_file, remote_file in self.to_send:
623 for local_file, remote_file in self.to_send:
624 self._send_file(local_file, remote_file)
624 self._send_file(local_file, remote_file)
625
625
626 def _fetch_file(self, remote, local):
626 def _fetch_file(self, remote, local):
627 """fetch a single file"""
627 """fetch a single file"""
628 full_remote = "%s:%s" % (self.location, remote)
628 full_remote = "%s:%s" % (self.location, remote)
629 self.log.info("fetching %s from %s", local, full_remote)
629 self.log.info("fetching %s from %s", local, full_remote)
630 for i in range(10):
630 for i in range(10):
631 # wait up to 10s for remote file to exist
631 # wait up to 10s for remote file to exist
632 check = check_output(self.ssh_cmd + self.ssh_args + \
632 check = check_output(self.ssh_cmd + self.ssh_args + \
633 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
633 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
634 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
634 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
635 if check == u'no':
635 if check == u'no':
636 time.sleep(1)
636 time.sleep(1)
637 elif check == u'yes':
637 elif check == u'yes':
638 break
638 break
639 check_output(self.scp_cmd + [full_remote, local])
639 check_output(self.scp_cmd + [full_remote, local])
640
640
641 def fetch_files(self):
641 def fetch_files(self):
642 """fetch remote files (called after start)"""
642 """fetch remote files (called after start)"""
643 if not self.to_fetch:
643 if not self.to_fetch:
644 return
644 return
645 for remote_file, local_file in self.to_fetch:
645 for remote_file, local_file in self.to_fetch:
646 self._fetch_file(remote_file, local_file)
646 self._fetch_file(remote_file, local_file)
647
647
648 def start(self, hostname=None, user=None):
648 def start(self, hostname=None, user=None):
649 if hostname is not None:
649 if hostname is not None:
650 self.hostname = hostname
650 self.hostname = hostname
651 if user is not None:
651 if user is not None:
652 self.user = user
652 self.user = user
653
653
654 self.send_files()
654 self.send_files()
655 super(SSHLauncher, self).start()
655 super(SSHLauncher, self).start()
656 self.fetch_files()
656 self.fetch_files()
657
657
658 def signal(self, sig):
658 def signal(self, sig):
659 if self.state == 'running':
659 if self.state == 'running':
660 # send escaped ssh connection-closer
660 # send escaped ssh connection-closer
661 self.process.stdin.write('~.')
661 self.process.stdin.write('~.')
662 self.process.stdin.flush()
662 self.process.stdin.flush()
663
663
664 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
664 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
665
665
666 remote_profile_dir = Unicode('', config=True,
666 remote_profile_dir = Unicode('', config=True,
667 help="""The remote profile_dir to use.
667 help="""The remote profile_dir to use.
668
668
669 If not specified, use calling profile, stripping out possible leading homedir.
669 If not specified, use calling profile, stripping out possible leading homedir.
670 """)
670 """)
671
671
672 def _profile_dir_changed(self, name, old, new):
672 def _profile_dir_changed(self, name, old, new):
673 if not self.remote_profile_dir:
673 if not self.remote_profile_dir:
674 # trigger remote_profile_dir_default logic again,
674 # trigger remote_profile_dir_default logic again,
675 # in case it was already triggered before profile_dir was set
675 # in case it was already triggered before profile_dir was set
676 self.remote_profile_dir = self._strip_home(new)
676 self.remote_profile_dir = self._strip_home(new)
677
677
678 @staticmethod
678 @staticmethod
679 def _strip_home(path):
679 def _strip_home(path):
680 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
680 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
681 home = get_home_dir()
681 home = get_home_dir()
682 if not home.endswith('/'):
682 if not home.endswith('/'):
683 home = home+'/'
683 home = home+'/'
684
684
685 if path.startswith(home):
685 if path.startswith(home):
686 return path[len(home):]
686 return path[len(home):]
687 else:
687 else:
688 return path
688 return path
689
689
690 def _remote_profile_dir_default(self):
690 def _remote_profile_dir_default(self):
691 return self._strip_home(self.profile_dir)
691 return self._strip_home(self.profile_dir)
692
692
693 def _cluster_id_changed(self, name, old, new):
693 def _cluster_id_changed(self, name, old, new):
694 if new:
694 if new:
695 raise ValueError("cluster id not supported by SSH launchers")
695 raise ValueError("cluster id not supported by SSH launchers")
696
696
697 @property
697 @property
698 def cluster_args(self):
698 def cluster_args(self):
699 return ['--profile-dir', self.remote_profile_dir]
699 return ['--profile-dir', self.remote_profile_dir]
700
700
701 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
701 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
702
702
703 # alias back to *non-configurable* program[_args] for use in find_args()
703 # alias back to *non-configurable* program[_args] for use in find_args()
704 # this way all Controller/EngineSetLaunchers have the same form, rather
704 # this way all Controller/EngineSetLaunchers have the same form, rather
705 # than *some* having `program_args` and others `controller_args`
705 # than *some* having `program_args` and others `controller_args`
706
706
707 def _controller_cmd_default(self):
707 def _controller_cmd_default(self):
708 return ['ipcontroller']
708 return ['ipcontroller']
709
709
710 @property
710 @property
711 def program(self):
711 def program(self):
712 return self.controller_cmd
712 return self.controller_cmd
713
713
714 @property
714 @property
715 def program_args(self):
715 def program_args(self):
716 return self.cluster_args + self.controller_args
716 return self.cluster_args + self.controller_args
717
717
718 def _to_fetch_default(self):
718 def _to_fetch_default(self):
719 return [
719 return [
720 (os.path.join(self.remote_profile_dir, 'security', cf),
720 (os.path.join(self.remote_profile_dir, 'security', cf),
721 os.path.join(self.profile_dir, 'security', cf),)
721 os.path.join(self.profile_dir, 'security', cf),)
722 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
722 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
723 ]
723 ]
724
724
725 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
725 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
726
726
727 # alias back to *non-configurable* program[_args] for use in find_args()
727 # alias back to *non-configurable* program[_args] for use in find_args()
728 # this way all Controller/EngineSetLaunchers have the same form, rather
728 # this way all Controller/EngineSetLaunchers have the same form, rather
729 # than *some* having `program_args` and others `controller_args`
729 # than *some* having `program_args` and others `controller_args`
730
730
731 def _engine_cmd_default(self):
731 def _engine_cmd_default(self):
732 return ['ipengine']
732 return ['ipengine']
733
733
734 @property
734 @property
735 def program(self):
735 def program(self):
736 return self.engine_cmd
736 return self.engine_cmd
737
737
738 @property
738 @property
739 def program_args(self):
739 def program_args(self):
740 return self.cluster_args + self.engine_args
740 return self.cluster_args + self.engine_args
741
741
742 def _to_send_default(self):
742 def _to_send_default(self):
743 return [
743 return [
744 (os.path.join(self.profile_dir, 'security', cf),
744 (os.path.join(self.profile_dir, 'security', cf),
745 os.path.join(self.remote_profile_dir, 'security', cf))
745 os.path.join(self.remote_profile_dir, 'security', cf))
746 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
746 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
747 ]
747 ]
748
748
749
749
750 class SSHEngineSetLauncher(LocalEngineSetLauncher):
750 class SSHEngineSetLauncher(LocalEngineSetLauncher):
751 launcher_class = SSHEngineLauncher
751 launcher_class = SSHEngineLauncher
752 engines = Dict(config=True,
752 engines = Dict(config=True,
753 help="""dict of engines to launch. This is a dict by hostname of ints,
753 help="""dict of engines to launch. This is a dict by hostname of ints,
754 corresponding to the number of engines to start on that host.""")
754 corresponding to the number of engines to start on that host.""")
755
755
756 def _engine_cmd_default(self):
756 def _engine_cmd_default(self):
757 return ['ipengine']
757 return ['ipengine']
758
758
759 @property
759 @property
760 def engine_count(self):
760 def engine_count(self):
761 """determine engine count from `engines` dict"""
761 """determine engine count from `engines` dict"""
762 count = 0
762 count = 0
763 for n in self.engines.itervalues():
763 for n in self.engines.itervalues():
764 if isinstance(n, (tuple,list)):
764 if isinstance(n, (tuple,list)):
765 n,args = n
765 n,args = n
766 count += n
766 count += n
767 return count
767 return count
768
768
769 def start(self, n):
769 def start(self, n):
770 """Start engines by profile or profile_dir.
770 """Start engines by profile or profile_dir.
771 `n` is ignored, and the `engines` config property is used instead.
771 `n` is ignored, and the `engines` config property is used instead.
772 """
772 """
773
773
774 dlist = []
774 dlist = []
775 for host, n in self.engines.iteritems():
775 for host, n in self.engines.iteritems():
776 if isinstance(n, (tuple, list)):
776 if isinstance(n, (tuple, list)):
777 n, args = n
777 n, args = n
778 else:
778 else:
779 args = copy.deepcopy(self.engine_args)
779 args = copy.deepcopy(self.engine_args)
780
780
781 if '@' in host:
781 if '@' in host:
782 user,host = host.split('@',1)
782 user,host = host.split('@',1)
783 else:
783 else:
784 user=None
784 user=None
785 for i in range(n):
785 for i in range(n):
786 if i > 0:
786 if i > 0:
787 time.sleep(self.delay)
787 time.sleep(self.delay)
788 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
788 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
789 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
789 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
790 )
790 )
791 if i > 0:
791 if i > 0:
792 # only send files for the first engine on each host
792 # only send files for the first engine on each host
793 el.to_send = []
793 el.to_send = []
794
794
795 # Copy the engine args over to each engine launcher.
795 # Copy the engine args over to each engine launcher.
796 el.engine_cmd = self.engine_cmd
796 el.engine_cmd = self.engine_cmd
797 el.engine_args = args
797 el.engine_args = args
798 el.on_stop(self._notice_engine_stopped)
798 el.on_stop(self._notice_engine_stopped)
799 d = el.start(user=user, hostname=host)
799 d = el.start(user=user, hostname=host)
800 self.launchers[ "%s/%i" % (host,i) ] = el
800 self.launchers[ "%s/%i" % (host,i) ] = el
801 dlist.append(d)
801 dlist.append(d)
802 self.notify_start(dlist)
802 self.notify_start(dlist)
803 return dlist
803 return dlist
804
804
805
805
806 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
806 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
807 """Launcher for calling
807 """Launcher for calling
808 `ipcluster engines` on a remote machine.
808 `ipcluster engines` on a remote machine.
809
809
810 Requires that remote profile is already configured.
810 Requires that remote profile is already configured.
811 """
811 """
812
812
813 n = Integer()
813 n = Integer()
814 ipcluster_cmd = List(['ipcluster'], config=True)
814 ipcluster_cmd = List(['ipcluster'], config=True)
815
815
816 @property
816 @property
817 def program(self):
817 def program(self):
818 return self.ipcluster_cmd + ['engines']
818 return self.ipcluster_cmd + ['engines']
819
819
820 @property
820 @property
821 def program_args(self):
821 def program_args(self):
822 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
822 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
823
823
824 def _to_send_default(self):
824 def _to_send_default(self):
825 return [
825 return [
826 (os.path.join(self.profile_dir, 'security', cf),
826 (os.path.join(self.profile_dir, 'security', cf),
827 os.path.join(self.remote_profile_dir, 'security', cf))
827 os.path.join(self.remote_profile_dir, 'security', cf))
828 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
828 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
829 ]
829 ]
830
830
831 def start(self, n):
831 def start(self, n):
832 self.n = n
832 self.n = n
833 super(SSHProxyEngineSetLauncher, self).start()
833 super(SSHProxyEngineSetLauncher, self).start()
834
834
835
835
836 #-----------------------------------------------------------------------------
836 #-----------------------------------------------------------------------------
837 # Windows HPC Server 2008 scheduler launchers
837 # Windows HPC Server 2008 scheduler launchers
838 #-----------------------------------------------------------------------------
838 #-----------------------------------------------------------------------------
839
839
840
840
841 # This is only used on Windows.
841 # This is only used on Windows.
842 def find_job_cmd():
842 def find_job_cmd():
843 if WINDOWS:
843 if WINDOWS:
844 try:
844 try:
845 return find_cmd('job')
845 return find_cmd('job')
846 except (FindCmdError, ImportError):
846 except (FindCmdError, ImportError):
847 # ImportError will be raised if win32api is not installed
847 # ImportError will be raised if win32api is not installed
848 return 'job'
848 return 'job'
849 else:
849 else:
850 return 'job'
850 return 'job'
851
851
852
852
853 class WindowsHPCLauncher(BaseLauncher):
853 class WindowsHPCLauncher(BaseLauncher):
854
854
855 job_id_regexp = CRegExp(r'\d+', config=True,
855 job_id_regexp = CRegExp(r'\d+', config=True,
856 help="""A regular expression used to get the job id from the output of the
856 help="""A regular expression used to get the job id from the output of the
857 submit_command. """
857 submit_command. """
858 )
858 )
859 job_file_name = Unicode(u'ipython_job.xml', config=True,
859 job_file_name = Unicode(u'ipython_job.xml', config=True,
860 help="The filename of the instantiated job script.")
860 help="The filename of the instantiated job script.")
861 # The full path to the instantiated job script. This gets made dynamically
861 # The full path to the instantiated job script. This gets made dynamically
862 # by combining the work_dir with the job_file_name.
862 # by combining the work_dir with the job_file_name.
863 job_file = Unicode(u'')
863 job_file = Unicode(u'')
864 scheduler = Unicode('', config=True,
864 scheduler = Unicode('', config=True,
865 help="The hostname of the scheduler to submit the job to.")
865 help="The hostname of the scheduler to submit the job to.")
866 job_cmd = Unicode(find_job_cmd(), config=True,
866 job_cmd = Unicode(find_job_cmd(), config=True,
867 help="The command for submitting jobs.")
867 help="The command for submitting jobs.")
868
868
869 def __init__(self, work_dir=u'.', config=None, **kwargs):
869 def __init__(self, work_dir=u'.', config=None, **kwargs):
870 super(WindowsHPCLauncher, self).__init__(
870 super(WindowsHPCLauncher, self).__init__(
871 work_dir=work_dir, config=config, **kwargs
871 work_dir=work_dir, config=config, **kwargs
872 )
872 )
873
873
874 @property
874 @property
875 def job_file(self):
875 def job_file(self):
876 return os.path.join(self.work_dir, self.job_file_name)
876 return os.path.join(self.work_dir, self.job_file_name)
877
877
878 def write_job_file(self, n):
878 def write_job_file(self, n):
879 raise NotImplementedError("Implement write_job_file in a subclass.")
879 raise NotImplementedError("Implement write_job_file in a subclass.")
880
880
881 def find_args(self):
881 def find_args(self):
882 return [u'job.exe']
882 return [u'job.exe']
883
883
884 def parse_job_id(self, output):
884 def parse_job_id(self, output):
885 """Take the output of the submit command and return the job id."""
885 """Take the output of the submit command and return the job id."""
886 m = self.job_id_regexp.search(output)
886 m = self.job_id_regexp.search(output)
887 if m is not None:
887 if m is not None:
888 job_id = m.group()
888 job_id = m.group()
889 else:
889 else:
890 raise LauncherError("Job id couldn't be determined: %s" % output)
890 raise LauncherError("Job id couldn't be determined: %s" % output)
891 self.job_id = job_id
891 self.job_id = job_id
892 self.log.info('Job started with id: %r', job_id)
892 self.log.info('Job started with id: %r', job_id)
893 return job_id
893 return job_id
894
894
895 def start(self, n):
895 def start(self, n):
896 """Start n copies of the process using the Win HPC job scheduler."""
896 """Start n copies of the process using the Win HPC job scheduler."""
897 self.write_job_file(n)
897 self.write_job_file(n)
898 args = [
898 args = [
899 'submit',
899 'submit',
900 '/jobfile:%s' % self.job_file,
900 '/jobfile:%s' % self.job_file,
901 '/scheduler:%s' % self.scheduler
901 '/scheduler:%s' % self.scheduler
902 ]
902 ]
903 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
903 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
904
904
905 output = check_output([self.job_cmd]+args,
905 output = check_output([self.job_cmd]+args,
906 env=os.environ,
906 env=os.environ,
907 cwd=self.work_dir,
907 cwd=self.work_dir,
908 stderr=STDOUT
908 stderr=STDOUT
909 )
909 )
910 output = output.decode(DEFAULT_ENCODING, 'replace')
910 output = output.decode(DEFAULT_ENCODING, 'replace')
911 job_id = self.parse_job_id(output)
911 job_id = self.parse_job_id(output)
912 self.notify_start(job_id)
912 self.notify_start(job_id)
913 return job_id
913 return job_id
914
914
915 def stop(self):
915 def stop(self):
916 args = [
916 args = [
917 'cancel',
917 'cancel',
918 self.job_id,
918 self.job_id,
919 '/scheduler:%s' % self.scheduler
919 '/scheduler:%s' % self.scheduler
920 ]
920 ]
921 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
921 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
922 try:
922 try:
923 output = check_output([self.job_cmd]+args,
923 output = check_output([self.job_cmd]+args,
924 env=os.environ,
924 env=os.environ,
925 cwd=self.work_dir,
925 cwd=self.work_dir,
926 stderr=STDOUT
926 stderr=STDOUT
927 )
927 )
928 output = output.decode(DEFAULT_ENCODING, 'replace')
928 output = output.decode(DEFAULT_ENCODING, 'replace')
929 except:
929 except:
930 output = u'The job already appears to be stopped: %r' % self.job_id
930 output = u'The job already appears to be stopped: %r' % self.job_id
931 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
931 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
932 return output
932 return output
933
933
934
934
935 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
935 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
936
936
937 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
937 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
938 help="WinHPC xml job file.")
938 help="WinHPC xml job file.")
939 controller_args = List([], config=False,
939 controller_args = List([], config=False,
940 help="extra args to pass to ipcontroller")
940 help="extra args to pass to ipcontroller")
941
941
942 def write_job_file(self, n):
942 def write_job_file(self, n):
943 job = IPControllerJob(config=self.config)
943 job = IPControllerJob(config=self.config)
944
944
945 t = IPControllerTask(config=self.config)
945 t = IPControllerTask(config=self.config)
946 # The tasks work directory is *not* the actual work directory of
946 # The tasks work directory is *not* the actual work directory of
947 # the controller. It is used as the base path for the stdout/stderr
947 # the controller. It is used as the base path for the stdout/stderr
948 # files that the scheduler redirects to.
948 # files that the scheduler redirects to.
949 t.work_directory = self.profile_dir
949 t.work_directory = self.profile_dir
950 # Add the profile_dir and from self.start().
950 # Add the profile_dir and from self.start().
951 t.controller_args.extend(self.cluster_args)
951 t.controller_args.extend(self.cluster_args)
952 t.controller_args.extend(self.controller_args)
952 t.controller_args.extend(self.controller_args)
953 job.add_task(t)
953 job.add_task(t)
954
954
955 self.log.debug("Writing job description file: %s", self.job_file)
955 self.log.debug("Writing job description file: %s", self.job_file)
956 job.write(self.job_file)
956 job.write(self.job_file)
957
957
958 @property
958 @property
959 def job_file(self):
959 def job_file(self):
960 return os.path.join(self.profile_dir, self.job_file_name)
960 return os.path.join(self.profile_dir, self.job_file_name)
961
961
962 def start(self):
962 def start(self):
963 """Start the controller by profile_dir."""
963 """Start the controller by profile_dir."""
964 return super(WindowsHPCControllerLauncher, self).start(1)
964 return super(WindowsHPCControllerLauncher, self).start(1)
965
965
966
966
967 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
967 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
968
968
969 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
969 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
970 help="jobfile for ipengines job")
970 help="jobfile for ipengines job")
971 engine_args = List([], config=False,
971 engine_args = List([], config=False,
972 help="extra args to pas to ipengine")
972 help="extra args to pas to ipengine")
973
973
974 def write_job_file(self, n):
974 def write_job_file(self, n):
975 job = IPEngineSetJob(config=self.config)
975 job = IPEngineSetJob(config=self.config)
976
976
977 for i in range(n):
977 for i in range(n):
978 t = IPEngineTask(config=self.config)
978 t = IPEngineTask(config=self.config)
979 # The tasks work directory is *not* the actual work directory of
979 # The tasks work directory is *not* the actual work directory of
980 # the engine. It is used as the base path for the stdout/stderr
980 # the engine. It is used as the base path for the stdout/stderr
981 # files that the scheduler redirects to.
981 # files that the scheduler redirects to.
982 t.work_directory = self.profile_dir
982 t.work_directory = self.profile_dir
983 # Add the profile_dir and from self.start().
983 # Add the profile_dir and from self.start().
984 t.engine_args.extend(self.cluster_args)
984 t.engine_args.extend(self.cluster_args)
985 t.engine_args.extend(self.engine_args)
985 t.engine_args.extend(self.engine_args)
986 job.add_task(t)
986 job.add_task(t)
987
987
988 self.log.debug("Writing job description file: %s", self.job_file)
988 self.log.debug("Writing job description file: %s", self.job_file)
989 job.write(self.job_file)
989 job.write(self.job_file)
990
990
991 @property
991 @property
992 def job_file(self):
992 def job_file(self):
993 return os.path.join(self.profile_dir, self.job_file_name)
993 return os.path.join(self.profile_dir, self.job_file_name)
994
994
995 def start(self, n):
995 def start(self, n):
996 """Start the controller by profile_dir."""
996 """Start the controller by profile_dir."""
997 return super(WindowsHPCEngineSetLauncher, self).start(n)
997 return super(WindowsHPCEngineSetLauncher, self).start(n)
998
998
999
999
1000 #-----------------------------------------------------------------------------
1000 #-----------------------------------------------------------------------------
1001 # Batch (PBS) system launchers
1001 # Batch (PBS) system launchers
1002 #-----------------------------------------------------------------------------
1002 #-----------------------------------------------------------------------------
1003
1003
1004 class BatchClusterAppMixin(ClusterAppMixin):
1004 class BatchClusterAppMixin(ClusterAppMixin):
1005 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1005 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1006 def _profile_dir_changed(self, name, old, new):
1006 def _profile_dir_changed(self, name, old, new):
1007 self.context[name] = new
1007 self.context[name] = new
1008 _cluster_id_changed = _profile_dir_changed
1008 _cluster_id_changed = _profile_dir_changed
1009
1009
1010 def _profile_dir_default(self):
1010 def _profile_dir_default(self):
1011 self.context['profile_dir'] = ''
1011 self.context['profile_dir'] = ''
1012 return ''
1012 return ''
1013 def _cluster_id_default(self):
1013 def _cluster_id_default(self):
1014 self.context['cluster_id'] = ''
1014 self.context['cluster_id'] = ''
1015 return ''
1015 return ''
1016
1016
1017
1017
1018 class BatchSystemLauncher(BaseLauncher):
1018 class BatchSystemLauncher(BaseLauncher):
1019 """Launch an external process using a batch system.
1019 """Launch an external process using a batch system.
1020
1020
1021 This class is designed to work with UNIX batch systems like PBS, LSF,
1021 This class is designed to work with UNIX batch systems like PBS, LSF,
1022 GridEngine, etc. The overall model is that there are different commands
1022 GridEngine, etc. The overall model is that there are different commands
1023 like qsub, qdel, etc. that handle the starting and stopping of the process.
1023 like qsub, qdel, etc. that handle the starting and stopping of the process.
1024
1024
1025 This class also has the notion of a batch script. The ``batch_template``
1025 This class also has the notion of a batch script. The ``batch_template``
1026 attribute can be set to a string that is a template for the batch script.
1026 attribute can be set to a string that is a template for the batch script.
1027 This template is instantiated using string formatting. Thus the template can
1027 This template is instantiated using string formatting. Thus the template can
1028 use {n} fot the number of instances. Subclasses can add additional variables
1028 use {n} fot the number of instances. Subclasses can add additional variables
1029 to the template dict.
1029 to the template dict.
1030 """
1030 """
1031
1031
1032 # Subclasses must fill these in. See PBSEngineSet
1032 # Subclasses must fill these in. See PBSEngineSet
1033 submit_command = List([''], config=True,
1033 submit_command = List([''], config=True,
1034 help="The name of the command line program used to submit jobs.")
1034 help="The name of the command line program used to submit jobs.")
1035 delete_command = List([''], config=True,
1035 delete_command = List([''], config=True,
1036 help="The name of the command line program used to delete jobs.")
1036 help="The name of the command line program used to delete jobs.")
1037 job_id_regexp = CRegExp('', config=True,
1037 job_id_regexp = CRegExp('', config=True,
1038 help="""A regular expression used to get the job id from the output of the
1038 help="""A regular expression used to get the job id from the output of the
1039 submit_command.""")
1039 submit_command.""")
1040 job_id_regexp_group = Integer(0, config=True,
1040 job_id_regexp_group = Integer(0, config=True,
1041 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1041 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1042 batch_template = Unicode('', config=True,
1042 batch_template = Unicode('', config=True,
1043 help="The string that is the batch script template itself.")
1043 help="The string that is the batch script template itself.")
1044 batch_template_file = Unicode(u'', config=True,
1044 batch_template_file = Unicode(u'', config=True,
1045 help="The file that contains the batch template.")
1045 help="The file that contains the batch template.")
1046 batch_file_name = Unicode(u'batch_script', config=True,
1046 batch_file_name = Unicode(u'batch_script', config=True,
1047 help="The filename of the instantiated batch script.")
1047 help="The filename of the instantiated batch script.")
1048 queue = Unicode(u'', config=True,
1048 queue = Unicode(u'', config=True,
1049 help="The PBS Queue.")
1049 help="The PBS Queue.")
1050
1050
1051 def _queue_changed(self, name, old, new):
1051 def _queue_changed(self, name, old, new):
1052 self.context[name] = new
1052 self.context[name] = new
1053
1053
1054 n = Integer(1)
1054 n = Integer(1)
1055 _n_changed = _queue_changed
1055 _n_changed = _queue_changed
1056
1056
1057 # not configurable, override in subclasses
1057 # not configurable, override in subclasses
1058 # PBS Job Array regex
1058 # PBS Job Array regex
1059 job_array_regexp = CRegExp('')
1059 job_array_regexp = CRegExp('')
1060 job_array_template = Unicode('')
1060 job_array_template = Unicode('')
1061 # PBS Queue regex
1061 # PBS Queue regex
1062 queue_regexp = CRegExp('')
1062 queue_regexp = CRegExp('')
1063 queue_template = Unicode('')
1063 queue_template = Unicode('')
1064 # The default batch template, override in subclasses
1064 # The default batch template, override in subclasses
1065 default_template = Unicode('')
1065 default_template = Unicode('')
1066 # The full path to the instantiated batch script.
1066 # The full path to the instantiated batch script.
1067 batch_file = Unicode(u'')
1067 batch_file = Unicode(u'')
1068 # the format dict used with batch_template:
1068 # the format dict used with batch_template:
1069 context = Dict()
1069 context = Dict()
1070
1070
1071 def _context_default(self):
1071 def _context_default(self):
1072 """load the default context with the default values for the basic keys
1072 """load the default context with the default values for the basic keys
1073
1073
1074 because the _trait_changed methods only load the context if they
1074 because the _trait_changed methods only load the context if they
1075 are set to something other than the default value.
1075 are set to something other than the default value.
1076 """
1076 """
1077 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1077 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1078
1078
1079 # the Formatter instance for rendering the templates:
1079 # the Formatter instance for rendering the templates:
1080 formatter = Instance(EvalFormatter, (), {})
1080 formatter = Instance(EvalFormatter, (), {})
1081
1081
1082 def find_args(self):
1082 def find_args(self):
1083 return self.submit_command + [self.batch_file]
1083 return self.submit_command + [self.batch_file]
1084
1084
1085 def __init__(self, work_dir=u'.', config=None, **kwargs):
1085 def __init__(self, work_dir=u'.', config=None, **kwargs):
1086 super(BatchSystemLauncher, self).__init__(
1086 super(BatchSystemLauncher, self).__init__(
1087 work_dir=work_dir, config=config, **kwargs
1087 work_dir=work_dir, config=config, **kwargs
1088 )
1088 )
1089 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1089 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1090
1090
1091 def parse_job_id(self, output):
1091 def parse_job_id(self, output):
1092 """Take the output of the submit command and return the job id."""
1092 """Take the output of the submit command and return the job id."""
1093 m = self.job_id_regexp.search(output)
1093 m = self.job_id_regexp.search(output)
1094 if m is not None:
1094 if m is not None:
1095 job_id = m.group(self.job_id_regexp_group)
1095 job_id = m.group(self.job_id_regexp_group)
1096 else:
1096 else:
1097 raise LauncherError("Job id couldn't be determined: %s" % output)
1097 raise LauncherError("Job id couldn't be determined: %s" % output)
1098 self.job_id = job_id
1098 self.job_id = job_id
1099 self.log.info('Job submitted with job id: %r', job_id)
1099 self.log.info('Job submitted with job id: %r', job_id)
1100 return job_id
1100 return job_id
1101
1101
1102 def write_batch_script(self, n):
1102 def write_batch_script(self, n):
1103 """Instantiate and write the batch script to the work_dir."""
1103 """Instantiate and write the batch script to the work_dir."""
1104 self.n = n
1104 self.n = n
1105 # first priority is batch_template if set
1105 # first priority is batch_template if set
1106 if self.batch_template_file and not self.batch_template:
1106 if self.batch_template_file and not self.batch_template:
1107 # second priority is batch_template_file
1107 # second priority is batch_template_file
1108 with open(self.batch_template_file) as f:
1108 with open(self.batch_template_file) as f:
1109 self.batch_template = f.read()
1109 self.batch_template = f.read()
1110 if not self.batch_template:
1110 if not self.batch_template:
1111 # third (last) priority is default_template
1111 # third (last) priority is default_template
1112 self.batch_template = self.default_template
1112 self.batch_template = self.default_template
1113 # add jobarray or queue lines to user-specified template
1113 # add jobarray or queue lines to user-specified template
1114 # note that this is *only* when user did not specify a template.
1114 # note that this is *only* when user did not specify a template.
1115 self._insert_queue_in_script()
1115 self._insert_queue_in_script()
1116 self._insert_job_array_in_script()
1116 self._insert_job_array_in_script()
1117 script_as_string = self.formatter.format(self.batch_template, **self.context)
1117 script_as_string = self.formatter.format(self.batch_template, **self.context)
1118 self.log.debug('Writing batch script: %s', self.batch_file)
1118 self.log.debug('Writing batch script: %s', self.batch_file)
1119 with open(self.batch_file, 'w') as f:
1119 with open(self.batch_file, 'w') as f:
1120 f.write(script_as_string)
1120 f.write(script_as_string)
1121 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1121 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1122
1122
1123 def _insert_queue_in_script(self):
1123 def _insert_queue_in_script(self):
1124 """Inserts a queue if required into the batch script.
1124 """Inserts a queue if required into the batch script.
1125 """
1125 """
1126 if self.queue and not self.queue_regexp.search(self.batch_template):
1126 if self.queue and not self.queue_regexp.search(self.batch_template):
1127 self.log.debug("adding PBS queue settings to batch script")
1127 self.log.debug("adding PBS queue settings to batch script")
1128 firstline, rest = self.batch_template.split('\n',1)
1128 firstline, rest = self.batch_template.split('\n',1)
1129 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1129 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1130
1130
1131 def _insert_job_array_in_script(self):
1131 def _insert_job_array_in_script(self):
1132 """Inserts a job array if required into the batch script.
1132 """Inserts a job array if required into the batch script.
1133 """
1133 """
1134 if not self.job_array_regexp.search(self.batch_template):
1134 if not self.job_array_regexp.search(self.batch_template):
1135 self.log.debug("adding job array settings to batch script")
1135 self.log.debug("adding job array settings to batch script")
1136 firstline, rest = self.batch_template.split('\n',1)
1136 firstline, rest = self.batch_template.split('\n',1)
1137 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1137 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1138
1138
1139 def start(self, n):
1139 def start(self, n):
1140 """Start n copies of the process using a batch system."""
1140 """Start n copies of the process using a batch system."""
1141 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1141 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1142 # Here we save profile_dir in the context so they
1142 # Here we save profile_dir in the context so they
1143 # can be used in the batch script template as {profile_dir}
1143 # can be used in the batch script template as {profile_dir}
1144 self.write_batch_script(n)
1144 self.write_batch_script(n)
1145 output = check_output(self.args, env=os.environ)
1145 output = check_output(self.args, env=os.environ)
1146 output = output.decode(DEFAULT_ENCODING, 'replace')
1146 output = output.decode(DEFAULT_ENCODING, 'replace')
1147
1147
1148 job_id = self.parse_job_id(output)
1148 job_id = self.parse_job_id(output)
1149 self.notify_start(job_id)
1149 self.notify_start(job_id)
1150 return job_id
1150 return job_id
1151
1151
1152 def stop(self):
1152 def stop(self):
1153 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1153 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1154 output = output.decode(DEFAULT_ENCODING, 'replace')
1154 output = output.decode(DEFAULT_ENCODING, 'replace')
1155 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1155 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1156 return output
1156 return output
1157
1157
1158
1158
1159 class PBSLauncher(BatchSystemLauncher):
1159 class PBSLauncher(BatchSystemLauncher):
1160 """A BatchSystemLauncher subclass for PBS."""
1160 """A BatchSystemLauncher subclass for PBS."""
1161
1161
1162 submit_command = List(['qsub'], config=True,
1162 submit_command = List(['qsub'], config=True,
1163 help="The PBS submit command ['qsub']")
1163 help="The PBS submit command ['qsub']")
1164 delete_command = List(['qdel'], config=True,
1164 delete_command = List(['qdel'], config=True,
1165 help="The PBS delete command ['qsub']")
1165 help="The PBS delete command ['qsub']")
1166 job_id_regexp = CRegExp(r'\d+', config=True,
1166 job_id_regexp = CRegExp(r'\d+', config=True,
1167 help="Regular expresion for identifying the job ID [r'\d+']")
1167 help="Regular expresion for identifying the job ID [r'\d+']")
1168
1168
1169 batch_file = Unicode(u'')
1169 batch_file = Unicode(u'')
1170 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1170 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1171 job_array_template = Unicode('#PBS -t 1-{n}')
1171 job_array_template = Unicode('#PBS -t 1-{n}')
1172 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1172 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1173 queue_template = Unicode('#PBS -q {queue}')
1173 queue_template = Unicode('#PBS -q {queue}')
1174
1174
1175
1175
1176 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1176 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1177 """Launch a controller using PBS."""
1177 """Launch a controller using PBS."""
1178
1178
1179 batch_file_name = Unicode(u'pbs_controller', config=True,
1179 batch_file_name = Unicode(u'pbs_controller', config=True,
1180 help="batch file name for the controller job.")
1180 help="batch file name for the controller job.")
1181 default_template= Unicode("""#!/bin/sh
1181 default_template= Unicode("""#!/bin/sh
1182 #PBS -V
1182 #PBS -V
1183 #PBS -N ipcontroller
1183 #PBS -N ipcontroller
1184 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1184 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1185 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1185 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1186
1186
1187
1188 def start(self):
1187 def start(self):
1189 """Start the controller by profile or profile_dir."""
1188 """Start the controller by profile or profile_dir."""
1190 return super(PBSControllerLauncher, self).start(1)
1189 return super(PBSControllerLauncher, self).start(1)
1191
1190
1192
1191
1193 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1192 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1194 """Launch Engines using PBS"""
1193 """Launch Engines using PBS"""
1195 batch_file_name = Unicode(u'pbs_engines', config=True,
1194 batch_file_name = Unicode(u'pbs_engines', config=True,
1196 help="batch file name for the engine(s) job.")
1195 help="batch file name for the engine(s) job.")
1197 default_template= Unicode(u"""#!/bin/sh
1196 default_template= Unicode(u"""#!/bin/sh
1198 #PBS -V
1197 #PBS -V
1199 #PBS -N ipengine
1198 #PBS -N ipengine
1200 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1199 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1201 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1200 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1202
1201
1203 def start(self, n):
1204 """Start n engines by profile or profile_dir."""
1205 return super(PBSEngineSetLauncher, self).start(n)
1206
1207
1202
1208 #SGE is very similar to PBS
1203 #SGE is very similar to PBS
1209
1204
1210 class SGELauncher(PBSLauncher):
1205 class SGELauncher(PBSLauncher):
1211 """Sun GridEngine is a PBS clone with slightly different syntax"""
1206 """Sun GridEngine is a PBS clone with slightly different syntax"""
1212 job_array_regexp = CRegExp('#\$\W+\-t')
1207 job_array_regexp = CRegExp('#\$\W+\-t')
1213 job_array_template = Unicode('#$ -t 1-{n}')
1208 job_array_template = Unicode('#$ -t 1-{n}')
1214 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1209 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1215 queue_template = Unicode('#$ -q {queue}')
1210 queue_template = Unicode('#$ -q {queue}')
1216
1211
1217
1212
1218 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1213 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1219 """Launch a controller using SGE."""
1214 """Launch a controller using SGE."""
1220
1215
1221 batch_file_name = Unicode(u'sge_controller', config=True,
1216 batch_file_name = Unicode(u'sge_controller', config=True,
1222 help="batch file name for the ipontroller job.")
1217 help="batch file name for the ipontroller job.")
1223 default_template= Unicode(u"""#$ -V
1218 default_template= Unicode(u"""#$ -V
1224 #$ -S /bin/sh
1219 #$ -S /bin/sh
1225 #$ -N ipcontroller
1220 #$ -N ipcontroller
1226 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1221 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1227 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1222 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1228
1223
1229 def start(self):
1224 def start(self):
1230 """Start the controller by profile or profile_dir."""
1225 """Start the controller by profile or profile_dir."""
1231 return super(SGEControllerLauncher, self).start(1)
1226 return super(SGEControllerLauncher, self).start(1)
1232
1227
1233
1228
1234 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1229 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1235 """Launch Engines with SGE"""
1230 """Launch Engines with SGE"""
1236 batch_file_name = Unicode(u'sge_engines', config=True,
1231 batch_file_name = Unicode(u'sge_engines', config=True,
1237 help="batch file name for the engine(s) job.")
1232 help="batch file name for the engine(s) job.")
1238 default_template = Unicode("""#$ -V
1233 default_template = Unicode("""#$ -V
1239 #$ -S /bin/sh
1234 #$ -S /bin/sh
1240 #$ -N ipengine
1235 #$ -N ipengine
1241 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1236 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1242 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1237 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1243
1238
1244 def start(self, n):
1245 """Start n engines by profile or profile_dir."""
1246 return super(SGEEngineSetLauncher, self).start(n)
1247
1248
1239
1249 # LSF launchers
1240 # LSF launchers
1250
1241
1251 class LSFLauncher(BatchSystemLauncher):
1242 class LSFLauncher(BatchSystemLauncher):
1252 """A BatchSystemLauncher subclass for LSF."""
1243 """A BatchSystemLauncher subclass for LSF."""
1253
1244
1254 submit_command = List(['bsub'], config=True,
1245 submit_command = List(['bsub'], config=True,
1255 help="The PBS submit command ['bsub']")
1246 help="The PBS submit command ['bsub']")
1256 delete_command = List(['bkill'], config=True,
1247 delete_command = List(['bkill'], config=True,
1257 help="The PBS delete command ['bkill']")
1248 help="The PBS delete command ['bkill']")
1258 job_id_regexp = CRegExp(r'\d+', config=True,
1249 job_id_regexp = CRegExp(r'\d+', config=True,
1259 help="Regular expresion for identifying the job ID [r'\d+']")
1250 help="Regular expresion for identifying the job ID [r'\d+']")
1260
1251
1261 batch_file = Unicode(u'')
1252 batch_file = Unicode(u'')
1262 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1253 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1263 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1254 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1264 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1255 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1265 queue_template = Unicode('#BSUB -q {queue}')
1256 queue_template = Unicode('#BSUB -q {queue}')
1266
1257
1267 def start(self, n):
1258 def start(self, n):
1268 """Start n copies of the process using LSF batch system.
1259 """Start n copies of the process using LSF batch system.
1269 This cant inherit from the base class because bsub expects
1260 This cant inherit from the base class because bsub expects
1270 to be piped a shell script in order to honor the #BSUB directives :
1261 to be piped a shell script in order to honor the #BSUB directives :
1271 bsub < script
1262 bsub < script
1272 """
1263 """
1273 # Here we save profile_dir in the context so they
1264 # Here we save profile_dir in the context so they
1274 # can be used in the batch script template as {profile_dir}
1265 # can be used in the batch script template as {profile_dir}
1275 self.write_batch_script(n)
1266 self.write_batch_script(n)
1276 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1267 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1277 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1268 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1278 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1269 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1279 output,err = p.communicate()
1270 output,err = p.communicate()
1280 output = output.decode(DEFAULT_ENCODING, 'replace')
1271 output = output.decode(DEFAULT_ENCODING, 'replace')
1281 job_id = self.parse_job_id(output)
1272 job_id = self.parse_job_id(output)
1282 self.notify_start(job_id)
1273 self.notify_start(job_id)
1283 return job_id
1274 return job_id
1284
1275
1285
1276
1286 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1277 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1287 """Launch a controller using LSF."""
1278 """Launch a controller using LSF."""
1288
1279
1289 batch_file_name = Unicode(u'lsf_controller', config=True,
1280 batch_file_name = Unicode(u'lsf_controller', config=True,
1290 help="batch file name for the controller job.")
1281 help="batch file name for the controller job.")
1291 default_template= Unicode("""#!/bin/sh
1282 default_template= Unicode("""#!/bin/sh
1292 #BSUB -J ipcontroller
1283 #BSUB -J ipcontroller
1293 #BSUB -oo ipcontroller.o.%%J
1284 #BSUB -oo ipcontroller.o.%%J
1294 #BSUB -eo ipcontroller.e.%%J
1285 #BSUB -eo ipcontroller.e.%%J
1295 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1286 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1296 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1287 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1297
1288
1298 def start(self):
1289 def start(self):
1299 """Start the controller by profile or profile_dir."""
1290 """Start the controller by profile or profile_dir."""
1300 return super(LSFControllerLauncher, self).start(1)
1291 return super(LSFControllerLauncher, self).start(1)
1301
1292
1302
1293
1303 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1294 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1304 """Launch Engines using LSF"""
1295 """Launch Engines using LSF"""
1305 batch_file_name = Unicode(u'lsf_engines', config=True,
1296 batch_file_name = Unicode(u'lsf_engines', config=True,
1306 help="batch file name for the engine(s) job.")
1297 help="batch file name for the engine(s) job.")
1307 default_template= Unicode(u"""#!/bin/sh
1298 default_template= Unicode(u"""#!/bin/sh
1308 #BSUB -oo ipengine.o.%%J
1299 #BSUB -oo ipengine.o.%%J
1309 #BSUB -eo ipengine.e.%%J
1300 #BSUB -eo ipengine.e.%%J
1310 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1301 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1311 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1302 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1312
1303
1313 def start(self, n):
1314 """Start n engines by profile or profile_dir."""
1315 return super(LSFEngineSetLauncher, self).start(n)
1316
1317
1304
1318 # Condor Requires that we launch the ipengine/ipcontroller scripts rather
1305 # Condor Requires that we launch the ipengine/ipcontroller scripts rather
1319 # that the python instance but otherwise is very similar to PBS
1306 # that the python instance but otherwise is very similar to PBS
1320
1307
1321 class CondorLauncher(BatchSystemLauncher):
1308 class CondorLauncher(BatchSystemLauncher):
1322 """A BatchSystemLauncher subclass for Condor."""
1309 """A BatchSystemLauncher subclass for Condor."""
1323
1310
1324 submit_command = List(['condor_submit'], config=True,
1311 submit_command = List(['condor_submit'], config=True,
1325 help="The Condor submit command ['condor_submit']")
1312 help="The Condor submit command ['condor_submit']")
1326 delete_command = List(['condor_rm'], config=True,
1313 delete_command = List(['condor_rm'], config=True,
1327 help="The Condor delete command ['condor_rm']")
1314 help="The Condor delete command ['condor_rm']")
1328 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1315 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1329 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1316 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1330 job_id_regexp_group = Integer(1, config=True,
1317 job_id_regexp_group = Integer(1, config=True,
1331 help="""The group we wish to match in job_id_regexp [1]""")
1318 help="""The group we wish to match in job_id_regexp [1]""")
1332
1319
1333 job_array_regexp = CRegExp('queue\W+\$')
1320 job_array_regexp = CRegExp('queue\W+\$')
1334 job_array_template = Unicode('queue {n}')
1321 job_array_template = Unicode('queue {n}')
1335
1322
1336
1323
1337 def _insert_job_array_in_script(self):
1324 def _insert_job_array_in_script(self):
1338 """Inserts a job array if required into the batch script.
1325 """Inserts a job array if required into the batch script.
1339 """
1326 """
1340 if not self.job_array_regexp.search(self.batch_template):
1327 if not self.job_array_regexp.search(self.batch_template):
1341 self.log.debug("adding job array settings to batch script")
1328 self.log.debug("adding job array settings to batch script")
1342 #Condor requires that the job array goes at the bottom of the script
1329 #Condor requires that the job array goes at the bottom of the script
1343 self.batch_template = '\n'.join([self.batch_template,
1330 self.batch_template = '\n'.join([self.batch_template,
1344 self.job_array_template])
1331 self.job_array_template])
1345
1332
1346 def _insert_queue_in_script(self):
1333 def _insert_queue_in_script(self):
1347 """AFAIK, Condor doesn't have a concept of multiple queues that can be
1334 """AFAIK, Condor doesn't have a concept of multiple queues that can be
1348 specified in the script..
1335 specified in the script..
1349 """
1336 """
1350 pass
1337 pass
1351
1338
1352
1339
1353 class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
1340 class CondorControllerLauncher(CondorLauncher, BatchClusterAppMixin):
1354 """Launch a controller using Condor."""
1341 """Launch a controller using Condor."""
1355
1342
1356 batch_file_name = Unicode(u'condor_controller', config=True,
1343 batch_file_name = Unicode(u'condor_controller', config=True,
1357 help="batch file name for the controller job.")
1344 help="batch file name for the controller job.")
1358 default_template = Unicode(r"""
1345 default_template = Unicode(r"""
1359 universe = vanilla
1346 universe = vanilla
1360 executable = %s
1347 executable = %s
1361 # by default we expect a shared file system
1348 # by default we expect a shared file system
1362 transfer_executable = False
1349 transfer_executable = False
1363 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1350 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1364 """ % condor_ipcontroller_cmd_argv)
1351 """ % condor_ipcontroller_cmd_argv)
1365
1352
1366 def start(self):
1353 def start(self):
1367 """Start the controller by profile or profile_dir."""
1354 """Start the controller by profile or profile_dir."""
1368 return super(CondorControllerLauncher, self).start(1)
1355 return super(CondorControllerLauncher, self).start(1)
1369
1356
1370
1357
1371 class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
1358 class CondorEngineSetLauncher(CondorLauncher, BatchClusterAppMixin):
1372 """Launch Engines using Condor"""
1359 """Launch Engines using Condor"""
1373 batch_file_name = Unicode(u'condor_engines', config=True,
1360 batch_file_name = Unicode(u'condor_engines', config=True,
1374 help="batch file name for the engine(s) job.")
1361 help="batch file name for the engine(s) job.")
1375 default_template = Unicode("""
1362 default_template = Unicode("""
1376 universe = vanilla
1363 universe = vanilla
1377 executable = %s
1364 executable = %s
1378 # by default we expect a shared file system
1365 # by default we expect a shared file system
1379 transfer_executable = False
1366 transfer_executable = False
1380 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1367 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1381 """ % condor_ipengine_cmd_argv)
1368 """ % condor_ipengine_cmd_argv)
1382
1369
1383 def start(self, n):
1384 """Start n engines by profile or profile_dir."""
1385 return super(CondorEngineSetLauncher, self).start(n)
1386
1387
1370
1388 #-----------------------------------------------------------------------------
1371 #-----------------------------------------------------------------------------
1389 # A launcher for ipcluster itself!
1372 # A launcher for ipcluster itself!
1390 #-----------------------------------------------------------------------------
1373 #-----------------------------------------------------------------------------
1391
1374
1392
1375
1393 class IPClusterLauncher(LocalProcessLauncher):
1376 class IPClusterLauncher(LocalProcessLauncher):
1394 """Launch the ipcluster program in an external process."""
1377 """Launch the ipcluster program in an external process."""
1395
1378
1396 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1379 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1397 help="Popen command for ipcluster")
1380 help="Popen command for ipcluster")
1398 ipcluster_args = List(
1381 ipcluster_args = List(
1399 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1382 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1400 help="Command line arguments to pass to ipcluster.")
1383 help="Command line arguments to pass to ipcluster.")
1401 ipcluster_subcommand = Unicode('start')
1384 ipcluster_subcommand = Unicode('start')
1402 profile = Unicode('default')
1385 profile = Unicode('default')
1403 n = Integer(2)
1386 n = Integer(2)
1404
1387
1405 def find_args(self):
1388 def find_args(self):
1406 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1389 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1407 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1390 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1408 self.ipcluster_args
1391 self.ipcluster_args
1409
1392
1410 def start(self):
1393 def start(self):
1411 return super(IPClusterLauncher, self).start()
1394 return super(IPClusterLauncher, self).start()
1412
1395
1413 #-----------------------------------------------------------------------------
1396 #-----------------------------------------------------------------------------
1414 # Collections of launchers
1397 # Collections of launchers
1415 #-----------------------------------------------------------------------------
1398 #-----------------------------------------------------------------------------
1416
1399
1417 local_launchers = [
1400 local_launchers = [
1418 LocalControllerLauncher,
1401 LocalControllerLauncher,
1419 LocalEngineLauncher,
1402 LocalEngineLauncher,
1420 LocalEngineSetLauncher,
1403 LocalEngineSetLauncher,
1421 ]
1404 ]
1422 mpi_launchers = [
1405 mpi_launchers = [
1423 MPILauncher,
1406 MPILauncher,
1424 MPIControllerLauncher,
1407 MPIControllerLauncher,
1425 MPIEngineSetLauncher,
1408 MPIEngineSetLauncher,
1426 ]
1409 ]
1427 ssh_launchers = [
1410 ssh_launchers = [
1428 SSHLauncher,
1411 SSHLauncher,
1429 SSHControllerLauncher,
1412 SSHControllerLauncher,
1430 SSHEngineLauncher,
1413 SSHEngineLauncher,
1431 SSHEngineSetLauncher,
1414 SSHEngineSetLauncher,
1432 SSHProxyEngineSetLauncher,
1415 SSHProxyEngineSetLauncher,
1433 ]
1416 ]
1434 winhpc_launchers = [
1417 winhpc_launchers = [
1435 WindowsHPCLauncher,
1418 WindowsHPCLauncher,
1436 WindowsHPCControllerLauncher,
1419 WindowsHPCControllerLauncher,
1437 WindowsHPCEngineSetLauncher,
1420 WindowsHPCEngineSetLauncher,
1438 ]
1421 ]
1439 pbs_launchers = [
1422 pbs_launchers = [
1440 PBSLauncher,
1423 PBSLauncher,
1441 PBSControllerLauncher,
1424 PBSControllerLauncher,
1442 PBSEngineSetLauncher,
1425 PBSEngineSetLauncher,
1443 ]
1426 ]
1444 sge_launchers = [
1427 sge_launchers = [
1445 SGELauncher,
1428 SGELauncher,
1446 SGEControllerLauncher,
1429 SGEControllerLauncher,
1447 SGEEngineSetLauncher,
1430 SGEEngineSetLauncher,
1448 ]
1431 ]
1449 lsf_launchers = [
1432 lsf_launchers = [
1450 LSFLauncher,
1433 LSFLauncher,
1451 LSFControllerLauncher,
1434 LSFControllerLauncher,
1452 LSFEngineSetLauncher,
1435 LSFEngineSetLauncher,
1453 ]
1436 ]
1454 condor_launchers = [
1437 condor_launchers = [
1455 CondorLauncher,
1438 CondorLauncher,
1456 CondorControllerLauncher,
1439 CondorControllerLauncher,
1457 CondorEngineSetLauncher,
1440 CondorEngineSetLauncher,
1458 ]
1441 ]
1459 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1442 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1460 + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers
1443 + pbs_launchers + sge_launchers + lsf_launchers + condor_launchers
General Comments 0
You need to be logged in to leave comments. Login now