##// END OF EJS Templates
ensure scp destination directories exist (with mkdir -p)...
MinRK -
Show More
@@ -1,1455 +1,1463 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
46 try:
46 try:
47 from subprocess import check_output
47 from subprocess import check_output
48 except ImportError:
48 except ImportError:
49 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
50 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
51 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
52 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
53 out,err = p.communicate()
53 out,err = p.communicate()
54 return out
54 return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67 from IPython.utils.py3compat import iteritems, itervalues
67 from IPython.utils.py3compat import iteritems, itervalues
68
68
69 from .win32support import forward_read_events
69 from .win32support import forward_read_events
70
70
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72
72
73 WINDOWS = os.name == 'nt'
73 WINDOWS = os.name == 'nt'
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # Paths to the kernel apps
76 # Paths to the kernel apps
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
80
80
81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
82
82
83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
84
84
85 if WINDOWS and sys.version_info < (3,):
85 if WINDOWS and sys.version_info < (3,):
86 # `python -m package` doesn't work on Windows Python 2,
86 # `python -m package` doesn't work on Windows Python 2,
87 # but `python -m module` does.
87 # but `python -m module` does.
88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
90
90
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92 # Base launchers and errors
92 # Base launchers and errors
93 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
94
94
95 class LauncherError(Exception):
95 class LauncherError(Exception):
96 pass
96 pass
97
97
98
98
99 class ProcessStateError(LauncherError):
99 class ProcessStateError(LauncherError):
100 pass
100 pass
101
101
102
102
103 class UnknownStatus(LauncherError):
103 class UnknownStatus(LauncherError):
104 pass
104 pass
105
105
106
106
107 class BaseLauncher(LoggingConfigurable):
107 class BaseLauncher(LoggingConfigurable):
108 """An asbtraction for starting, stopping and signaling a process."""
108 """An asbtraction for starting, stopping and signaling a process."""
109
109
110 # In all of the launchers, the work_dir is where child processes will be
110 # In all of the launchers, the work_dir is where child processes will be
111 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # run. This will usually be the profile_dir, but may not be. any work_dir
112 # passed into the __init__ method will override the config value.
112 # passed into the __init__ method will override the config value.
113 # This should not be used to set the work_dir for the actual engine
113 # This should not be used to set the work_dir for the actual engine
114 # and controller. Instead, use their own config files or the
114 # and controller. Instead, use their own config files or the
115 # controller_args, engine_args attributes of the launchers to add
115 # controller_args, engine_args attributes of the launchers to add
116 # the work_dir option.
116 # the work_dir option.
117 work_dir = Unicode(u'.')
117 work_dir = Unicode(u'.')
118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
119
119
120 start_data = Any()
120 start_data = Any()
121 stop_data = Any()
121 stop_data = Any()
122
122
123 def _loop_default(self):
123 def _loop_default(self):
124 return ioloop.IOLoop.instance()
124 return ioloop.IOLoop.instance()
125
125
126 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 def __init__(self, work_dir=u'.', config=None, **kwargs):
127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
128 self.state = 'before' # can be before, running, after
128 self.state = 'before' # can be before, running, after
129 self.stop_callbacks = []
129 self.stop_callbacks = []
130 self.start_data = None
130 self.start_data = None
131 self.stop_data = None
131 self.stop_data = None
132
132
133 @property
133 @property
134 def args(self):
134 def args(self):
135 """A list of cmd and args that will be used to start the process.
135 """A list of cmd and args that will be used to start the process.
136
136
137 This is what is passed to :func:`spawnProcess` and the first element
137 This is what is passed to :func:`spawnProcess` and the first element
138 will be the process name.
138 will be the process name.
139 """
139 """
140 return self.find_args()
140 return self.find_args()
141
141
142 def find_args(self):
142 def find_args(self):
143 """The ``.args`` property calls this to find the args list.
143 """The ``.args`` property calls this to find the args list.
144
144
145 Subcommand should implement this to construct the cmd and args.
145 Subcommand should implement this to construct the cmd and args.
146 """
146 """
147 raise NotImplementedError('find_args must be implemented in a subclass')
147 raise NotImplementedError('find_args must be implemented in a subclass')
148
148
149 @property
149 @property
150 def arg_str(self):
150 def arg_str(self):
151 """The string form of the program arguments."""
151 """The string form of the program arguments."""
152 return ' '.join(self.args)
152 return ' '.join(self.args)
153
153
154 @property
154 @property
155 def running(self):
155 def running(self):
156 """Am I running."""
156 """Am I running."""
157 if self.state == 'running':
157 if self.state == 'running':
158 return True
158 return True
159 else:
159 else:
160 return False
160 return False
161
161
162 def start(self):
162 def start(self):
163 """Start the process."""
163 """Start the process."""
164 raise NotImplementedError('start must be implemented in a subclass')
164 raise NotImplementedError('start must be implemented in a subclass')
165
165
166 def stop(self):
166 def stop(self):
167 """Stop the process and notify observers of stopping.
167 """Stop the process and notify observers of stopping.
168
168
169 This method will return None immediately.
169 This method will return None immediately.
170 To observe the actual process stopping, see :meth:`on_stop`.
170 To observe the actual process stopping, see :meth:`on_stop`.
171 """
171 """
172 raise NotImplementedError('stop must be implemented in a subclass')
172 raise NotImplementedError('stop must be implemented in a subclass')
173
173
174 def on_stop(self, f):
174 def on_stop(self, f):
175 """Register a callback to be called with this Launcher's stop_data
175 """Register a callback to be called with this Launcher's stop_data
176 when the process actually finishes.
176 when the process actually finishes.
177 """
177 """
178 if self.state=='after':
178 if self.state=='after':
179 return f(self.stop_data)
179 return f(self.stop_data)
180 else:
180 else:
181 self.stop_callbacks.append(f)
181 self.stop_callbacks.append(f)
182
182
183 def notify_start(self, data):
183 def notify_start(self, data):
184 """Call this to trigger startup actions.
184 """Call this to trigger startup actions.
185
185
186 This logs the process startup and sets the state to 'running'. It is
186 This logs the process startup and sets the state to 'running'. It is
187 a pass-through so it can be used as a callback.
187 a pass-through so it can be used as a callback.
188 """
188 """
189
189
190 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.log.debug('Process %r started: %r', self.args[0], data)
191 self.start_data = data
191 self.start_data = data
192 self.state = 'running'
192 self.state = 'running'
193 return data
193 return data
194
194
195 def notify_stop(self, data):
195 def notify_stop(self, data):
196 """Call this to trigger process stop actions.
196 """Call this to trigger process stop actions.
197
197
198 This logs the process stopping and sets the state to 'after'. Call
198 This logs the process stopping and sets the state to 'after'. Call
199 this to trigger callbacks registered via :meth:`on_stop`."""
199 this to trigger callbacks registered via :meth:`on_stop`."""
200
200
201 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.log.debug('Process %r stopped: %r', self.args[0], data)
202 self.stop_data = data
202 self.stop_data = data
203 self.state = 'after'
203 self.state = 'after'
204 for i in range(len(self.stop_callbacks)):
204 for i in range(len(self.stop_callbacks)):
205 d = self.stop_callbacks.pop()
205 d = self.stop_callbacks.pop()
206 d(data)
206 d(data)
207 return data
207 return data
208
208
209 def signal(self, sig):
209 def signal(self, sig):
210 """Signal the process.
210 """Signal the process.
211
211
212 Parameters
212 Parameters
213 ----------
213 ----------
214 sig : str or int
214 sig : str or int
215 'KILL', 'INT', etc., or any signal number
215 'KILL', 'INT', etc., or any signal number
216 """
216 """
217 raise NotImplementedError('signal must be implemented in a subclass')
217 raise NotImplementedError('signal must be implemented in a subclass')
218
218
219 class ClusterAppMixin(HasTraits):
219 class ClusterAppMixin(HasTraits):
220 """MixIn for cluster args as traits"""
220 """MixIn for cluster args as traits"""
221 profile_dir=Unicode('')
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
222 cluster_id=Unicode('')
223
223
224 @property
224 @property
225 def cluster_args(self):
225 def cluster_args(self):
226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
227
227
228 class ControllerMixin(ClusterAppMixin):
228 class ControllerMixin(ClusterAppMixin):
229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
230 help="""Popen command to launch ipcontroller.""")
230 help="""Popen command to launch ipcontroller.""")
231 # Command line arguments to ipcontroller.
231 # Command line arguments to ipcontroller.
232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
233 help="""command-line args to pass to ipcontroller""")
233 help="""command-line args to pass to ipcontroller""")
234
234
235 class EngineMixin(ClusterAppMixin):
235 class EngineMixin(ClusterAppMixin):
236 engine_cmd = List(ipengine_cmd_argv, config=True,
236 engine_cmd = List(ipengine_cmd_argv, config=True,
237 help="""command to launch the Engine.""")
237 help="""command to launch the Engine.""")
238 # Command line arguments for ipengine.
238 # Command line arguments for ipengine.
239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
240 help="command-line arguments to pass to ipengine"
240 help="command-line arguments to pass to ipengine"
241 )
241 )
242
242
243
243
244 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
245 # Local process launchers
245 # Local process launchers
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247
247
248
248
249 class LocalProcessLauncher(BaseLauncher):
249 class LocalProcessLauncher(BaseLauncher):
250 """Start and stop an external process in an asynchronous manner.
250 """Start and stop an external process in an asynchronous manner.
251
251
252 This will launch the external process with a working directory of
252 This will launch the external process with a working directory of
253 ``self.work_dir``.
253 ``self.work_dir``.
254 """
254 """
255
255
256 # This is used to to construct self.args, which is passed to
256 # This is used to to construct self.args, which is passed to
257 # spawnProcess.
257 # spawnProcess.
258 cmd_and_args = List([])
258 cmd_and_args = List([])
259 poll_frequency = Integer(100) # in ms
259 poll_frequency = Integer(100) # in ms
260
260
261 def __init__(self, work_dir=u'.', config=None, **kwargs):
261 def __init__(self, work_dir=u'.', config=None, **kwargs):
262 super(LocalProcessLauncher, self).__init__(
262 super(LocalProcessLauncher, self).__init__(
263 work_dir=work_dir, config=config, **kwargs
263 work_dir=work_dir, config=config, **kwargs
264 )
264 )
265 self.process = None
265 self.process = None
266 self.poller = None
266 self.poller = None
267
267
268 def find_args(self):
268 def find_args(self):
269 return self.cmd_and_args
269 return self.cmd_and_args
270
270
271 def start(self):
271 def start(self):
272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
273 if self.state == 'before':
273 if self.state == 'before':
274 self.process = Popen(self.args,
274 self.process = Popen(self.args,
275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 env=os.environ,
276 env=os.environ,
277 cwd=self.work_dir
277 cwd=self.work_dir
278 )
278 )
279 if WINDOWS:
279 if WINDOWS:
280 self.stdout = forward_read_events(self.process.stdout)
280 self.stdout = forward_read_events(self.process.stdout)
281 self.stderr = forward_read_events(self.process.stderr)
281 self.stderr = forward_read_events(self.process.stderr)
282 else:
282 else:
283 self.stdout = self.process.stdout.fileno()
283 self.stdout = self.process.stdout.fileno()
284 self.stderr = self.process.stderr.fileno()
284 self.stderr = self.process.stderr.fileno()
285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 self.poller.start()
288 self.poller.start()
289 self.notify_start(self.process.pid)
289 self.notify_start(self.process.pid)
290 else:
290 else:
291 s = 'The process was already started and has state: %r' % self.state
291 s = 'The process was already started and has state: %r' % self.state
292 raise ProcessStateError(s)
292 raise ProcessStateError(s)
293
293
294 def stop(self):
294 def stop(self):
295 return self.interrupt_then_kill()
295 return self.interrupt_then_kill()
296
296
297 def signal(self, sig):
297 def signal(self, sig):
298 if self.state == 'running':
298 if self.state == 'running':
299 if WINDOWS and sig != SIGINT:
299 if WINDOWS and sig != SIGINT:
300 # use Windows tree-kill for better child cleanup
300 # use Windows tree-kill for better child cleanup
301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 else:
302 else:
303 self.process.send_signal(sig)
303 self.process.send_signal(sig)
304
304
305 def interrupt_then_kill(self, delay=2.0):
305 def interrupt_then_kill(self, delay=2.0):
306 """Send INT, wait a delay and then send KILL."""
306 """Send INT, wait a delay and then send KILL."""
307 try:
307 try:
308 self.signal(SIGINT)
308 self.signal(SIGINT)
309 except Exception:
309 except Exception:
310 self.log.debug("interrupt failed")
310 self.log.debug("interrupt failed")
311 pass
311 pass
312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 self.killer.start()
313 self.killer.start()
314
314
315 # callbacks, etc:
315 # callbacks, etc:
316
316
317 def handle_stdout(self, fd, events):
317 def handle_stdout(self, fd, events):
318 if WINDOWS:
318 if WINDOWS:
319 line = self.stdout.recv()
319 line = self.stdout.recv()
320 else:
320 else:
321 line = self.process.stdout.readline()
321 line = self.process.stdout.readline()
322 # a stopped process will be readable but return empty strings
322 # a stopped process will be readable but return empty strings
323 if line:
323 if line:
324 self.log.debug(line[:-1])
324 self.log.debug(line[:-1])
325 else:
325 else:
326 self.poll()
326 self.poll()
327
327
328 def handle_stderr(self, fd, events):
328 def handle_stderr(self, fd, events):
329 if WINDOWS:
329 if WINDOWS:
330 line = self.stderr.recv()
330 line = self.stderr.recv()
331 else:
331 else:
332 line = self.process.stderr.readline()
332 line = self.process.stderr.readline()
333 # a stopped process will be readable but return empty strings
333 # a stopped process will be readable but return empty strings
334 if line:
334 if line:
335 self.log.debug(line[:-1])
335 self.log.debug(line[:-1])
336 else:
336 else:
337 self.poll()
337 self.poll()
338
338
339 def poll(self):
339 def poll(self):
340 status = self.process.poll()
340 status = self.process.poll()
341 if status is not None:
341 if status is not None:
342 self.poller.stop()
342 self.poller.stop()
343 self.loop.remove_handler(self.stdout)
343 self.loop.remove_handler(self.stdout)
344 self.loop.remove_handler(self.stderr)
344 self.loop.remove_handler(self.stderr)
345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 return status
346 return status
347
347
348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
349 """Launch a controller as a regular external process."""
349 """Launch a controller as a regular external process."""
350
350
351 def find_args(self):
351 def find_args(self):
352 return self.controller_cmd + self.cluster_args + self.controller_args
352 return self.controller_cmd + self.cluster_args + self.controller_args
353
353
354 def start(self):
354 def start(self):
355 """Start the controller by profile_dir."""
355 """Start the controller by profile_dir."""
356 return super(LocalControllerLauncher, self).start()
356 return super(LocalControllerLauncher, self).start()
357
357
358
358
359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
360 """Launch a single engine as a regular externall process."""
360 """Launch a single engine as a regular externall process."""
361
361
362 def find_args(self):
362 def find_args(self):
363 return self.engine_cmd + self.cluster_args + self.engine_args
363 return self.engine_cmd + self.cluster_args + self.engine_args
364
364
365
365
366 class LocalEngineSetLauncher(LocalEngineLauncher):
366 class LocalEngineSetLauncher(LocalEngineLauncher):
367 """Launch a set of engines as regular external processes."""
367 """Launch a set of engines as regular external processes."""
368
368
369 delay = CFloat(0.1, config=True,
369 delay = CFloat(0.1, config=True,
370 help="""delay (in seconds) between starting each engine after the first.
370 help="""delay (in seconds) between starting each engine after the first.
371 This can help force the engines to get their ids in order, or limit
371 This can help force the engines to get their ids in order, or limit
372 process flood when starting many engines."""
372 process flood when starting many engines."""
373 )
373 )
374
374
375 # launcher class
375 # launcher class
376 launcher_class = LocalEngineLauncher
376 launcher_class = LocalEngineLauncher
377
377
378 launchers = Dict()
378 launchers = Dict()
379 stop_data = Dict()
379 stop_data = Dict()
380
380
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 super(LocalEngineSetLauncher, self).__init__(
382 super(LocalEngineSetLauncher, self).__init__(
383 work_dir=work_dir, config=config, **kwargs
383 work_dir=work_dir, config=config, **kwargs
384 )
384 )
385 self.stop_data = {}
385 self.stop_data = {}
386
386
387 def start(self, n):
387 def start(self, n):
388 """Start n engines by profile or profile_dir."""
388 """Start n engines by profile or profile_dir."""
389 dlist = []
389 dlist = []
390 for i in range(n):
390 for i in range(n):
391 if i > 0:
391 if i > 0:
392 time.sleep(self.delay)
392 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
395 )
395 )
396
396
397 # Copy the engine args over to each engine launcher.
397 # Copy the engine args over to each engine launcher.
398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
399 el.engine_args = copy.deepcopy(self.engine_args)
399 el.engine_args = copy.deepcopy(self.engine_args)
400 el.on_stop(self._notice_engine_stopped)
400 el.on_stop(self._notice_engine_stopped)
401 d = el.start()
401 d = el.start()
402 self.launchers[i] = el
402 self.launchers[i] = el
403 dlist.append(d)
403 dlist.append(d)
404 self.notify_start(dlist)
404 self.notify_start(dlist)
405 return dlist
405 return dlist
406
406
407 def find_args(self):
407 def find_args(self):
408 return ['engine set']
408 return ['engine set']
409
409
410 def signal(self, sig):
410 def signal(self, sig):
411 dlist = []
411 dlist = []
412 for el in itervalues(self.launchers):
412 for el in itervalues(self.launchers):
413 d = el.signal(sig)
413 d = el.signal(sig)
414 dlist.append(d)
414 dlist.append(d)
415 return dlist
415 return dlist
416
416
417 def interrupt_then_kill(self, delay=1.0):
417 def interrupt_then_kill(self, delay=1.0):
418 dlist = []
418 dlist = []
419 for el in itervalues(self.launchers):
419 for el in itervalues(self.launchers):
420 d = el.interrupt_then_kill(delay)
420 d = el.interrupt_then_kill(delay)
421 dlist.append(d)
421 dlist.append(d)
422 return dlist
422 return dlist
423
423
424 def stop(self):
424 def stop(self):
425 return self.interrupt_then_kill()
425 return self.interrupt_then_kill()
426
426
427 def _notice_engine_stopped(self, data):
427 def _notice_engine_stopped(self, data):
428 pid = data['pid']
428 pid = data['pid']
429 for idx,el in iteritems(self.launchers):
429 for idx,el in iteritems(self.launchers):
430 if el.process.pid == pid:
430 if el.process.pid == pid:
431 break
431 break
432 self.launchers.pop(idx)
432 self.launchers.pop(idx)
433 self.stop_data[idx] = data
433 self.stop_data[idx] = data
434 if not self.launchers:
434 if not self.launchers:
435 self.notify_stop(self.stop_data)
435 self.notify_stop(self.stop_data)
436
436
437
437
438 #-----------------------------------------------------------------------------
438 #-----------------------------------------------------------------------------
439 # MPI launchers
439 # MPI launchers
440 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
441
441
442
442
443 class MPILauncher(LocalProcessLauncher):
443 class MPILauncher(LocalProcessLauncher):
444 """Launch an external process using mpiexec."""
444 """Launch an external process using mpiexec."""
445
445
446 mpi_cmd = List(['mpiexec'], config=True,
446 mpi_cmd = List(['mpiexec'], config=True,
447 help="The mpiexec command to use in starting the process."
447 help="The mpiexec command to use in starting the process."
448 )
448 )
449 mpi_args = List([], config=True,
449 mpi_args = List([], config=True,
450 help="The command line arguments to pass to mpiexec."
450 help="The command line arguments to pass to mpiexec."
451 )
451 )
452 program = List(['date'],
452 program = List(['date'],
453 help="The program to start via mpiexec.")
453 help="The program to start via mpiexec.")
454 program_args = List([],
454 program_args = List([],
455 help="The command line argument to the program."
455 help="The command line argument to the program."
456 )
456 )
457 n = Integer(1)
457 n = Integer(1)
458
458
459 def __init__(self, *args, **kwargs):
459 def __init__(self, *args, **kwargs):
460 # deprecation for old MPIExec names:
460 # deprecation for old MPIExec names:
461 config = kwargs.get('config', {})
461 config = kwargs.get('config', {})
462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
463 deprecated = config.get(oldname)
463 deprecated = config.get(oldname)
464 if deprecated:
464 if deprecated:
465 newname = oldname.replace('MPIExec', 'MPI')
465 newname = oldname.replace('MPIExec', 'MPI')
466 config[newname].update(deprecated)
466 config[newname].update(deprecated)
467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
468
468
469 super(MPILauncher, self).__init__(*args, **kwargs)
469 super(MPILauncher, self).__init__(*args, **kwargs)
470
470
471 def find_args(self):
471 def find_args(self):
472 """Build self.args using all the fields."""
472 """Build self.args using all the fields."""
473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
474 self.program + self.program_args
474 self.program + self.program_args
475
475
476 def start(self, n):
476 def start(self, n):
477 """Start n instances of the program using mpiexec."""
477 """Start n instances of the program using mpiexec."""
478 self.n = n
478 self.n = n
479 return super(MPILauncher, self).start()
479 return super(MPILauncher, self).start()
480
480
481
481
482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
483 """Launch a controller using mpiexec."""
483 """Launch a controller using mpiexec."""
484
484
485 # alias back to *non-configurable* program[_args] for use in find_args()
485 # alias back to *non-configurable* program[_args] for use in find_args()
486 # this way all Controller/EngineSetLaunchers have the same form, rather
486 # this way all Controller/EngineSetLaunchers have the same form, rather
487 # than *some* having `program_args` and others `controller_args`
487 # than *some* having `program_args` and others `controller_args`
488 @property
488 @property
489 def program(self):
489 def program(self):
490 return self.controller_cmd
490 return self.controller_cmd
491
491
492 @property
492 @property
493 def program_args(self):
493 def program_args(self):
494 return self.cluster_args + self.controller_args
494 return self.cluster_args + self.controller_args
495
495
496 def start(self):
496 def start(self):
497 """Start the controller by profile_dir."""
497 """Start the controller by profile_dir."""
498 return super(MPIControllerLauncher, self).start(1)
498 return super(MPIControllerLauncher, self).start(1)
499
499
500
500
501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
502 """Launch engines using mpiexec"""
502 """Launch engines using mpiexec"""
503
503
504 # alias back to *non-configurable* program[_args] for use in find_args()
504 # alias back to *non-configurable* program[_args] for use in find_args()
505 # this way all Controller/EngineSetLaunchers have the same form, rather
505 # this way all Controller/EngineSetLaunchers have the same form, rather
506 # than *some* having `program_args` and others `controller_args`
506 # than *some* having `program_args` and others `controller_args`
507 @property
507 @property
508 def program(self):
508 def program(self):
509 return self.engine_cmd
509 return self.engine_cmd
510
510
511 @property
511 @property
512 def program_args(self):
512 def program_args(self):
513 return self.cluster_args + self.engine_args
513 return self.cluster_args + self.engine_args
514
514
515 def start(self, n):
515 def start(self, n):
516 """Start n engines by profile or profile_dir."""
516 """Start n engines by profile or profile_dir."""
517 self.n = n
517 self.n = n
518 return super(MPIEngineSetLauncher, self).start(n)
518 return super(MPIEngineSetLauncher, self).start(n)
519
519
520 # deprecated MPIExec names
520 # deprecated MPIExec names
521 class DeprecatedMPILauncher(object):
521 class DeprecatedMPILauncher(object):
522 def warn(self):
522 def warn(self):
523 oldname = self.__class__.__name__
523 oldname = self.__class__.__name__
524 newname = oldname.replace('MPIExec', 'MPI')
524 newname = oldname.replace('MPIExec', 'MPI')
525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
526
526
527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
528 """Deprecated, use MPILauncher"""
528 """Deprecated, use MPILauncher"""
529 def __init__(self, *args, **kwargs):
529 def __init__(self, *args, **kwargs):
530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
531 self.warn()
531 self.warn()
532
532
533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
534 """Deprecated, use MPIControllerLauncher"""
534 """Deprecated, use MPIControllerLauncher"""
535 def __init__(self, *args, **kwargs):
535 def __init__(self, *args, **kwargs):
536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
537 self.warn()
537 self.warn()
538
538
539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
540 """Deprecated, use MPIEngineSetLauncher"""
540 """Deprecated, use MPIEngineSetLauncher"""
541 def __init__(self, *args, **kwargs):
541 def __init__(self, *args, **kwargs):
542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
543 self.warn()
543 self.warn()
544
544
545
545
546 #-----------------------------------------------------------------------------
546 #-----------------------------------------------------------------------------
547 # SSH launchers
547 # SSH launchers
548 #-----------------------------------------------------------------------------
548 #-----------------------------------------------------------------------------
549
549
550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
551
551
552 class SSHLauncher(LocalProcessLauncher):
552 class SSHLauncher(LocalProcessLauncher):
553 """A minimal launcher for ssh.
553 """A minimal launcher for ssh.
554
554
555 To be useful this will probably have to be extended to use the ``sshx``
555 To be useful this will probably have to be extended to use the ``sshx``
556 idea for environment variables. There could be other things this needs
556 idea for environment variables. There could be other things this needs
557 as well.
557 as well.
558 """
558 """
559
559
560 ssh_cmd = List(['ssh'], config=True,
560 ssh_cmd = List(['ssh'], config=True,
561 help="command for starting ssh")
561 help="command for starting ssh")
562 ssh_args = List(['-tt'], config=True,
562 ssh_args = List(['-tt'], config=True,
563 help="args to pass to ssh")
563 help="args to pass to ssh")
564 scp_cmd = List(['scp'], config=True,
564 scp_cmd = List(['scp'], config=True,
565 help="command for sending files")
565 help="command for sending files")
566 program = List(['date'],
566 program = List(['date'],
567 help="Program to launch via ssh")
567 help="Program to launch via ssh")
568 program_args = List([],
568 program_args = List([],
569 help="args to pass to remote program")
569 help="args to pass to remote program")
570 hostname = Unicode('', config=True,
570 hostname = Unicode('', config=True,
571 help="hostname on which to launch the program")
571 help="hostname on which to launch the program")
572 user = Unicode('', config=True,
572 user = Unicode('', config=True,
573 help="username for ssh")
573 help="username for ssh")
574 location = Unicode('', config=True,
574 location = Unicode('', config=True,
575 help="user@hostname location for ssh in one setting")
575 help="user@hostname location for ssh in one setting")
576 to_fetch = List([], config=True,
576 to_fetch = List([], config=True,
577 help="List of (remote, local) files to fetch after starting")
577 help="List of (remote, local) files to fetch after starting")
578 to_send = List([], config=True,
578 to_send = List([], config=True,
579 help="List of (local, remote) files to send before starting")
579 help="List of (local, remote) files to send before starting")
580
580
581 def _hostname_changed(self, name, old, new):
581 def _hostname_changed(self, name, old, new):
582 if self.user:
582 if self.user:
583 self.location = u'%s@%s' % (self.user, new)
583 self.location = u'%s@%s' % (self.user, new)
584 else:
584 else:
585 self.location = new
585 self.location = new
586
586
587 def _user_changed(self, name, old, new):
587 def _user_changed(self, name, old, new):
588 self.location = u'%s@%s' % (new, self.hostname)
588 self.location = u'%s@%s' % (new, self.hostname)
589
589
590 def find_args(self):
590 def find_args(self):
591 return self.ssh_cmd + self.ssh_args + [self.location] + \
591 return self.ssh_cmd + self.ssh_args + [self.location] + \
592 list(map(pipes.quote, self.program + self.program_args))
592 list(map(pipes.quote, self.program + self.program_args))
593
593
594 def _send_file(self, local, remote):
594 def _send_file(self, local, remote):
595 """send a single file"""
595 """send a single file"""
596 remote = "%s:%s" % (self.location, remote)
596 remote = "%s:%s" % (self.location, remote)
597 for i in range(10):
597 for i in range(10):
598 if not os.path.exists(local):
598 if not os.path.exists(local):
599 self.log.debug("waiting for %s" % local)
599 self.log.debug("waiting for %s" % local)
600 time.sleep(1)
600 time.sleep(1)
601 else:
601 else:
602 break
602 break
603 remote_dir = os.path.dirname(remote)
604 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
605 check_output(self.ssh_cmd + self.ssh_args + \
606 [self.location, 'mkdir', '-p', '--', remote_dir]
607 )
603 self.log.info("sending %s to %s", local, remote)
608 self.log.info("sending %s to %s", local, remote)
604 check_output(self.scp_cmd + [local, remote])
609 check_output(self.scp_cmd + [local, remote])
605
610
606 def send_files(self):
611 def send_files(self):
607 """send our files (called before start)"""
612 """send our files (called before start)"""
608 if not self.to_send:
613 if not self.to_send:
609 return
614 return
610 for local_file, remote_file in self.to_send:
615 for local_file, remote_file in self.to_send:
611 self._send_file(local_file, remote_file)
616 self._send_file(local_file, remote_file)
612
617
613 def _fetch_file(self, remote, local):
618 def _fetch_file(self, remote, local):
614 """fetch a single file"""
619 """fetch a single file"""
615 full_remote = "%s:%s" % (self.location, remote)
620 full_remote = "%s:%s" % (self.location, remote)
616 self.log.info("fetching %s from %s", local, full_remote)
621 self.log.info("fetching %s from %s", local, full_remote)
617 for i in range(10):
622 for i in range(10):
618 # wait up to 10s for remote file to exist
623 # wait up to 10s for remote file to exist
619 check = check_output(self.ssh_cmd + self.ssh_args + \
624 check = check_output(self.ssh_cmd + self.ssh_args + \
620 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
625 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
621 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
626 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
622 if check == u'no':
627 if check == u'no':
623 time.sleep(1)
628 time.sleep(1)
624 elif check == u'yes':
629 elif check == u'yes':
625 break
630 break
631 local_dir = os.path.dirname(local)
632 if not os.path.exists(local_dir):
633 os.makedirs(local_dir)
626 check_output(self.scp_cmd + [full_remote, local])
634 check_output(self.scp_cmd + [full_remote, local])
627
635
628 def fetch_files(self):
636 def fetch_files(self):
629 """fetch remote files (called after start)"""
637 """fetch remote files (called after start)"""
630 if not self.to_fetch:
638 if not self.to_fetch:
631 return
639 return
632 for remote_file, local_file in self.to_fetch:
640 for remote_file, local_file in self.to_fetch:
633 self._fetch_file(remote_file, local_file)
641 self._fetch_file(remote_file, local_file)
634
642
635 def start(self, hostname=None, user=None):
643 def start(self, hostname=None, user=None):
636 if hostname is not None:
644 if hostname is not None:
637 self.hostname = hostname
645 self.hostname = hostname
638 if user is not None:
646 if user is not None:
639 self.user = user
647 self.user = user
640
648
641 self.send_files()
649 self.send_files()
642 super(SSHLauncher, self).start()
650 super(SSHLauncher, self).start()
643 self.fetch_files()
651 self.fetch_files()
644
652
645 def signal(self, sig):
653 def signal(self, sig):
646 if self.state == 'running':
654 if self.state == 'running':
647 # send escaped ssh connection-closer
655 # send escaped ssh connection-closer
648 self.process.stdin.write('~.')
656 self.process.stdin.write('~.')
649 self.process.stdin.flush()
657 self.process.stdin.flush()
650
658
651 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
659 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
652
660
653 remote_profile_dir = Unicode('', config=True,
661 remote_profile_dir = Unicode('', config=True,
654 help="""The remote profile_dir to use.
662 help="""The remote profile_dir to use.
655
663
656 If not specified, use calling profile, stripping out possible leading homedir.
664 If not specified, use calling profile, stripping out possible leading homedir.
657 """)
665 """)
658
666
659 def _profile_dir_changed(self, name, old, new):
667 def _profile_dir_changed(self, name, old, new):
660 if not self.remote_profile_dir:
668 if not self.remote_profile_dir:
661 # trigger remote_profile_dir_default logic again,
669 # trigger remote_profile_dir_default logic again,
662 # in case it was already triggered before profile_dir was set
670 # in case it was already triggered before profile_dir was set
663 self.remote_profile_dir = self._strip_home(new)
671 self.remote_profile_dir = self._strip_home(new)
664
672
665 @staticmethod
673 @staticmethod
666 def _strip_home(path):
674 def _strip_home(path):
667 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
675 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
668 home = get_home_dir()
676 home = get_home_dir()
669 if not home.endswith('/'):
677 if not home.endswith('/'):
670 home = home+'/'
678 home = home+'/'
671
679
672 if path.startswith(home):
680 if path.startswith(home):
673 return path[len(home):]
681 return path[len(home):]
674 else:
682 else:
675 return path
683 return path
676
684
677 def _remote_profile_dir_default(self):
685 def _remote_profile_dir_default(self):
678 return self._strip_home(self.profile_dir)
686 return self._strip_home(self.profile_dir)
679
687
680 def _cluster_id_changed(self, name, old, new):
688 def _cluster_id_changed(self, name, old, new):
681 if new:
689 if new:
682 raise ValueError("cluster id not supported by SSH launchers")
690 raise ValueError("cluster id not supported by SSH launchers")
683
691
684 @property
692 @property
685 def cluster_args(self):
693 def cluster_args(self):
686 return ['--profile-dir', self.remote_profile_dir]
694 return ['--profile-dir', self.remote_profile_dir]
687
695
688 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
696 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
689
697
690 # alias back to *non-configurable* program[_args] for use in find_args()
698 # alias back to *non-configurable* program[_args] for use in find_args()
691 # this way all Controller/EngineSetLaunchers have the same form, rather
699 # this way all Controller/EngineSetLaunchers have the same form, rather
692 # than *some* having `program_args` and others `controller_args`
700 # than *some* having `program_args` and others `controller_args`
693
701
694 def _controller_cmd_default(self):
702 def _controller_cmd_default(self):
695 return ['ipcontroller']
703 return ['ipcontroller']
696
704
697 @property
705 @property
698 def program(self):
706 def program(self):
699 return self.controller_cmd
707 return self.controller_cmd
700
708
701 @property
709 @property
702 def program_args(self):
710 def program_args(self):
703 return self.cluster_args + self.controller_args
711 return self.cluster_args + self.controller_args
704
712
705 def _to_fetch_default(self):
713 def _to_fetch_default(self):
706 return [
714 return [
707 (os.path.join(self.remote_profile_dir, 'security', cf),
715 (os.path.join(self.remote_profile_dir, 'security', cf),
708 os.path.join(self.profile_dir, 'security', cf),)
716 os.path.join(self.profile_dir, 'security', cf),)
709 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
717 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
710 ]
718 ]
711
719
712 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
720 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
713
721
714 # alias back to *non-configurable* program[_args] for use in find_args()
722 # alias back to *non-configurable* program[_args] for use in find_args()
715 # this way all Controller/EngineSetLaunchers have the same form, rather
723 # this way all Controller/EngineSetLaunchers have the same form, rather
716 # than *some* having `program_args` and others `controller_args`
724 # than *some* having `program_args` and others `controller_args`
717
725
718 def _engine_cmd_default(self):
726 def _engine_cmd_default(self):
719 return ['ipengine']
727 return ['ipengine']
720
728
721 @property
729 @property
722 def program(self):
730 def program(self):
723 return self.engine_cmd
731 return self.engine_cmd
724
732
725 @property
733 @property
726 def program_args(self):
734 def program_args(self):
727 return self.cluster_args + self.engine_args
735 return self.cluster_args + self.engine_args
728
736
729 def _to_send_default(self):
737 def _to_send_default(self):
730 return [
738 return [
731 (os.path.join(self.profile_dir, 'security', cf),
739 (os.path.join(self.profile_dir, 'security', cf),
732 os.path.join(self.remote_profile_dir, 'security', cf))
740 os.path.join(self.remote_profile_dir, 'security', cf))
733 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
741 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
734 ]
742 ]
735
743
736
744
737 class SSHEngineSetLauncher(LocalEngineSetLauncher):
745 class SSHEngineSetLauncher(LocalEngineSetLauncher):
738 launcher_class = SSHEngineLauncher
746 launcher_class = SSHEngineLauncher
739 engines = Dict(config=True,
747 engines = Dict(config=True,
740 help="""dict of engines to launch. This is a dict by hostname of ints,
748 help="""dict of engines to launch. This is a dict by hostname of ints,
741 corresponding to the number of engines to start on that host.""")
749 corresponding to the number of engines to start on that host.""")
742
750
743 def _engine_cmd_default(self):
751 def _engine_cmd_default(self):
744 return ['ipengine']
752 return ['ipengine']
745
753
746 @property
754 @property
747 def engine_count(self):
755 def engine_count(self):
748 """determine engine count from `engines` dict"""
756 """determine engine count from `engines` dict"""
749 count = 0
757 count = 0
750 for n in itervalues(self.engines):
758 for n in itervalues(self.engines):
751 if isinstance(n, (tuple,list)):
759 if isinstance(n, (tuple,list)):
752 n,args = n
760 n,args = n
753 count += n
761 count += n
754 return count
762 return count
755
763
756 def start(self, n):
764 def start(self, n):
757 """Start engines by profile or profile_dir.
765 """Start engines by profile or profile_dir.
758 `n` is ignored, and the `engines` config property is used instead.
766 `n` is ignored, and the `engines` config property is used instead.
759 """
767 """
760
768
761 dlist = []
769 dlist = []
762 for host, n in iteritems(self.engines):
770 for host, n in iteritems(self.engines):
763 if isinstance(n, (tuple, list)):
771 if isinstance(n, (tuple, list)):
764 n, args = n
772 n, args = n
765 else:
773 else:
766 args = copy.deepcopy(self.engine_args)
774 args = copy.deepcopy(self.engine_args)
767
775
768 if '@' in host:
776 if '@' in host:
769 user,host = host.split('@',1)
777 user,host = host.split('@',1)
770 else:
778 else:
771 user=None
779 user=None
772 for i in range(n):
780 for i in range(n):
773 if i > 0:
781 if i > 0:
774 time.sleep(self.delay)
782 time.sleep(self.delay)
775 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
783 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
776 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
784 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
777 )
785 )
778 if i > 0:
786 if i > 0:
779 # only send files for the first engine on each host
787 # only send files for the first engine on each host
780 el.to_send = []
788 el.to_send = []
781
789
782 # Copy the engine args over to each engine launcher.
790 # Copy the engine args over to each engine launcher.
783 el.engine_cmd = self.engine_cmd
791 el.engine_cmd = self.engine_cmd
784 el.engine_args = args
792 el.engine_args = args
785 el.on_stop(self._notice_engine_stopped)
793 el.on_stop(self._notice_engine_stopped)
786 d = el.start(user=user, hostname=host)
794 d = el.start(user=user, hostname=host)
787 self.launchers[ "%s/%i" % (host,i) ] = el
795 self.launchers[ "%s/%i" % (host,i) ] = el
788 dlist.append(d)
796 dlist.append(d)
789 self.notify_start(dlist)
797 self.notify_start(dlist)
790 return dlist
798 return dlist
791
799
792
800
793 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
801 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
794 """Launcher for calling
802 """Launcher for calling
795 `ipcluster engines` on a remote machine.
803 `ipcluster engines` on a remote machine.
796
804
797 Requires that remote profile is already configured.
805 Requires that remote profile is already configured.
798 """
806 """
799
807
800 n = Integer()
808 n = Integer()
801 ipcluster_cmd = List(['ipcluster'], config=True)
809 ipcluster_cmd = List(['ipcluster'], config=True)
802
810
803 @property
811 @property
804 def program(self):
812 def program(self):
805 return self.ipcluster_cmd + ['engines']
813 return self.ipcluster_cmd + ['engines']
806
814
807 @property
815 @property
808 def program_args(self):
816 def program_args(self):
809 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
817 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
810
818
811 def _to_send_default(self):
819 def _to_send_default(self):
812 return [
820 return [
813 (os.path.join(self.profile_dir, 'security', cf),
821 (os.path.join(self.profile_dir, 'security', cf),
814 os.path.join(self.remote_profile_dir, 'security', cf))
822 os.path.join(self.remote_profile_dir, 'security', cf))
815 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
823 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
816 ]
824 ]
817
825
818 def start(self, n):
826 def start(self, n):
819 self.n = n
827 self.n = n
820 super(SSHProxyEngineSetLauncher, self).start()
828 super(SSHProxyEngineSetLauncher, self).start()
821
829
822
830
823 #-----------------------------------------------------------------------------
831 #-----------------------------------------------------------------------------
824 # Windows HPC Server 2008 scheduler launchers
832 # Windows HPC Server 2008 scheduler launchers
825 #-----------------------------------------------------------------------------
833 #-----------------------------------------------------------------------------
826
834
827
835
828 # This is only used on Windows.
836 # This is only used on Windows.
829 def find_job_cmd():
837 def find_job_cmd():
830 if WINDOWS:
838 if WINDOWS:
831 try:
839 try:
832 return find_cmd('job')
840 return find_cmd('job')
833 except (FindCmdError, ImportError):
841 except (FindCmdError, ImportError):
834 # ImportError will be raised if win32api is not installed
842 # ImportError will be raised if win32api is not installed
835 return 'job'
843 return 'job'
836 else:
844 else:
837 return 'job'
845 return 'job'
838
846
839
847
840 class WindowsHPCLauncher(BaseLauncher):
848 class WindowsHPCLauncher(BaseLauncher):
841
849
842 job_id_regexp = CRegExp(r'\d+', config=True,
850 job_id_regexp = CRegExp(r'\d+', config=True,
843 help="""A regular expression used to get the job id from the output of the
851 help="""A regular expression used to get the job id from the output of the
844 submit_command. """
852 submit_command. """
845 )
853 )
846 job_file_name = Unicode(u'ipython_job.xml', config=True,
854 job_file_name = Unicode(u'ipython_job.xml', config=True,
847 help="The filename of the instantiated job script.")
855 help="The filename of the instantiated job script.")
848 # The full path to the instantiated job script. This gets made dynamically
856 # The full path to the instantiated job script. This gets made dynamically
849 # by combining the work_dir with the job_file_name.
857 # by combining the work_dir with the job_file_name.
850 job_file = Unicode(u'')
858 job_file = Unicode(u'')
851 scheduler = Unicode('', config=True,
859 scheduler = Unicode('', config=True,
852 help="The hostname of the scheduler to submit the job to.")
860 help="The hostname of the scheduler to submit the job to.")
853 job_cmd = Unicode(find_job_cmd(), config=True,
861 job_cmd = Unicode(find_job_cmd(), config=True,
854 help="The command for submitting jobs.")
862 help="The command for submitting jobs.")
855
863
856 def __init__(self, work_dir=u'.', config=None, **kwargs):
864 def __init__(self, work_dir=u'.', config=None, **kwargs):
857 super(WindowsHPCLauncher, self).__init__(
865 super(WindowsHPCLauncher, self).__init__(
858 work_dir=work_dir, config=config, **kwargs
866 work_dir=work_dir, config=config, **kwargs
859 )
867 )
860
868
861 @property
869 @property
862 def job_file(self):
870 def job_file(self):
863 return os.path.join(self.work_dir, self.job_file_name)
871 return os.path.join(self.work_dir, self.job_file_name)
864
872
865 def write_job_file(self, n):
873 def write_job_file(self, n):
866 raise NotImplementedError("Implement write_job_file in a subclass.")
874 raise NotImplementedError("Implement write_job_file in a subclass.")
867
875
868 def find_args(self):
876 def find_args(self):
869 return [u'job.exe']
877 return [u'job.exe']
870
878
871 def parse_job_id(self, output):
879 def parse_job_id(self, output):
872 """Take the output of the submit command and return the job id."""
880 """Take the output of the submit command and return the job id."""
873 m = self.job_id_regexp.search(output)
881 m = self.job_id_regexp.search(output)
874 if m is not None:
882 if m is not None:
875 job_id = m.group()
883 job_id = m.group()
876 else:
884 else:
877 raise LauncherError("Job id couldn't be determined: %s" % output)
885 raise LauncherError("Job id couldn't be determined: %s" % output)
878 self.job_id = job_id
886 self.job_id = job_id
879 self.log.info('Job started with id: %r', job_id)
887 self.log.info('Job started with id: %r', job_id)
880 return job_id
888 return job_id
881
889
882 def start(self, n):
890 def start(self, n):
883 """Start n copies of the process using the Win HPC job scheduler."""
891 """Start n copies of the process using the Win HPC job scheduler."""
884 self.write_job_file(n)
892 self.write_job_file(n)
885 args = [
893 args = [
886 'submit',
894 'submit',
887 '/jobfile:%s' % self.job_file,
895 '/jobfile:%s' % self.job_file,
888 '/scheduler:%s' % self.scheduler
896 '/scheduler:%s' % self.scheduler
889 ]
897 ]
890 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
898 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
891
899
892 output = check_output([self.job_cmd]+args,
900 output = check_output([self.job_cmd]+args,
893 env=os.environ,
901 env=os.environ,
894 cwd=self.work_dir,
902 cwd=self.work_dir,
895 stderr=STDOUT
903 stderr=STDOUT
896 )
904 )
897 output = output.decode(DEFAULT_ENCODING, 'replace')
905 output = output.decode(DEFAULT_ENCODING, 'replace')
898 job_id = self.parse_job_id(output)
906 job_id = self.parse_job_id(output)
899 self.notify_start(job_id)
907 self.notify_start(job_id)
900 return job_id
908 return job_id
901
909
902 def stop(self):
910 def stop(self):
903 args = [
911 args = [
904 'cancel',
912 'cancel',
905 self.job_id,
913 self.job_id,
906 '/scheduler:%s' % self.scheduler
914 '/scheduler:%s' % self.scheduler
907 ]
915 ]
908 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
916 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
909 try:
917 try:
910 output = check_output([self.job_cmd]+args,
918 output = check_output([self.job_cmd]+args,
911 env=os.environ,
919 env=os.environ,
912 cwd=self.work_dir,
920 cwd=self.work_dir,
913 stderr=STDOUT
921 stderr=STDOUT
914 )
922 )
915 output = output.decode(DEFAULT_ENCODING, 'replace')
923 output = output.decode(DEFAULT_ENCODING, 'replace')
916 except:
924 except:
917 output = u'The job already appears to be stopped: %r' % self.job_id
925 output = u'The job already appears to be stopped: %r' % self.job_id
918 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
926 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
919 return output
927 return output
920
928
921
929
922 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
930 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
923
931
924 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
932 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
925 help="WinHPC xml job file.")
933 help="WinHPC xml job file.")
926 controller_args = List([], config=False,
934 controller_args = List([], config=False,
927 help="extra args to pass to ipcontroller")
935 help="extra args to pass to ipcontroller")
928
936
929 def write_job_file(self, n):
937 def write_job_file(self, n):
930 job = IPControllerJob(parent=self)
938 job = IPControllerJob(parent=self)
931
939
932 t = IPControllerTask(parent=self)
940 t = IPControllerTask(parent=self)
933 # The tasks work directory is *not* the actual work directory of
941 # The tasks work directory is *not* the actual work directory of
934 # the controller. It is used as the base path for the stdout/stderr
942 # the controller. It is used as the base path for the stdout/stderr
935 # files that the scheduler redirects to.
943 # files that the scheduler redirects to.
936 t.work_directory = self.profile_dir
944 t.work_directory = self.profile_dir
937 # Add the profile_dir and from self.start().
945 # Add the profile_dir and from self.start().
938 t.controller_args.extend(self.cluster_args)
946 t.controller_args.extend(self.cluster_args)
939 t.controller_args.extend(self.controller_args)
947 t.controller_args.extend(self.controller_args)
940 job.add_task(t)
948 job.add_task(t)
941
949
942 self.log.debug("Writing job description file: %s", self.job_file)
950 self.log.debug("Writing job description file: %s", self.job_file)
943 job.write(self.job_file)
951 job.write(self.job_file)
944
952
945 @property
953 @property
946 def job_file(self):
954 def job_file(self):
947 return os.path.join(self.profile_dir, self.job_file_name)
955 return os.path.join(self.profile_dir, self.job_file_name)
948
956
949 def start(self):
957 def start(self):
950 """Start the controller by profile_dir."""
958 """Start the controller by profile_dir."""
951 return super(WindowsHPCControllerLauncher, self).start(1)
959 return super(WindowsHPCControllerLauncher, self).start(1)
952
960
953
961
954 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
962 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
955
963
956 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
964 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
957 help="jobfile for ipengines job")
965 help="jobfile for ipengines job")
958 engine_args = List([], config=False,
966 engine_args = List([], config=False,
959 help="extra args to pas to ipengine")
967 help="extra args to pas to ipengine")
960
968
961 def write_job_file(self, n):
969 def write_job_file(self, n):
962 job = IPEngineSetJob(parent=self)
970 job = IPEngineSetJob(parent=self)
963
971
964 for i in range(n):
972 for i in range(n):
965 t = IPEngineTask(parent=self)
973 t = IPEngineTask(parent=self)
966 # The tasks work directory is *not* the actual work directory of
974 # The tasks work directory is *not* the actual work directory of
967 # the engine. It is used as the base path for the stdout/stderr
975 # the engine. It is used as the base path for the stdout/stderr
968 # files that the scheduler redirects to.
976 # files that the scheduler redirects to.
969 t.work_directory = self.profile_dir
977 t.work_directory = self.profile_dir
970 # Add the profile_dir and from self.start().
978 # Add the profile_dir and from self.start().
971 t.engine_args.extend(self.cluster_args)
979 t.engine_args.extend(self.cluster_args)
972 t.engine_args.extend(self.engine_args)
980 t.engine_args.extend(self.engine_args)
973 job.add_task(t)
981 job.add_task(t)
974
982
975 self.log.debug("Writing job description file: %s", self.job_file)
983 self.log.debug("Writing job description file: %s", self.job_file)
976 job.write(self.job_file)
984 job.write(self.job_file)
977
985
978 @property
986 @property
979 def job_file(self):
987 def job_file(self):
980 return os.path.join(self.profile_dir, self.job_file_name)
988 return os.path.join(self.profile_dir, self.job_file_name)
981
989
982 def start(self, n):
990 def start(self, n):
983 """Start the controller by profile_dir."""
991 """Start the controller by profile_dir."""
984 return super(WindowsHPCEngineSetLauncher, self).start(n)
992 return super(WindowsHPCEngineSetLauncher, self).start(n)
985
993
986
994
987 #-----------------------------------------------------------------------------
995 #-----------------------------------------------------------------------------
988 # Batch (PBS) system launchers
996 # Batch (PBS) system launchers
989 #-----------------------------------------------------------------------------
997 #-----------------------------------------------------------------------------
990
998
991 class BatchClusterAppMixin(ClusterAppMixin):
999 class BatchClusterAppMixin(ClusterAppMixin):
992 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1000 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
993 def _profile_dir_changed(self, name, old, new):
1001 def _profile_dir_changed(self, name, old, new):
994 self.context[name] = new
1002 self.context[name] = new
995 _cluster_id_changed = _profile_dir_changed
1003 _cluster_id_changed = _profile_dir_changed
996
1004
997 def _profile_dir_default(self):
1005 def _profile_dir_default(self):
998 self.context['profile_dir'] = ''
1006 self.context['profile_dir'] = ''
999 return ''
1007 return ''
1000 def _cluster_id_default(self):
1008 def _cluster_id_default(self):
1001 self.context['cluster_id'] = ''
1009 self.context['cluster_id'] = ''
1002 return ''
1010 return ''
1003
1011
1004
1012
1005 class BatchSystemLauncher(BaseLauncher):
1013 class BatchSystemLauncher(BaseLauncher):
1006 """Launch an external process using a batch system.
1014 """Launch an external process using a batch system.
1007
1015
1008 This class is designed to work with UNIX batch systems like PBS, LSF,
1016 This class is designed to work with UNIX batch systems like PBS, LSF,
1009 GridEngine, etc. The overall model is that there are different commands
1017 GridEngine, etc. The overall model is that there are different commands
1010 like qsub, qdel, etc. that handle the starting and stopping of the process.
1018 like qsub, qdel, etc. that handle the starting and stopping of the process.
1011
1019
1012 This class also has the notion of a batch script. The ``batch_template``
1020 This class also has the notion of a batch script. The ``batch_template``
1013 attribute can be set to a string that is a template for the batch script.
1021 attribute can be set to a string that is a template for the batch script.
1014 This template is instantiated using string formatting. Thus the template can
1022 This template is instantiated using string formatting. Thus the template can
1015 use {n} fot the number of instances. Subclasses can add additional variables
1023 use {n} fot the number of instances. Subclasses can add additional variables
1016 to the template dict.
1024 to the template dict.
1017 """
1025 """
1018
1026
1019 # Subclasses must fill these in. See PBSEngineSet
1027 # Subclasses must fill these in. See PBSEngineSet
1020 submit_command = List([''], config=True,
1028 submit_command = List([''], config=True,
1021 help="The name of the command line program used to submit jobs.")
1029 help="The name of the command line program used to submit jobs.")
1022 delete_command = List([''], config=True,
1030 delete_command = List([''], config=True,
1023 help="The name of the command line program used to delete jobs.")
1031 help="The name of the command line program used to delete jobs.")
1024 job_id_regexp = CRegExp('', config=True,
1032 job_id_regexp = CRegExp('', config=True,
1025 help="""A regular expression used to get the job id from the output of the
1033 help="""A regular expression used to get the job id from the output of the
1026 submit_command.""")
1034 submit_command.""")
1027 job_id_regexp_group = Integer(0, config=True,
1035 job_id_regexp_group = Integer(0, config=True,
1028 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1036 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1029 batch_template = Unicode('', config=True,
1037 batch_template = Unicode('', config=True,
1030 help="The string that is the batch script template itself.")
1038 help="The string that is the batch script template itself.")
1031 batch_template_file = Unicode(u'', config=True,
1039 batch_template_file = Unicode(u'', config=True,
1032 help="The file that contains the batch template.")
1040 help="The file that contains the batch template.")
1033 batch_file_name = Unicode(u'batch_script', config=True,
1041 batch_file_name = Unicode(u'batch_script', config=True,
1034 help="The filename of the instantiated batch script.")
1042 help="The filename of the instantiated batch script.")
1035 queue = Unicode(u'', config=True,
1043 queue = Unicode(u'', config=True,
1036 help="The PBS Queue.")
1044 help="The PBS Queue.")
1037
1045
1038 def _queue_changed(self, name, old, new):
1046 def _queue_changed(self, name, old, new):
1039 self.context[name] = new
1047 self.context[name] = new
1040
1048
1041 n = Integer(1)
1049 n = Integer(1)
1042 _n_changed = _queue_changed
1050 _n_changed = _queue_changed
1043
1051
1044 # not configurable, override in subclasses
1052 # not configurable, override in subclasses
1045 # PBS Job Array regex
1053 # PBS Job Array regex
1046 job_array_regexp = CRegExp('')
1054 job_array_regexp = CRegExp('')
1047 job_array_template = Unicode('')
1055 job_array_template = Unicode('')
1048 # PBS Queue regex
1056 # PBS Queue regex
1049 queue_regexp = CRegExp('')
1057 queue_regexp = CRegExp('')
1050 queue_template = Unicode('')
1058 queue_template = Unicode('')
1051 # The default batch template, override in subclasses
1059 # The default batch template, override in subclasses
1052 default_template = Unicode('')
1060 default_template = Unicode('')
1053 # The full path to the instantiated batch script.
1061 # The full path to the instantiated batch script.
1054 batch_file = Unicode(u'')
1062 batch_file = Unicode(u'')
1055 # the format dict used with batch_template:
1063 # the format dict used with batch_template:
1056 context = Dict()
1064 context = Dict()
1057
1065
1058 def _context_default(self):
1066 def _context_default(self):
1059 """load the default context with the default values for the basic keys
1067 """load the default context with the default values for the basic keys
1060
1068
1061 because the _trait_changed methods only load the context if they
1069 because the _trait_changed methods only load the context if they
1062 are set to something other than the default value.
1070 are set to something other than the default value.
1063 """
1071 """
1064 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1072 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1065
1073
1066 # the Formatter instance for rendering the templates:
1074 # the Formatter instance for rendering the templates:
1067 formatter = Instance(EvalFormatter, (), {})
1075 formatter = Instance(EvalFormatter, (), {})
1068
1076
1069 def find_args(self):
1077 def find_args(self):
1070 return self.submit_command + [self.batch_file]
1078 return self.submit_command + [self.batch_file]
1071
1079
1072 def __init__(self, work_dir=u'.', config=None, **kwargs):
1080 def __init__(self, work_dir=u'.', config=None, **kwargs):
1073 super(BatchSystemLauncher, self).__init__(
1081 super(BatchSystemLauncher, self).__init__(
1074 work_dir=work_dir, config=config, **kwargs
1082 work_dir=work_dir, config=config, **kwargs
1075 )
1083 )
1076 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1084 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1077
1085
1078 def parse_job_id(self, output):
1086 def parse_job_id(self, output):
1079 """Take the output of the submit command and return the job id."""
1087 """Take the output of the submit command and return the job id."""
1080 m = self.job_id_regexp.search(output)
1088 m = self.job_id_regexp.search(output)
1081 if m is not None:
1089 if m is not None:
1082 job_id = m.group(self.job_id_regexp_group)
1090 job_id = m.group(self.job_id_regexp_group)
1083 else:
1091 else:
1084 raise LauncherError("Job id couldn't be determined: %s" % output)
1092 raise LauncherError("Job id couldn't be determined: %s" % output)
1085 self.job_id = job_id
1093 self.job_id = job_id
1086 self.log.info('Job submitted with job id: %r', job_id)
1094 self.log.info('Job submitted with job id: %r', job_id)
1087 return job_id
1095 return job_id
1088
1096
1089 def write_batch_script(self, n):
1097 def write_batch_script(self, n):
1090 """Instantiate and write the batch script to the work_dir."""
1098 """Instantiate and write the batch script to the work_dir."""
1091 self.n = n
1099 self.n = n
1092 # first priority is batch_template if set
1100 # first priority is batch_template if set
1093 if self.batch_template_file and not self.batch_template:
1101 if self.batch_template_file and not self.batch_template:
1094 # second priority is batch_template_file
1102 # second priority is batch_template_file
1095 with open(self.batch_template_file) as f:
1103 with open(self.batch_template_file) as f:
1096 self.batch_template = f.read()
1104 self.batch_template = f.read()
1097 if not self.batch_template:
1105 if not self.batch_template:
1098 # third (last) priority is default_template
1106 # third (last) priority is default_template
1099 self.batch_template = self.default_template
1107 self.batch_template = self.default_template
1100 # add jobarray or queue lines to user-specified template
1108 # add jobarray or queue lines to user-specified template
1101 # note that this is *only* when user did not specify a template.
1109 # note that this is *only* when user did not specify a template.
1102 self._insert_queue_in_script()
1110 self._insert_queue_in_script()
1103 self._insert_job_array_in_script()
1111 self._insert_job_array_in_script()
1104 script_as_string = self.formatter.format(self.batch_template, **self.context)
1112 script_as_string = self.formatter.format(self.batch_template, **self.context)
1105 self.log.debug('Writing batch script: %s', self.batch_file)
1113 self.log.debug('Writing batch script: %s', self.batch_file)
1106 with open(self.batch_file, 'w') as f:
1114 with open(self.batch_file, 'w') as f:
1107 f.write(script_as_string)
1115 f.write(script_as_string)
1108 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1116 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1109
1117
1110 def _insert_queue_in_script(self):
1118 def _insert_queue_in_script(self):
1111 """Inserts a queue if required into the batch script.
1119 """Inserts a queue if required into the batch script.
1112 """
1120 """
1113 if self.queue and not self.queue_regexp.search(self.batch_template):
1121 if self.queue and not self.queue_regexp.search(self.batch_template):
1114 self.log.debug("adding PBS queue settings to batch script")
1122 self.log.debug("adding PBS queue settings to batch script")
1115 firstline, rest = self.batch_template.split('\n',1)
1123 firstline, rest = self.batch_template.split('\n',1)
1116 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1124 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1117
1125
1118 def _insert_job_array_in_script(self):
1126 def _insert_job_array_in_script(self):
1119 """Inserts a job array if required into the batch script.
1127 """Inserts a job array if required into the batch script.
1120 """
1128 """
1121 if not self.job_array_regexp.search(self.batch_template):
1129 if not self.job_array_regexp.search(self.batch_template):
1122 self.log.debug("adding job array settings to batch script")
1130 self.log.debug("adding job array settings to batch script")
1123 firstline, rest = self.batch_template.split('\n',1)
1131 firstline, rest = self.batch_template.split('\n',1)
1124 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1132 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1125
1133
1126 def start(self, n):
1134 def start(self, n):
1127 """Start n copies of the process using a batch system."""
1135 """Start n copies of the process using a batch system."""
1128 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1136 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1129 # Here we save profile_dir in the context so they
1137 # Here we save profile_dir in the context so they
1130 # can be used in the batch script template as {profile_dir}
1138 # can be used in the batch script template as {profile_dir}
1131 self.write_batch_script(n)
1139 self.write_batch_script(n)
1132 output = check_output(self.args, env=os.environ)
1140 output = check_output(self.args, env=os.environ)
1133 output = output.decode(DEFAULT_ENCODING, 'replace')
1141 output = output.decode(DEFAULT_ENCODING, 'replace')
1134
1142
1135 job_id = self.parse_job_id(output)
1143 job_id = self.parse_job_id(output)
1136 self.notify_start(job_id)
1144 self.notify_start(job_id)
1137 return job_id
1145 return job_id
1138
1146
1139 def stop(self):
1147 def stop(self):
1140 try:
1148 try:
1141 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1149 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1142 stdout=PIPE, stderr=PIPE)
1150 stdout=PIPE, stderr=PIPE)
1143 out, err = p.communicate()
1151 out, err = p.communicate()
1144 output = out + err
1152 output = out + err
1145 except:
1153 except:
1146 self.log.exception("Problem stopping cluster with command: %s" %
1154 self.log.exception("Problem stopping cluster with command: %s" %
1147 (self.delete_command + [self.job_id]))
1155 (self.delete_command + [self.job_id]))
1148 output = ""
1156 output = ""
1149 output = output.decode(DEFAULT_ENCODING, 'replace')
1157 output = output.decode(DEFAULT_ENCODING, 'replace')
1150 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1158 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1151 return output
1159 return output
1152
1160
1153
1161
1154 class PBSLauncher(BatchSystemLauncher):
1162 class PBSLauncher(BatchSystemLauncher):
1155 """A BatchSystemLauncher subclass for PBS."""
1163 """A BatchSystemLauncher subclass for PBS."""
1156
1164
1157 submit_command = List(['qsub'], config=True,
1165 submit_command = List(['qsub'], config=True,
1158 help="The PBS submit command ['qsub']")
1166 help="The PBS submit command ['qsub']")
1159 delete_command = List(['qdel'], config=True,
1167 delete_command = List(['qdel'], config=True,
1160 help="The PBS delete command ['qsub']")
1168 help="The PBS delete command ['qsub']")
1161 job_id_regexp = CRegExp(r'\d+', config=True,
1169 job_id_regexp = CRegExp(r'\d+', config=True,
1162 help="Regular expresion for identifying the job ID [r'\d+']")
1170 help="Regular expresion for identifying the job ID [r'\d+']")
1163
1171
1164 batch_file = Unicode(u'')
1172 batch_file = Unicode(u'')
1165 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1173 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1166 job_array_template = Unicode('#PBS -t 1-{n}')
1174 job_array_template = Unicode('#PBS -t 1-{n}')
1167 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1175 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1168 queue_template = Unicode('#PBS -q {queue}')
1176 queue_template = Unicode('#PBS -q {queue}')
1169
1177
1170
1178
1171 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1179 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1172 """Launch a controller using PBS."""
1180 """Launch a controller using PBS."""
1173
1181
1174 batch_file_name = Unicode(u'pbs_controller', config=True,
1182 batch_file_name = Unicode(u'pbs_controller', config=True,
1175 help="batch file name for the controller job.")
1183 help="batch file name for the controller job.")
1176 default_template= Unicode("""#!/bin/sh
1184 default_template= Unicode("""#!/bin/sh
1177 #PBS -V
1185 #PBS -V
1178 #PBS -N ipcontroller
1186 #PBS -N ipcontroller
1179 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1180 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1188 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1181
1189
1182 def start(self):
1190 def start(self):
1183 """Start the controller by profile or profile_dir."""
1191 """Start the controller by profile or profile_dir."""
1184 return super(PBSControllerLauncher, self).start(1)
1192 return super(PBSControllerLauncher, self).start(1)
1185
1193
1186
1194
1187 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1195 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1188 """Launch Engines using PBS"""
1196 """Launch Engines using PBS"""
1189 batch_file_name = Unicode(u'pbs_engines', config=True,
1197 batch_file_name = Unicode(u'pbs_engines', config=True,
1190 help="batch file name for the engine(s) job.")
1198 help="batch file name for the engine(s) job.")
1191 default_template= Unicode(u"""#!/bin/sh
1199 default_template= Unicode(u"""#!/bin/sh
1192 #PBS -V
1200 #PBS -V
1193 #PBS -N ipengine
1201 #PBS -N ipengine
1194 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1202 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1195 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1203 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1196
1204
1197
1205
1198 #SGE is very similar to PBS
1206 #SGE is very similar to PBS
1199
1207
1200 class SGELauncher(PBSLauncher):
1208 class SGELauncher(PBSLauncher):
1201 """Sun GridEngine is a PBS clone with slightly different syntax"""
1209 """Sun GridEngine is a PBS clone with slightly different syntax"""
1202 job_array_regexp = CRegExp('#\$\W+\-t')
1210 job_array_regexp = CRegExp('#\$\W+\-t')
1203 job_array_template = Unicode('#$ -t 1-{n}')
1211 job_array_template = Unicode('#$ -t 1-{n}')
1204 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1212 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1205 queue_template = Unicode('#$ -q {queue}')
1213 queue_template = Unicode('#$ -q {queue}')
1206
1214
1207
1215
1208 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1216 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1209 """Launch a controller using SGE."""
1217 """Launch a controller using SGE."""
1210
1218
1211 batch_file_name = Unicode(u'sge_controller', config=True,
1219 batch_file_name = Unicode(u'sge_controller', config=True,
1212 help="batch file name for the ipontroller job.")
1220 help="batch file name for the ipontroller job.")
1213 default_template= Unicode(u"""#$ -V
1221 default_template= Unicode(u"""#$ -V
1214 #$ -S /bin/sh
1222 #$ -S /bin/sh
1215 #$ -N ipcontroller
1223 #$ -N ipcontroller
1216 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1224 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1217 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1225 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1218
1226
1219 def start(self):
1227 def start(self):
1220 """Start the controller by profile or profile_dir."""
1228 """Start the controller by profile or profile_dir."""
1221 return super(SGEControllerLauncher, self).start(1)
1229 return super(SGEControllerLauncher, self).start(1)
1222
1230
1223
1231
1224 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1232 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1225 """Launch Engines with SGE"""
1233 """Launch Engines with SGE"""
1226 batch_file_name = Unicode(u'sge_engines', config=True,
1234 batch_file_name = Unicode(u'sge_engines', config=True,
1227 help="batch file name for the engine(s) job.")
1235 help="batch file name for the engine(s) job.")
1228 default_template = Unicode("""#$ -V
1236 default_template = Unicode("""#$ -V
1229 #$ -S /bin/sh
1237 #$ -S /bin/sh
1230 #$ -N ipengine
1238 #$ -N ipengine
1231 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1239 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1232 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1240 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1233
1241
1234
1242
1235 # LSF launchers
1243 # LSF launchers
1236
1244
1237 class LSFLauncher(BatchSystemLauncher):
1245 class LSFLauncher(BatchSystemLauncher):
1238 """A BatchSystemLauncher subclass for LSF."""
1246 """A BatchSystemLauncher subclass for LSF."""
1239
1247
1240 submit_command = List(['bsub'], config=True,
1248 submit_command = List(['bsub'], config=True,
1241 help="The PBS submit command ['bsub']")
1249 help="The PBS submit command ['bsub']")
1242 delete_command = List(['bkill'], config=True,
1250 delete_command = List(['bkill'], config=True,
1243 help="The PBS delete command ['bkill']")
1251 help="The PBS delete command ['bkill']")
1244 job_id_regexp = CRegExp(r'\d+', config=True,
1252 job_id_regexp = CRegExp(r'\d+', config=True,
1245 help="Regular expresion for identifying the job ID [r'\d+']")
1253 help="Regular expresion for identifying the job ID [r'\d+']")
1246
1254
1247 batch_file = Unicode(u'')
1255 batch_file = Unicode(u'')
1248 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1256 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1249 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1257 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1250 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1258 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1251 queue_template = Unicode('#BSUB -q {queue}')
1259 queue_template = Unicode('#BSUB -q {queue}')
1252
1260
1253 def start(self, n):
1261 def start(self, n):
1254 """Start n copies of the process using LSF batch system.
1262 """Start n copies of the process using LSF batch system.
1255 This cant inherit from the base class because bsub expects
1263 This cant inherit from the base class because bsub expects
1256 to be piped a shell script in order to honor the #BSUB directives :
1264 to be piped a shell script in order to honor the #BSUB directives :
1257 bsub < script
1265 bsub < script
1258 """
1266 """
1259 # Here we save profile_dir in the context so they
1267 # Here we save profile_dir in the context so they
1260 # can be used in the batch script template as {profile_dir}
1268 # can be used in the batch script template as {profile_dir}
1261 self.write_batch_script(n)
1269 self.write_batch_script(n)
1262 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1270 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1263 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1271 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1264 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1272 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1265 output,err = p.communicate()
1273 output,err = p.communicate()
1266 output = output.decode(DEFAULT_ENCODING, 'replace')
1274 output = output.decode(DEFAULT_ENCODING, 'replace')
1267 job_id = self.parse_job_id(output)
1275 job_id = self.parse_job_id(output)
1268 self.notify_start(job_id)
1276 self.notify_start(job_id)
1269 return job_id
1277 return job_id
1270
1278
1271
1279
1272 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1280 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1273 """Launch a controller using LSF."""
1281 """Launch a controller using LSF."""
1274
1282
1275 batch_file_name = Unicode(u'lsf_controller', config=True,
1283 batch_file_name = Unicode(u'lsf_controller', config=True,
1276 help="batch file name for the controller job.")
1284 help="batch file name for the controller job.")
1277 default_template= Unicode("""#!/bin/sh
1285 default_template= Unicode("""#!/bin/sh
1278 #BSUB -J ipcontroller
1286 #BSUB -J ipcontroller
1279 #BSUB -oo ipcontroller.o.%%J
1287 #BSUB -oo ipcontroller.o.%%J
1280 #BSUB -eo ipcontroller.e.%%J
1288 #BSUB -eo ipcontroller.e.%%J
1281 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1289 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1282 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1290 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1283
1291
1284 def start(self):
1292 def start(self):
1285 """Start the controller by profile or profile_dir."""
1293 """Start the controller by profile or profile_dir."""
1286 return super(LSFControllerLauncher, self).start(1)
1294 return super(LSFControllerLauncher, self).start(1)
1287
1295
1288
1296
1289 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1297 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1290 """Launch Engines using LSF"""
1298 """Launch Engines using LSF"""
1291 batch_file_name = Unicode(u'lsf_engines', config=True,
1299 batch_file_name = Unicode(u'lsf_engines', config=True,
1292 help="batch file name for the engine(s) job.")
1300 help="batch file name for the engine(s) job.")
1293 default_template= Unicode(u"""#!/bin/sh
1301 default_template= Unicode(u"""#!/bin/sh
1294 #BSUB -oo ipengine.o.%%J
1302 #BSUB -oo ipengine.o.%%J
1295 #BSUB -eo ipengine.e.%%J
1303 #BSUB -eo ipengine.e.%%J
1296 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1304 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1297 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1305 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1298
1306
1299
1307
1300
1308
1301 class HTCondorLauncher(BatchSystemLauncher):
1309 class HTCondorLauncher(BatchSystemLauncher):
1302 """A BatchSystemLauncher subclass for HTCondor.
1310 """A BatchSystemLauncher subclass for HTCondor.
1303
1311
1304 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1312 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1305 that the python instance but otherwise is very similar to PBS. This is because
1313 that the python instance but otherwise is very similar to PBS. This is because
1306 HTCondor destroys sys.executable when launching remote processes - a launched
1314 HTCondor destroys sys.executable when launching remote processes - a launched
1307 python process depends on sys.executable to effectively evaluate its
1315 python process depends on sys.executable to effectively evaluate its
1308 module search paths. Without it, regardless of which python interpreter you launch
1316 module search paths. Without it, regardless of which python interpreter you launch
1309 you will get the to built in module search paths.
1317 you will get the to built in module search paths.
1310
1318
1311 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1319 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1312 this - the mechanism of shebanged scripts means that the python binary will be
1320 this - the mechanism of shebanged scripts means that the python binary will be
1313 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1321 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1314 scripts on the remote node*. This means you need to take care that:
1322 scripts on the remote node*. This means you need to take care that:
1315
1323
1316 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1324 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1317 of the python environment you wish to execute code in having top precedence.
1325 of the python environment you wish to execute code in having top precedence.
1318 b. This functionality is untested on Windows.
1326 b. This functionality is untested on Windows.
1319
1327
1320 If you need different behavior, consider making you own template.
1328 If you need different behavior, consider making you own template.
1321 """
1329 """
1322
1330
1323 submit_command = List(['condor_submit'], config=True,
1331 submit_command = List(['condor_submit'], config=True,
1324 help="The HTCondor submit command ['condor_submit']")
1332 help="The HTCondor submit command ['condor_submit']")
1325 delete_command = List(['condor_rm'], config=True,
1333 delete_command = List(['condor_rm'], config=True,
1326 help="The HTCondor delete command ['condor_rm']")
1334 help="The HTCondor delete command ['condor_rm']")
1327 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1335 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1328 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1336 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1329 job_id_regexp_group = Integer(1, config=True,
1337 job_id_regexp_group = Integer(1, config=True,
1330 help="""The group we wish to match in job_id_regexp [1]""")
1338 help="""The group we wish to match in job_id_regexp [1]""")
1331
1339
1332 job_array_regexp = CRegExp('queue\W+\$')
1340 job_array_regexp = CRegExp('queue\W+\$')
1333 job_array_template = Unicode('queue {n}')
1341 job_array_template = Unicode('queue {n}')
1334
1342
1335
1343
1336 def _insert_job_array_in_script(self):
1344 def _insert_job_array_in_script(self):
1337 """Inserts a job array if required into the batch script.
1345 """Inserts a job array if required into the batch script.
1338 """
1346 """
1339 if not self.job_array_regexp.search(self.batch_template):
1347 if not self.job_array_regexp.search(self.batch_template):
1340 self.log.debug("adding job array settings to batch script")
1348 self.log.debug("adding job array settings to batch script")
1341 #HTCondor requires that the job array goes at the bottom of the script
1349 #HTCondor requires that the job array goes at the bottom of the script
1342 self.batch_template = '\n'.join([self.batch_template,
1350 self.batch_template = '\n'.join([self.batch_template,
1343 self.job_array_template])
1351 self.job_array_template])
1344
1352
1345 def _insert_queue_in_script(self):
1353 def _insert_queue_in_script(self):
1346 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1354 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1347 specified in the script.
1355 specified in the script.
1348 """
1356 """
1349 pass
1357 pass
1350
1358
1351
1359
1352 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1360 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1353 """Launch a controller using HTCondor."""
1361 """Launch a controller using HTCondor."""
1354
1362
1355 batch_file_name = Unicode(u'htcondor_controller', config=True,
1363 batch_file_name = Unicode(u'htcondor_controller', config=True,
1356 help="batch file name for the controller job.")
1364 help="batch file name for the controller job.")
1357 default_template = Unicode(r"""
1365 default_template = Unicode(r"""
1358 universe = vanilla
1366 universe = vanilla
1359 executable = ipcontroller
1367 executable = ipcontroller
1360 # by default we expect a shared file system
1368 # by default we expect a shared file system
1361 transfer_executable = False
1369 transfer_executable = False
1362 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1370 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1363 """)
1371 """)
1364
1372
1365 def start(self):
1373 def start(self):
1366 """Start the controller by profile or profile_dir."""
1374 """Start the controller by profile or profile_dir."""
1367 return super(HTCondorControllerLauncher, self).start(1)
1375 return super(HTCondorControllerLauncher, self).start(1)
1368
1376
1369
1377
1370 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1378 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1371 """Launch Engines using HTCondor"""
1379 """Launch Engines using HTCondor"""
1372 batch_file_name = Unicode(u'htcondor_engines', config=True,
1380 batch_file_name = Unicode(u'htcondor_engines', config=True,
1373 help="batch file name for the engine(s) job.")
1381 help="batch file name for the engine(s) job.")
1374 default_template = Unicode("""
1382 default_template = Unicode("""
1375 universe = vanilla
1383 universe = vanilla
1376 executable = ipengine
1384 executable = ipengine
1377 # by default we expect a shared file system
1385 # by default we expect a shared file system
1378 transfer_executable = False
1386 transfer_executable = False
1379 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1387 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1380 """)
1388 """)
1381
1389
1382
1390
1383 #-----------------------------------------------------------------------------
1391 #-----------------------------------------------------------------------------
1384 # A launcher for ipcluster itself!
1392 # A launcher for ipcluster itself!
1385 #-----------------------------------------------------------------------------
1393 #-----------------------------------------------------------------------------
1386
1394
1387
1395
1388 class IPClusterLauncher(LocalProcessLauncher):
1396 class IPClusterLauncher(LocalProcessLauncher):
1389 """Launch the ipcluster program in an external process."""
1397 """Launch the ipcluster program in an external process."""
1390
1398
1391 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1399 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1392 help="Popen command for ipcluster")
1400 help="Popen command for ipcluster")
1393 ipcluster_args = List(
1401 ipcluster_args = List(
1394 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1402 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1395 help="Command line arguments to pass to ipcluster.")
1403 help="Command line arguments to pass to ipcluster.")
1396 ipcluster_subcommand = Unicode('start')
1404 ipcluster_subcommand = Unicode('start')
1397 profile = Unicode('default')
1405 profile = Unicode('default')
1398 n = Integer(2)
1406 n = Integer(2)
1399
1407
1400 def find_args(self):
1408 def find_args(self):
1401 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1409 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1402 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1410 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1403 self.ipcluster_args
1411 self.ipcluster_args
1404
1412
1405 def start(self):
1413 def start(self):
1406 return super(IPClusterLauncher, self).start()
1414 return super(IPClusterLauncher, self).start()
1407
1415
1408 #-----------------------------------------------------------------------------
1416 #-----------------------------------------------------------------------------
1409 # Collections of launchers
1417 # Collections of launchers
1410 #-----------------------------------------------------------------------------
1418 #-----------------------------------------------------------------------------
1411
1419
1412 local_launchers = [
1420 local_launchers = [
1413 LocalControllerLauncher,
1421 LocalControllerLauncher,
1414 LocalEngineLauncher,
1422 LocalEngineLauncher,
1415 LocalEngineSetLauncher,
1423 LocalEngineSetLauncher,
1416 ]
1424 ]
1417 mpi_launchers = [
1425 mpi_launchers = [
1418 MPILauncher,
1426 MPILauncher,
1419 MPIControllerLauncher,
1427 MPIControllerLauncher,
1420 MPIEngineSetLauncher,
1428 MPIEngineSetLauncher,
1421 ]
1429 ]
1422 ssh_launchers = [
1430 ssh_launchers = [
1423 SSHLauncher,
1431 SSHLauncher,
1424 SSHControllerLauncher,
1432 SSHControllerLauncher,
1425 SSHEngineLauncher,
1433 SSHEngineLauncher,
1426 SSHEngineSetLauncher,
1434 SSHEngineSetLauncher,
1427 SSHProxyEngineSetLauncher,
1435 SSHProxyEngineSetLauncher,
1428 ]
1436 ]
1429 winhpc_launchers = [
1437 winhpc_launchers = [
1430 WindowsHPCLauncher,
1438 WindowsHPCLauncher,
1431 WindowsHPCControllerLauncher,
1439 WindowsHPCControllerLauncher,
1432 WindowsHPCEngineSetLauncher,
1440 WindowsHPCEngineSetLauncher,
1433 ]
1441 ]
1434 pbs_launchers = [
1442 pbs_launchers = [
1435 PBSLauncher,
1443 PBSLauncher,
1436 PBSControllerLauncher,
1444 PBSControllerLauncher,
1437 PBSEngineSetLauncher,
1445 PBSEngineSetLauncher,
1438 ]
1446 ]
1439 sge_launchers = [
1447 sge_launchers = [
1440 SGELauncher,
1448 SGELauncher,
1441 SGEControllerLauncher,
1449 SGEControllerLauncher,
1442 SGEEngineSetLauncher,
1450 SGEEngineSetLauncher,
1443 ]
1451 ]
1444 lsf_launchers = [
1452 lsf_launchers = [
1445 LSFLauncher,
1453 LSFLauncher,
1446 LSFControllerLauncher,
1454 LSFControllerLauncher,
1447 LSFEngineSetLauncher,
1455 LSFEngineSetLauncher,
1448 ]
1456 ]
1449 htcondor_launchers = [
1457 htcondor_launchers = [
1450 HTCondorLauncher,
1458 HTCondorLauncher,
1451 HTCondorControllerLauncher,
1459 HTCondorControllerLauncher,
1452 HTCondorEngineSetLauncher,
1460 HTCondorEngineSetLauncher,
1453 ]
1461 ]
1454 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1462 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1455 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
1463 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
General Comments 0
You need to be logged in to leave comments. Login now