##// END OF EJS Templates
set default permission to 775 with makedirs
MinRK -
Show More
@@ -1,1463 +1,1463 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
46 try:
46 try:
47 from subprocess import check_output
47 from subprocess import check_output
48 except ImportError:
48 except ImportError:
49 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
50 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
51 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
52 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
53 out,err = p.communicate()
53 out,err = p.communicate()
54 return out
54 return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67 from IPython.utils.py3compat import iteritems, itervalues
67 from IPython.utils.py3compat import iteritems, itervalues
68
68
69 from .win32support import forward_read_events
69 from .win32support import forward_read_events
70
70
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72
72
73 WINDOWS = os.name == 'nt'
73 WINDOWS = os.name == 'nt'
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # Paths to the kernel apps
76 # Paths to the kernel apps
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
79 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
80
80
81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
81 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
82
82
83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
83 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
84
84
85 if WINDOWS and sys.version_info < (3,):
85 if WINDOWS and sys.version_info < (3,):
86 # `python -m package` doesn't work on Windows Python 2,
86 # `python -m package` doesn't work on Windows Python 2,
87 # but `python -m module` does.
87 # but `python -m module` does.
88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
88 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
89 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
90
90
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92 # Base launchers and errors
92 # Base launchers and errors
93 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
94
94
95 class LauncherError(Exception):
95 class LauncherError(Exception):
96 pass
96 pass
97
97
98
98
99 class ProcessStateError(LauncherError):
99 class ProcessStateError(LauncherError):
100 pass
100 pass
101
101
102
102
103 class UnknownStatus(LauncherError):
103 class UnknownStatus(LauncherError):
104 pass
104 pass
105
105
106
106
107 class BaseLauncher(LoggingConfigurable):
107 class BaseLauncher(LoggingConfigurable):
108 """An asbtraction for starting, stopping and signaling a process."""
108 """An asbtraction for starting, stopping and signaling a process."""
109
109
110 # In all of the launchers, the work_dir is where child processes will be
110 # In all of the launchers, the work_dir is where child processes will be
111 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # run. This will usually be the profile_dir, but may not be. any work_dir
112 # passed into the __init__ method will override the config value.
112 # passed into the __init__ method will override the config value.
113 # This should not be used to set the work_dir for the actual engine
113 # This should not be used to set the work_dir for the actual engine
114 # and controller. Instead, use their own config files or the
114 # and controller. Instead, use their own config files or the
115 # controller_args, engine_args attributes of the launchers to add
115 # controller_args, engine_args attributes of the launchers to add
116 # the work_dir option.
116 # the work_dir option.
117 work_dir = Unicode(u'.')
117 work_dir = Unicode(u'.')
118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 loop = Instance('zmq.eventloop.ioloop.IOLoop')
119
119
120 start_data = Any()
120 start_data = Any()
121 stop_data = Any()
121 stop_data = Any()
122
122
123 def _loop_default(self):
123 def _loop_default(self):
124 return ioloop.IOLoop.instance()
124 return ioloop.IOLoop.instance()
125
125
126 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 def __init__(self, work_dir=u'.', config=None, **kwargs):
127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
128 self.state = 'before' # can be before, running, after
128 self.state = 'before' # can be before, running, after
129 self.stop_callbacks = []
129 self.stop_callbacks = []
130 self.start_data = None
130 self.start_data = None
131 self.stop_data = None
131 self.stop_data = None
132
132
133 @property
133 @property
134 def args(self):
134 def args(self):
135 """A list of cmd and args that will be used to start the process.
135 """A list of cmd and args that will be used to start the process.
136
136
137 This is what is passed to :func:`spawnProcess` and the first element
137 This is what is passed to :func:`spawnProcess` and the first element
138 will be the process name.
138 will be the process name.
139 """
139 """
140 return self.find_args()
140 return self.find_args()
141
141
142 def find_args(self):
142 def find_args(self):
143 """The ``.args`` property calls this to find the args list.
143 """The ``.args`` property calls this to find the args list.
144
144
145 Subcommand should implement this to construct the cmd and args.
145 Subcommand should implement this to construct the cmd and args.
146 """
146 """
147 raise NotImplementedError('find_args must be implemented in a subclass')
147 raise NotImplementedError('find_args must be implemented in a subclass')
148
148
149 @property
149 @property
150 def arg_str(self):
150 def arg_str(self):
151 """The string form of the program arguments."""
151 """The string form of the program arguments."""
152 return ' '.join(self.args)
152 return ' '.join(self.args)
153
153
154 @property
154 @property
155 def running(self):
155 def running(self):
156 """Am I running."""
156 """Am I running."""
157 if self.state == 'running':
157 if self.state == 'running':
158 return True
158 return True
159 else:
159 else:
160 return False
160 return False
161
161
162 def start(self):
162 def start(self):
163 """Start the process."""
163 """Start the process."""
164 raise NotImplementedError('start must be implemented in a subclass')
164 raise NotImplementedError('start must be implemented in a subclass')
165
165
166 def stop(self):
166 def stop(self):
167 """Stop the process and notify observers of stopping.
167 """Stop the process and notify observers of stopping.
168
168
169 This method will return None immediately.
169 This method will return None immediately.
170 To observe the actual process stopping, see :meth:`on_stop`.
170 To observe the actual process stopping, see :meth:`on_stop`.
171 """
171 """
172 raise NotImplementedError('stop must be implemented in a subclass')
172 raise NotImplementedError('stop must be implemented in a subclass')
173
173
174 def on_stop(self, f):
174 def on_stop(self, f):
175 """Register a callback to be called with this Launcher's stop_data
175 """Register a callback to be called with this Launcher's stop_data
176 when the process actually finishes.
176 when the process actually finishes.
177 """
177 """
178 if self.state=='after':
178 if self.state=='after':
179 return f(self.stop_data)
179 return f(self.stop_data)
180 else:
180 else:
181 self.stop_callbacks.append(f)
181 self.stop_callbacks.append(f)
182
182
183 def notify_start(self, data):
183 def notify_start(self, data):
184 """Call this to trigger startup actions.
184 """Call this to trigger startup actions.
185
185
186 This logs the process startup and sets the state to 'running'. It is
186 This logs the process startup and sets the state to 'running'. It is
187 a pass-through so it can be used as a callback.
187 a pass-through so it can be used as a callback.
188 """
188 """
189
189
190 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.log.debug('Process %r started: %r', self.args[0], data)
191 self.start_data = data
191 self.start_data = data
192 self.state = 'running'
192 self.state = 'running'
193 return data
193 return data
194
194
195 def notify_stop(self, data):
195 def notify_stop(self, data):
196 """Call this to trigger process stop actions.
196 """Call this to trigger process stop actions.
197
197
198 This logs the process stopping and sets the state to 'after'. Call
198 This logs the process stopping and sets the state to 'after'. Call
199 this to trigger callbacks registered via :meth:`on_stop`."""
199 this to trigger callbacks registered via :meth:`on_stop`."""
200
200
201 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.log.debug('Process %r stopped: %r', self.args[0], data)
202 self.stop_data = data
202 self.stop_data = data
203 self.state = 'after'
203 self.state = 'after'
204 for i in range(len(self.stop_callbacks)):
204 for i in range(len(self.stop_callbacks)):
205 d = self.stop_callbacks.pop()
205 d = self.stop_callbacks.pop()
206 d(data)
206 d(data)
207 return data
207 return data
208
208
209 def signal(self, sig):
209 def signal(self, sig):
210 """Signal the process.
210 """Signal the process.
211
211
212 Parameters
212 Parameters
213 ----------
213 ----------
214 sig : str or int
214 sig : str or int
215 'KILL', 'INT', etc., or any signal number
215 'KILL', 'INT', etc., or any signal number
216 """
216 """
217 raise NotImplementedError('signal must be implemented in a subclass')
217 raise NotImplementedError('signal must be implemented in a subclass')
218
218
219 class ClusterAppMixin(HasTraits):
219 class ClusterAppMixin(HasTraits):
220 """MixIn for cluster args as traits"""
220 """MixIn for cluster args as traits"""
221 profile_dir=Unicode('')
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
222 cluster_id=Unicode('')
223
223
224 @property
224 @property
225 def cluster_args(self):
225 def cluster_args(self):
226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
226 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
227
227
228 class ControllerMixin(ClusterAppMixin):
228 class ControllerMixin(ClusterAppMixin):
229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
229 controller_cmd = List(ipcontroller_cmd_argv, config=True,
230 help="""Popen command to launch ipcontroller.""")
230 help="""Popen command to launch ipcontroller.""")
231 # Command line arguments to ipcontroller.
231 # Command line arguments to ipcontroller.
232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
232 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
233 help="""command-line args to pass to ipcontroller""")
233 help="""command-line args to pass to ipcontroller""")
234
234
235 class EngineMixin(ClusterAppMixin):
235 class EngineMixin(ClusterAppMixin):
236 engine_cmd = List(ipengine_cmd_argv, config=True,
236 engine_cmd = List(ipengine_cmd_argv, config=True,
237 help="""command to launch the Engine.""")
237 help="""command to launch the Engine.""")
238 # Command line arguments for ipengine.
238 # Command line arguments for ipengine.
239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
239 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
240 help="command-line arguments to pass to ipengine"
240 help="command-line arguments to pass to ipengine"
241 )
241 )
242
242
243
243
244 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
245 # Local process launchers
245 # Local process launchers
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247
247
248
248
249 class LocalProcessLauncher(BaseLauncher):
249 class LocalProcessLauncher(BaseLauncher):
250 """Start and stop an external process in an asynchronous manner.
250 """Start and stop an external process in an asynchronous manner.
251
251
252 This will launch the external process with a working directory of
252 This will launch the external process with a working directory of
253 ``self.work_dir``.
253 ``self.work_dir``.
254 """
254 """
255
255
256 # This is used to to construct self.args, which is passed to
256 # This is used to to construct self.args, which is passed to
257 # spawnProcess.
257 # spawnProcess.
258 cmd_and_args = List([])
258 cmd_and_args = List([])
259 poll_frequency = Integer(100) # in ms
259 poll_frequency = Integer(100) # in ms
260
260
261 def __init__(self, work_dir=u'.', config=None, **kwargs):
261 def __init__(self, work_dir=u'.', config=None, **kwargs):
262 super(LocalProcessLauncher, self).__init__(
262 super(LocalProcessLauncher, self).__init__(
263 work_dir=work_dir, config=config, **kwargs
263 work_dir=work_dir, config=config, **kwargs
264 )
264 )
265 self.process = None
265 self.process = None
266 self.poller = None
266 self.poller = None
267
267
268 def find_args(self):
268 def find_args(self):
269 return self.cmd_and_args
269 return self.cmd_and_args
270
270
271 def start(self):
271 def start(self):
272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
272 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
273 if self.state == 'before':
273 if self.state == 'before':
274 self.process = Popen(self.args,
274 self.process = Popen(self.args,
275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
275 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 env=os.environ,
276 env=os.environ,
277 cwd=self.work_dir
277 cwd=self.work_dir
278 )
278 )
279 if WINDOWS:
279 if WINDOWS:
280 self.stdout = forward_read_events(self.process.stdout)
280 self.stdout = forward_read_events(self.process.stdout)
281 self.stderr = forward_read_events(self.process.stderr)
281 self.stderr = forward_read_events(self.process.stderr)
282 else:
282 else:
283 self.stdout = self.process.stdout.fileno()
283 self.stdout = self.process.stdout.fileno()
284 self.stderr = self.process.stderr.fileno()
284 self.stderr = self.process.stderr.fileno()
285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
285 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
286 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
287 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 self.poller.start()
288 self.poller.start()
289 self.notify_start(self.process.pid)
289 self.notify_start(self.process.pid)
290 else:
290 else:
291 s = 'The process was already started and has state: %r' % self.state
291 s = 'The process was already started and has state: %r' % self.state
292 raise ProcessStateError(s)
292 raise ProcessStateError(s)
293
293
294 def stop(self):
294 def stop(self):
295 return self.interrupt_then_kill()
295 return self.interrupt_then_kill()
296
296
297 def signal(self, sig):
297 def signal(self, sig):
298 if self.state == 'running':
298 if self.state == 'running':
299 if WINDOWS and sig != SIGINT:
299 if WINDOWS and sig != SIGINT:
300 # use Windows tree-kill for better child cleanup
300 # use Windows tree-kill for better child cleanup
301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
301 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 else:
302 else:
303 self.process.send_signal(sig)
303 self.process.send_signal(sig)
304
304
305 def interrupt_then_kill(self, delay=2.0):
305 def interrupt_then_kill(self, delay=2.0):
306 """Send INT, wait a delay and then send KILL."""
306 """Send INT, wait a delay and then send KILL."""
307 try:
307 try:
308 self.signal(SIGINT)
308 self.signal(SIGINT)
309 except Exception:
309 except Exception:
310 self.log.debug("interrupt failed")
310 self.log.debug("interrupt failed")
311 pass
311 pass
312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
312 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 self.killer.start()
313 self.killer.start()
314
314
315 # callbacks, etc:
315 # callbacks, etc:
316
316
317 def handle_stdout(self, fd, events):
317 def handle_stdout(self, fd, events):
318 if WINDOWS:
318 if WINDOWS:
319 line = self.stdout.recv()
319 line = self.stdout.recv()
320 else:
320 else:
321 line = self.process.stdout.readline()
321 line = self.process.stdout.readline()
322 # a stopped process will be readable but return empty strings
322 # a stopped process will be readable but return empty strings
323 if line:
323 if line:
324 self.log.debug(line[:-1])
324 self.log.debug(line[:-1])
325 else:
325 else:
326 self.poll()
326 self.poll()
327
327
328 def handle_stderr(self, fd, events):
328 def handle_stderr(self, fd, events):
329 if WINDOWS:
329 if WINDOWS:
330 line = self.stderr.recv()
330 line = self.stderr.recv()
331 else:
331 else:
332 line = self.process.stderr.readline()
332 line = self.process.stderr.readline()
333 # a stopped process will be readable but return empty strings
333 # a stopped process will be readable but return empty strings
334 if line:
334 if line:
335 self.log.debug(line[:-1])
335 self.log.debug(line[:-1])
336 else:
336 else:
337 self.poll()
337 self.poll()
338
338
339 def poll(self):
339 def poll(self):
340 status = self.process.poll()
340 status = self.process.poll()
341 if status is not None:
341 if status is not None:
342 self.poller.stop()
342 self.poller.stop()
343 self.loop.remove_handler(self.stdout)
343 self.loop.remove_handler(self.stdout)
344 self.loop.remove_handler(self.stderr)
344 self.loop.remove_handler(self.stderr)
345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
345 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 return status
346 return status
347
347
348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
348 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
349 """Launch a controller as a regular external process."""
349 """Launch a controller as a regular external process."""
350
350
351 def find_args(self):
351 def find_args(self):
352 return self.controller_cmd + self.cluster_args + self.controller_args
352 return self.controller_cmd + self.cluster_args + self.controller_args
353
353
354 def start(self):
354 def start(self):
355 """Start the controller by profile_dir."""
355 """Start the controller by profile_dir."""
356 return super(LocalControllerLauncher, self).start()
356 return super(LocalControllerLauncher, self).start()
357
357
358
358
359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
359 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
360 """Launch a single engine as a regular externall process."""
360 """Launch a single engine as a regular externall process."""
361
361
362 def find_args(self):
362 def find_args(self):
363 return self.engine_cmd + self.cluster_args + self.engine_args
363 return self.engine_cmd + self.cluster_args + self.engine_args
364
364
365
365
366 class LocalEngineSetLauncher(LocalEngineLauncher):
366 class LocalEngineSetLauncher(LocalEngineLauncher):
367 """Launch a set of engines as regular external processes."""
367 """Launch a set of engines as regular external processes."""
368
368
369 delay = CFloat(0.1, config=True,
369 delay = CFloat(0.1, config=True,
370 help="""delay (in seconds) between starting each engine after the first.
370 help="""delay (in seconds) between starting each engine after the first.
371 This can help force the engines to get their ids in order, or limit
371 This can help force the engines to get their ids in order, or limit
372 process flood when starting many engines."""
372 process flood when starting many engines."""
373 )
373 )
374
374
375 # launcher class
375 # launcher class
376 launcher_class = LocalEngineLauncher
376 launcher_class = LocalEngineLauncher
377
377
378 launchers = Dict()
378 launchers = Dict()
379 stop_data = Dict()
379 stop_data = Dict()
380
380
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 super(LocalEngineSetLauncher, self).__init__(
382 super(LocalEngineSetLauncher, self).__init__(
383 work_dir=work_dir, config=config, **kwargs
383 work_dir=work_dir, config=config, **kwargs
384 )
384 )
385 self.stop_data = {}
385 self.stop_data = {}
386
386
387 def start(self, n):
387 def start(self, n):
388 """Start n engines by profile or profile_dir."""
388 """Start n engines by profile or profile_dir."""
389 dlist = []
389 dlist = []
390 for i in range(n):
390 for i in range(n):
391 if i > 0:
391 if i > 0:
392 time.sleep(self.delay)
392 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
393 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
394 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
395 )
395 )
396
396
397 # Copy the engine args over to each engine launcher.
397 # Copy the engine args over to each engine launcher.
398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
398 el.engine_cmd = copy.deepcopy(self.engine_cmd)
399 el.engine_args = copy.deepcopy(self.engine_args)
399 el.engine_args = copy.deepcopy(self.engine_args)
400 el.on_stop(self._notice_engine_stopped)
400 el.on_stop(self._notice_engine_stopped)
401 d = el.start()
401 d = el.start()
402 self.launchers[i] = el
402 self.launchers[i] = el
403 dlist.append(d)
403 dlist.append(d)
404 self.notify_start(dlist)
404 self.notify_start(dlist)
405 return dlist
405 return dlist
406
406
407 def find_args(self):
407 def find_args(self):
408 return ['engine set']
408 return ['engine set']
409
409
410 def signal(self, sig):
410 def signal(self, sig):
411 dlist = []
411 dlist = []
412 for el in itervalues(self.launchers):
412 for el in itervalues(self.launchers):
413 d = el.signal(sig)
413 d = el.signal(sig)
414 dlist.append(d)
414 dlist.append(d)
415 return dlist
415 return dlist
416
416
417 def interrupt_then_kill(self, delay=1.0):
417 def interrupt_then_kill(self, delay=1.0):
418 dlist = []
418 dlist = []
419 for el in itervalues(self.launchers):
419 for el in itervalues(self.launchers):
420 d = el.interrupt_then_kill(delay)
420 d = el.interrupt_then_kill(delay)
421 dlist.append(d)
421 dlist.append(d)
422 return dlist
422 return dlist
423
423
424 def stop(self):
424 def stop(self):
425 return self.interrupt_then_kill()
425 return self.interrupt_then_kill()
426
426
427 def _notice_engine_stopped(self, data):
427 def _notice_engine_stopped(self, data):
428 pid = data['pid']
428 pid = data['pid']
429 for idx,el in iteritems(self.launchers):
429 for idx,el in iteritems(self.launchers):
430 if el.process.pid == pid:
430 if el.process.pid == pid:
431 break
431 break
432 self.launchers.pop(idx)
432 self.launchers.pop(idx)
433 self.stop_data[idx] = data
433 self.stop_data[idx] = data
434 if not self.launchers:
434 if not self.launchers:
435 self.notify_stop(self.stop_data)
435 self.notify_stop(self.stop_data)
436
436
437
437
438 #-----------------------------------------------------------------------------
438 #-----------------------------------------------------------------------------
439 # MPI launchers
439 # MPI launchers
440 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
441
441
442
442
443 class MPILauncher(LocalProcessLauncher):
443 class MPILauncher(LocalProcessLauncher):
444 """Launch an external process using mpiexec."""
444 """Launch an external process using mpiexec."""
445
445
446 mpi_cmd = List(['mpiexec'], config=True,
446 mpi_cmd = List(['mpiexec'], config=True,
447 help="The mpiexec command to use in starting the process."
447 help="The mpiexec command to use in starting the process."
448 )
448 )
449 mpi_args = List([], config=True,
449 mpi_args = List([], config=True,
450 help="The command line arguments to pass to mpiexec."
450 help="The command line arguments to pass to mpiexec."
451 )
451 )
452 program = List(['date'],
452 program = List(['date'],
453 help="The program to start via mpiexec.")
453 help="The program to start via mpiexec.")
454 program_args = List([],
454 program_args = List([],
455 help="The command line argument to the program."
455 help="The command line argument to the program."
456 )
456 )
457 n = Integer(1)
457 n = Integer(1)
458
458
459 def __init__(self, *args, **kwargs):
459 def __init__(self, *args, **kwargs):
460 # deprecation for old MPIExec names:
460 # deprecation for old MPIExec names:
461 config = kwargs.get('config', {})
461 config = kwargs.get('config', {})
462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
462 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
463 deprecated = config.get(oldname)
463 deprecated = config.get(oldname)
464 if deprecated:
464 if deprecated:
465 newname = oldname.replace('MPIExec', 'MPI')
465 newname = oldname.replace('MPIExec', 'MPI')
466 config[newname].update(deprecated)
466 config[newname].update(deprecated)
467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
467 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
468
468
469 super(MPILauncher, self).__init__(*args, **kwargs)
469 super(MPILauncher, self).__init__(*args, **kwargs)
470
470
471 def find_args(self):
471 def find_args(self):
472 """Build self.args using all the fields."""
472 """Build self.args using all the fields."""
473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
473 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
474 self.program + self.program_args
474 self.program + self.program_args
475
475
476 def start(self, n):
476 def start(self, n):
477 """Start n instances of the program using mpiexec."""
477 """Start n instances of the program using mpiexec."""
478 self.n = n
478 self.n = n
479 return super(MPILauncher, self).start()
479 return super(MPILauncher, self).start()
480
480
481
481
482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
482 class MPIControllerLauncher(MPILauncher, ControllerMixin):
483 """Launch a controller using mpiexec."""
483 """Launch a controller using mpiexec."""
484
484
485 # alias back to *non-configurable* program[_args] for use in find_args()
485 # alias back to *non-configurable* program[_args] for use in find_args()
486 # this way all Controller/EngineSetLaunchers have the same form, rather
486 # this way all Controller/EngineSetLaunchers have the same form, rather
487 # than *some* having `program_args` and others `controller_args`
487 # than *some* having `program_args` and others `controller_args`
488 @property
488 @property
489 def program(self):
489 def program(self):
490 return self.controller_cmd
490 return self.controller_cmd
491
491
492 @property
492 @property
493 def program_args(self):
493 def program_args(self):
494 return self.cluster_args + self.controller_args
494 return self.cluster_args + self.controller_args
495
495
496 def start(self):
496 def start(self):
497 """Start the controller by profile_dir."""
497 """Start the controller by profile_dir."""
498 return super(MPIControllerLauncher, self).start(1)
498 return super(MPIControllerLauncher, self).start(1)
499
499
500
500
501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
501 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
502 """Launch engines using mpiexec"""
502 """Launch engines using mpiexec"""
503
503
504 # alias back to *non-configurable* program[_args] for use in find_args()
504 # alias back to *non-configurable* program[_args] for use in find_args()
505 # this way all Controller/EngineSetLaunchers have the same form, rather
505 # this way all Controller/EngineSetLaunchers have the same form, rather
506 # than *some* having `program_args` and others `controller_args`
506 # than *some* having `program_args` and others `controller_args`
507 @property
507 @property
508 def program(self):
508 def program(self):
509 return self.engine_cmd
509 return self.engine_cmd
510
510
511 @property
511 @property
512 def program_args(self):
512 def program_args(self):
513 return self.cluster_args + self.engine_args
513 return self.cluster_args + self.engine_args
514
514
515 def start(self, n):
515 def start(self, n):
516 """Start n engines by profile or profile_dir."""
516 """Start n engines by profile or profile_dir."""
517 self.n = n
517 self.n = n
518 return super(MPIEngineSetLauncher, self).start(n)
518 return super(MPIEngineSetLauncher, self).start(n)
519
519
520 # deprecated MPIExec names
520 # deprecated MPIExec names
521 class DeprecatedMPILauncher(object):
521 class DeprecatedMPILauncher(object):
522 def warn(self):
522 def warn(self):
523 oldname = self.__class__.__name__
523 oldname = self.__class__.__name__
524 newname = oldname.replace('MPIExec', 'MPI')
524 newname = oldname.replace('MPIExec', 'MPI')
525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
525 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
526
526
527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
527 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
528 """Deprecated, use MPILauncher"""
528 """Deprecated, use MPILauncher"""
529 def __init__(self, *args, **kwargs):
529 def __init__(self, *args, **kwargs):
530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
530 super(MPIExecLauncher, self).__init__(*args, **kwargs)
531 self.warn()
531 self.warn()
532
532
533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
533 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
534 """Deprecated, use MPIControllerLauncher"""
534 """Deprecated, use MPIControllerLauncher"""
535 def __init__(self, *args, **kwargs):
535 def __init__(self, *args, **kwargs):
536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
536 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
537 self.warn()
537 self.warn()
538
538
539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
539 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
540 """Deprecated, use MPIEngineSetLauncher"""
540 """Deprecated, use MPIEngineSetLauncher"""
541 def __init__(self, *args, **kwargs):
541 def __init__(self, *args, **kwargs):
542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
542 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
543 self.warn()
543 self.warn()
544
544
545
545
546 #-----------------------------------------------------------------------------
546 #-----------------------------------------------------------------------------
547 # SSH launchers
547 # SSH launchers
548 #-----------------------------------------------------------------------------
548 #-----------------------------------------------------------------------------
549
549
550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
551
551
552 class SSHLauncher(LocalProcessLauncher):
552 class SSHLauncher(LocalProcessLauncher):
553 """A minimal launcher for ssh.
553 """A minimal launcher for ssh.
554
554
555 To be useful this will probably have to be extended to use the ``sshx``
555 To be useful this will probably have to be extended to use the ``sshx``
556 idea for environment variables. There could be other things this needs
556 idea for environment variables. There could be other things this needs
557 as well.
557 as well.
558 """
558 """
559
559
560 ssh_cmd = List(['ssh'], config=True,
560 ssh_cmd = List(['ssh'], config=True,
561 help="command for starting ssh")
561 help="command for starting ssh")
562 ssh_args = List(['-tt'], config=True,
562 ssh_args = List(['-tt'], config=True,
563 help="args to pass to ssh")
563 help="args to pass to ssh")
564 scp_cmd = List(['scp'], config=True,
564 scp_cmd = List(['scp'], config=True,
565 help="command for sending files")
565 help="command for sending files")
566 program = List(['date'],
566 program = List(['date'],
567 help="Program to launch via ssh")
567 help="Program to launch via ssh")
568 program_args = List([],
568 program_args = List([],
569 help="args to pass to remote program")
569 help="args to pass to remote program")
570 hostname = Unicode('', config=True,
570 hostname = Unicode('', config=True,
571 help="hostname on which to launch the program")
571 help="hostname on which to launch the program")
572 user = Unicode('', config=True,
572 user = Unicode('', config=True,
573 help="username for ssh")
573 help="username for ssh")
574 location = Unicode('', config=True,
574 location = Unicode('', config=True,
575 help="user@hostname location for ssh in one setting")
575 help="user@hostname location for ssh in one setting")
576 to_fetch = List([], config=True,
576 to_fetch = List([], config=True,
577 help="List of (remote, local) files to fetch after starting")
577 help="List of (remote, local) files to fetch after starting")
578 to_send = List([], config=True,
578 to_send = List([], config=True,
579 help="List of (local, remote) files to send before starting")
579 help="List of (local, remote) files to send before starting")
580
580
581 def _hostname_changed(self, name, old, new):
581 def _hostname_changed(self, name, old, new):
582 if self.user:
582 if self.user:
583 self.location = u'%s@%s' % (self.user, new)
583 self.location = u'%s@%s' % (self.user, new)
584 else:
584 else:
585 self.location = new
585 self.location = new
586
586
587 def _user_changed(self, name, old, new):
587 def _user_changed(self, name, old, new):
588 self.location = u'%s@%s' % (new, self.hostname)
588 self.location = u'%s@%s' % (new, self.hostname)
589
589
590 def find_args(self):
590 def find_args(self):
591 return self.ssh_cmd + self.ssh_args + [self.location] + \
591 return self.ssh_cmd + self.ssh_args + [self.location] + \
592 list(map(pipes.quote, self.program + self.program_args))
592 list(map(pipes.quote, self.program + self.program_args))
593
593
594 def _send_file(self, local, remote):
594 def _send_file(self, local, remote):
595 """send a single file"""
595 """send a single file"""
596 remote = "%s:%s" % (self.location, remote)
596 remote = "%s:%s" % (self.location, remote)
597 for i in range(10):
597 for i in range(10):
598 if not os.path.exists(local):
598 if not os.path.exists(local):
599 self.log.debug("waiting for %s" % local)
599 self.log.debug("waiting for %s" % local)
600 time.sleep(1)
600 time.sleep(1)
601 else:
601 else:
602 break
602 break
603 remote_dir = os.path.dirname(remote)
603 remote_dir = os.path.dirname(remote)
604 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
604 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
605 check_output(self.ssh_cmd + self.ssh_args + \
605 check_output(self.ssh_cmd + self.ssh_args + \
606 [self.location, 'mkdir', '-p', '--', remote_dir]
606 [self.location, 'mkdir', '-p', '--', remote_dir]
607 )
607 )
608 self.log.info("sending %s to %s", local, remote)
608 self.log.info("sending %s to %s", local, remote)
609 check_output(self.scp_cmd + [local, remote])
609 check_output(self.scp_cmd + [local, remote])
610
610
611 def send_files(self):
611 def send_files(self):
612 """send our files (called before start)"""
612 """send our files (called before start)"""
613 if not self.to_send:
613 if not self.to_send:
614 return
614 return
615 for local_file, remote_file in self.to_send:
615 for local_file, remote_file in self.to_send:
616 self._send_file(local_file, remote_file)
616 self._send_file(local_file, remote_file)
617
617
618 def _fetch_file(self, remote, local):
618 def _fetch_file(self, remote, local):
619 """fetch a single file"""
619 """fetch a single file"""
620 full_remote = "%s:%s" % (self.location, remote)
620 full_remote = "%s:%s" % (self.location, remote)
621 self.log.info("fetching %s from %s", local, full_remote)
621 self.log.info("fetching %s from %s", local, full_remote)
622 for i in range(10):
622 for i in range(10):
623 # wait up to 10s for remote file to exist
623 # wait up to 10s for remote file to exist
624 check = check_output(self.ssh_cmd + self.ssh_args + \
624 check = check_output(self.ssh_cmd + self.ssh_args + \
625 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
625 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
626 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
626 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
627 if check == u'no':
627 if check == u'no':
628 time.sleep(1)
628 time.sleep(1)
629 elif check == u'yes':
629 elif check == u'yes':
630 break
630 break
631 local_dir = os.path.dirname(local)
631 local_dir = os.path.dirname(local)
632 if not os.path.exists(local_dir):
632 if not os.path.exists(local_dir):
633 os.makedirs(local_dir)
633 os.makedirs(local_dir, 775)
634 check_output(self.scp_cmd + [full_remote, local])
634 check_output(self.scp_cmd + [full_remote, local])
635
635
636 def fetch_files(self):
636 def fetch_files(self):
637 """fetch remote files (called after start)"""
637 """fetch remote files (called after start)"""
638 if not self.to_fetch:
638 if not self.to_fetch:
639 return
639 return
640 for remote_file, local_file in self.to_fetch:
640 for remote_file, local_file in self.to_fetch:
641 self._fetch_file(remote_file, local_file)
641 self._fetch_file(remote_file, local_file)
642
642
643 def start(self, hostname=None, user=None):
643 def start(self, hostname=None, user=None):
644 if hostname is not None:
644 if hostname is not None:
645 self.hostname = hostname
645 self.hostname = hostname
646 if user is not None:
646 if user is not None:
647 self.user = user
647 self.user = user
648
648
649 self.send_files()
649 self.send_files()
650 super(SSHLauncher, self).start()
650 super(SSHLauncher, self).start()
651 self.fetch_files()
651 self.fetch_files()
652
652
653 def signal(self, sig):
653 def signal(self, sig):
654 if self.state == 'running':
654 if self.state == 'running':
655 # send escaped ssh connection-closer
655 # send escaped ssh connection-closer
656 self.process.stdin.write('~.')
656 self.process.stdin.write('~.')
657 self.process.stdin.flush()
657 self.process.stdin.flush()
658
658
659 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
659 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
660
660
661 remote_profile_dir = Unicode('', config=True,
661 remote_profile_dir = Unicode('', config=True,
662 help="""The remote profile_dir to use.
662 help="""The remote profile_dir to use.
663
663
664 If not specified, use calling profile, stripping out possible leading homedir.
664 If not specified, use calling profile, stripping out possible leading homedir.
665 """)
665 """)
666
666
667 def _profile_dir_changed(self, name, old, new):
667 def _profile_dir_changed(self, name, old, new):
668 if not self.remote_profile_dir:
668 if not self.remote_profile_dir:
669 # trigger remote_profile_dir_default logic again,
669 # trigger remote_profile_dir_default logic again,
670 # in case it was already triggered before profile_dir was set
670 # in case it was already triggered before profile_dir was set
671 self.remote_profile_dir = self._strip_home(new)
671 self.remote_profile_dir = self._strip_home(new)
672
672
673 @staticmethod
673 @staticmethod
674 def _strip_home(path):
674 def _strip_home(path):
675 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
675 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
676 home = get_home_dir()
676 home = get_home_dir()
677 if not home.endswith('/'):
677 if not home.endswith('/'):
678 home = home+'/'
678 home = home+'/'
679
679
680 if path.startswith(home):
680 if path.startswith(home):
681 return path[len(home):]
681 return path[len(home):]
682 else:
682 else:
683 return path
683 return path
684
684
685 def _remote_profile_dir_default(self):
685 def _remote_profile_dir_default(self):
686 return self._strip_home(self.profile_dir)
686 return self._strip_home(self.profile_dir)
687
687
688 def _cluster_id_changed(self, name, old, new):
688 def _cluster_id_changed(self, name, old, new):
689 if new:
689 if new:
690 raise ValueError("cluster id not supported by SSH launchers")
690 raise ValueError("cluster id not supported by SSH launchers")
691
691
692 @property
692 @property
693 def cluster_args(self):
693 def cluster_args(self):
694 return ['--profile-dir', self.remote_profile_dir]
694 return ['--profile-dir', self.remote_profile_dir]
695
695
696 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
696 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
697
697
698 # alias back to *non-configurable* program[_args] for use in find_args()
698 # alias back to *non-configurable* program[_args] for use in find_args()
699 # this way all Controller/EngineSetLaunchers have the same form, rather
699 # this way all Controller/EngineSetLaunchers have the same form, rather
700 # than *some* having `program_args` and others `controller_args`
700 # than *some* having `program_args` and others `controller_args`
701
701
702 def _controller_cmd_default(self):
702 def _controller_cmd_default(self):
703 return ['ipcontroller']
703 return ['ipcontroller']
704
704
705 @property
705 @property
706 def program(self):
706 def program(self):
707 return self.controller_cmd
707 return self.controller_cmd
708
708
709 @property
709 @property
710 def program_args(self):
710 def program_args(self):
711 return self.cluster_args + self.controller_args
711 return self.cluster_args + self.controller_args
712
712
713 def _to_fetch_default(self):
713 def _to_fetch_default(self):
714 return [
714 return [
715 (os.path.join(self.remote_profile_dir, 'security', cf),
715 (os.path.join(self.remote_profile_dir, 'security', cf),
716 os.path.join(self.profile_dir, 'security', cf),)
716 os.path.join(self.profile_dir, 'security', cf),)
717 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
717 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
718 ]
718 ]
719
719
720 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
720 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
721
721
722 # alias back to *non-configurable* program[_args] for use in find_args()
722 # alias back to *non-configurable* program[_args] for use in find_args()
723 # this way all Controller/EngineSetLaunchers have the same form, rather
723 # this way all Controller/EngineSetLaunchers have the same form, rather
724 # than *some* having `program_args` and others `controller_args`
724 # than *some* having `program_args` and others `controller_args`
725
725
726 def _engine_cmd_default(self):
726 def _engine_cmd_default(self):
727 return ['ipengine']
727 return ['ipengine']
728
728
729 @property
729 @property
730 def program(self):
730 def program(self):
731 return self.engine_cmd
731 return self.engine_cmd
732
732
733 @property
733 @property
734 def program_args(self):
734 def program_args(self):
735 return self.cluster_args + self.engine_args
735 return self.cluster_args + self.engine_args
736
736
737 def _to_send_default(self):
737 def _to_send_default(self):
738 return [
738 return [
739 (os.path.join(self.profile_dir, 'security', cf),
739 (os.path.join(self.profile_dir, 'security', cf),
740 os.path.join(self.remote_profile_dir, 'security', cf))
740 os.path.join(self.remote_profile_dir, 'security', cf))
741 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
741 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
742 ]
742 ]
743
743
744
744
745 class SSHEngineSetLauncher(LocalEngineSetLauncher):
745 class SSHEngineSetLauncher(LocalEngineSetLauncher):
746 launcher_class = SSHEngineLauncher
746 launcher_class = SSHEngineLauncher
747 engines = Dict(config=True,
747 engines = Dict(config=True,
748 help="""dict of engines to launch. This is a dict by hostname of ints,
748 help="""dict of engines to launch. This is a dict by hostname of ints,
749 corresponding to the number of engines to start on that host.""")
749 corresponding to the number of engines to start on that host.""")
750
750
751 def _engine_cmd_default(self):
751 def _engine_cmd_default(self):
752 return ['ipengine']
752 return ['ipengine']
753
753
754 @property
754 @property
755 def engine_count(self):
755 def engine_count(self):
756 """determine engine count from `engines` dict"""
756 """determine engine count from `engines` dict"""
757 count = 0
757 count = 0
758 for n in itervalues(self.engines):
758 for n in itervalues(self.engines):
759 if isinstance(n, (tuple,list)):
759 if isinstance(n, (tuple,list)):
760 n,args = n
760 n,args = n
761 count += n
761 count += n
762 return count
762 return count
763
763
764 def start(self, n):
764 def start(self, n):
765 """Start engines by profile or profile_dir.
765 """Start engines by profile or profile_dir.
766 `n` is ignored, and the `engines` config property is used instead.
766 `n` is ignored, and the `engines` config property is used instead.
767 """
767 """
768
768
769 dlist = []
769 dlist = []
770 for host, n in iteritems(self.engines):
770 for host, n in iteritems(self.engines):
771 if isinstance(n, (tuple, list)):
771 if isinstance(n, (tuple, list)):
772 n, args = n
772 n, args = n
773 else:
773 else:
774 args = copy.deepcopy(self.engine_args)
774 args = copy.deepcopy(self.engine_args)
775
775
776 if '@' in host:
776 if '@' in host:
777 user,host = host.split('@',1)
777 user,host = host.split('@',1)
778 else:
778 else:
779 user=None
779 user=None
780 for i in range(n):
780 for i in range(n):
781 if i > 0:
781 if i > 0:
782 time.sleep(self.delay)
782 time.sleep(self.delay)
783 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
783 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
784 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
784 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
785 )
785 )
786 if i > 0:
786 if i > 0:
787 # only send files for the first engine on each host
787 # only send files for the first engine on each host
788 el.to_send = []
788 el.to_send = []
789
789
790 # Copy the engine args over to each engine launcher.
790 # Copy the engine args over to each engine launcher.
791 el.engine_cmd = self.engine_cmd
791 el.engine_cmd = self.engine_cmd
792 el.engine_args = args
792 el.engine_args = args
793 el.on_stop(self._notice_engine_stopped)
793 el.on_stop(self._notice_engine_stopped)
794 d = el.start(user=user, hostname=host)
794 d = el.start(user=user, hostname=host)
795 self.launchers[ "%s/%i" % (host,i) ] = el
795 self.launchers[ "%s/%i" % (host,i) ] = el
796 dlist.append(d)
796 dlist.append(d)
797 self.notify_start(dlist)
797 self.notify_start(dlist)
798 return dlist
798 return dlist
799
799
800
800
801 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
801 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
802 """Launcher for calling
802 """Launcher for calling
803 `ipcluster engines` on a remote machine.
803 `ipcluster engines` on a remote machine.
804
804
805 Requires that remote profile is already configured.
805 Requires that remote profile is already configured.
806 """
806 """
807
807
808 n = Integer()
808 n = Integer()
809 ipcluster_cmd = List(['ipcluster'], config=True)
809 ipcluster_cmd = List(['ipcluster'], config=True)
810
810
811 @property
811 @property
812 def program(self):
812 def program(self):
813 return self.ipcluster_cmd + ['engines']
813 return self.ipcluster_cmd + ['engines']
814
814
815 @property
815 @property
816 def program_args(self):
816 def program_args(self):
817 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
817 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
818
818
819 def _to_send_default(self):
819 def _to_send_default(self):
820 return [
820 return [
821 (os.path.join(self.profile_dir, 'security', cf),
821 (os.path.join(self.profile_dir, 'security', cf),
822 os.path.join(self.remote_profile_dir, 'security', cf))
822 os.path.join(self.remote_profile_dir, 'security', cf))
823 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
823 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
824 ]
824 ]
825
825
826 def start(self, n):
826 def start(self, n):
827 self.n = n
827 self.n = n
828 super(SSHProxyEngineSetLauncher, self).start()
828 super(SSHProxyEngineSetLauncher, self).start()
829
829
830
830
831 #-----------------------------------------------------------------------------
831 #-----------------------------------------------------------------------------
832 # Windows HPC Server 2008 scheduler launchers
832 # Windows HPC Server 2008 scheduler launchers
833 #-----------------------------------------------------------------------------
833 #-----------------------------------------------------------------------------
834
834
835
835
836 # This is only used on Windows.
836 # This is only used on Windows.
837 def find_job_cmd():
837 def find_job_cmd():
838 if WINDOWS:
838 if WINDOWS:
839 try:
839 try:
840 return find_cmd('job')
840 return find_cmd('job')
841 except (FindCmdError, ImportError):
841 except (FindCmdError, ImportError):
842 # ImportError will be raised if win32api is not installed
842 # ImportError will be raised if win32api is not installed
843 return 'job'
843 return 'job'
844 else:
844 else:
845 return 'job'
845 return 'job'
846
846
847
847
848 class WindowsHPCLauncher(BaseLauncher):
848 class WindowsHPCLauncher(BaseLauncher):
849
849
850 job_id_regexp = CRegExp(r'\d+', config=True,
850 job_id_regexp = CRegExp(r'\d+', config=True,
851 help="""A regular expression used to get the job id from the output of the
851 help="""A regular expression used to get the job id from the output of the
852 submit_command. """
852 submit_command. """
853 )
853 )
854 job_file_name = Unicode(u'ipython_job.xml', config=True,
854 job_file_name = Unicode(u'ipython_job.xml', config=True,
855 help="The filename of the instantiated job script.")
855 help="The filename of the instantiated job script.")
856 # The full path to the instantiated job script. This gets made dynamically
856 # The full path to the instantiated job script. This gets made dynamically
857 # by combining the work_dir with the job_file_name.
857 # by combining the work_dir with the job_file_name.
858 job_file = Unicode(u'')
858 job_file = Unicode(u'')
859 scheduler = Unicode('', config=True,
859 scheduler = Unicode('', config=True,
860 help="The hostname of the scheduler to submit the job to.")
860 help="The hostname of the scheduler to submit the job to.")
861 job_cmd = Unicode(find_job_cmd(), config=True,
861 job_cmd = Unicode(find_job_cmd(), config=True,
862 help="The command for submitting jobs.")
862 help="The command for submitting jobs.")
863
863
864 def __init__(self, work_dir=u'.', config=None, **kwargs):
864 def __init__(self, work_dir=u'.', config=None, **kwargs):
865 super(WindowsHPCLauncher, self).__init__(
865 super(WindowsHPCLauncher, self).__init__(
866 work_dir=work_dir, config=config, **kwargs
866 work_dir=work_dir, config=config, **kwargs
867 )
867 )
868
868
869 @property
869 @property
870 def job_file(self):
870 def job_file(self):
871 return os.path.join(self.work_dir, self.job_file_name)
871 return os.path.join(self.work_dir, self.job_file_name)
872
872
873 def write_job_file(self, n):
873 def write_job_file(self, n):
874 raise NotImplementedError("Implement write_job_file in a subclass.")
874 raise NotImplementedError("Implement write_job_file in a subclass.")
875
875
876 def find_args(self):
876 def find_args(self):
877 return [u'job.exe']
877 return [u'job.exe']
878
878
879 def parse_job_id(self, output):
879 def parse_job_id(self, output):
880 """Take the output of the submit command and return the job id."""
880 """Take the output of the submit command and return the job id."""
881 m = self.job_id_regexp.search(output)
881 m = self.job_id_regexp.search(output)
882 if m is not None:
882 if m is not None:
883 job_id = m.group()
883 job_id = m.group()
884 else:
884 else:
885 raise LauncherError("Job id couldn't be determined: %s" % output)
885 raise LauncherError("Job id couldn't be determined: %s" % output)
886 self.job_id = job_id
886 self.job_id = job_id
887 self.log.info('Job started with id: %r', job_id)
887 self.log.info('Job started with id: %r', job_id)
888 return job_id
888 return job_id
889
889
890 def start(self, n):
890 def start(self, n):
891 """Start n copies of the process using the Win HPC job scheduler."""
891 """Start n copies of the process using the Win HPC job scheduler."""
892 self.write_job_file(n)
892 self.write_job_file(n)
893 args = [
893 args = [
894 'submit',
894 'submit',
895 '/jobfile:%s' % self.job_file,
895 '/jobfile:%s' % self.job_file,
896 '/scheduler:%s' % self.scheduler
896 '/scheduler:%s' % self.scheduler
897 ]
897 ]
898 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
898 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
899
899
900 output = check_output([self.job_cmd]+args,
900 output = check_output([self.job_cmd]+args,
901 env=os.environ,
901 env=os.environ,
902 cwd=self.work_dir,
902 cwd=self.work_dir,
903 stderr=STDOUT
903 stderr=STDOUT
904 )
904 )
905 output = output.decode(DEFAULT_ENCODING, 'replace')
905 output = output.decode(DEFAULT_ENCODING, 'replace')
906 job_id = self.parse_job_id(output)
906 job_id = self.parse_job_id(output)
907 self.notify_start(job_id)
907 self.notify_start(job_id)
908 return job_id
908 return job_id
909
909
910 def stop(self):
910 def stop(self):
911 args = [
911 args = [
912 'cancel',
912 'cancel',
913 self.job_id,
913 self.job_id,
914 '/scheduler:%s' % self.scheduler
914 '/scheduler:%s' % self.scheduler
915 ]
915 ]
916 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
916 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
917 try:
917 try:
918 output = check_output([self.job_cmd]+args,
918 output = check_output([self.job_cmd]+args,
919 env=os.environ,
919 env=os.environ,
920 cwd=self.work_dir,
920 cwd=self.work_dir,
921 stderr=STDOUT
921 stderr=STDOUT
922 )
922 )
923 output = output.decode(DEFAULT_ENCODING, 'replace')
923 output = output.decode(DEFAULT_ENCODING, 'replace')
924 except:
924 except:
925 output = u'The job already appears to be stopped: %r' % self.job_id
925 output = u'The job already appears to be stopped: %r' % self.job_id
926 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
926 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
927 return output
927 return output
928
928
929
929
930 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
930 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
931
931
932 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
932 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
933 help="WinHPC xml job file.")
933 help="WinHPC xml job file.")
934 controller_args = List([], config=False,
934 controller_args = List([], config=False,
935 help="extra args to pass to ipcontroller")
935 help="extra args to pass to ipcontroller")
936
936
937 def write_job_file(self, n):
937 def write_job_file(self, n):
938 job = IPControllerJob(parent=self)
938 job = IPControllerJob(parent=self)
939
939
940 t = IPControllerTask(parent=self)
940 t = IPControllerTask(parent=self)
941 # The tasks work directory is *not* the actual work directory of
941 # The tasks work directory is *not* the actual work directory of
942 # the controller. It is used as the base path for the stdout/stderr
942 # the controller. It is used as the base path for the stdout/stderr
943 # files that the scheduler redirects to.
943 # files that the scheduler redirects to.
944 t.work_directory = self.profile_dir
944 t.work_directory = self.profile_dir
945 # Add the profile_dir and from self.start().
945 # Add the profile_dir and from self.start().
946 t.controller_args.extend(self.cluster_args)
946 t.controller_args.extend(self.cluster_args)
947 t.controller_args.extend(self.controller_args)
947 t.controller_args.extend(self.controller_args)
948 job.add_task(t)
948 job.add_task(t)
949
949
950 self.log.debug("Writing job description file: %s", self.job_file)
950 self.log.debug("Writing job description file: %s", self.job_file)
951 job.write(self.job_file)
951 job.write(self.job_file)
952
952
953 @property
953 @property
954 def job_file(self):
954 def job_file(self):
955 return os.path.join(self.profile_dir, self.job_file_name)
955 return os.path.join(self.profile_dir, self.job_file_name)
956
956
957 def start(self):
957 def start(self):
958 """Start the controller by profile_dir."""
958 """Start the controller by profile_dir."""
959 return super(WindowsHPCControllerLauncher, self).start(1)
959 return super(WindowsHPCControllerLauncher, self).start(1)
960
960
961
961
962 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
962 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
963
963
964 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
964 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
965 help="jobfile for ipengines job")
965 help="jobfile for ipengines job")
966 engine_args = List([], config=False,
966 engine_args = List([], config=False,
967 help="extra args to pas to ipengine")
967 help="extra args to pas to ipengine")
968
968
969 def write_job_file(self, n):
969 def write_job_file(self, n):
970 job = IPEngineSetJob(parent=self)
970 job = IPEngineSetJob(parent=self)
971
971
972 for i in range(n):
972 for i in range(n):
973 t = IPEngineTask(parent=self)
973 t = IPEngineTask(parent=self)
974 # The tasks work directory is *not* the actual work directory of
974 # The tasks work directory is *not* the actual work directory of
975 # the engine. It is used as the base path for the stdout/stderr
975 # the engine. It is used as the base path for the stdout/stderr
976 # files that the scheduler redirects to.
976 # files that the scheduler redirects to.
977 t.work_directory = self.profile_dir
977 t.work_directory = self.profile_dir
978 # Add the profile_dir and from self.start().
978 # Add the profile_dir and from self.start().
979 t.engine_args.extend(self.cluster_args)
979 t.engine_args.extend(self.cluster_args)
980 t.engine_args.extend(self.engine_args)
980 t.engine_args.extend(self.engine_args)
981 job.add_task(t)
981 job.add_task(t)
982
982
983 self.log.debug("Writing job description file: %s", self.job_file)
983 self.log.debug("Writing job description file: %s", self.job_file)
984 job.write(self.job_file)
984 job.write(self.job_file)
985
985
986 @property
986 @property
987 def job_file(self):
987 def job_file(self):
988 return os.path.join(self.profile_dir, self.job_file_name)
988 return os.path.join(self.profile_dir, self.job_file_name)
989
989
990 def start(self, n):
990 def start(self, n):
991 """Start the controller by profile_dir."""
991 """Start the controller by profile_dir."""
992 return super(WindowsHPCEngineSetLauncher, self).start(n)
992 return super(WindowsHPCEngineSetLauncher, self).start(n)
993
993
994
994
995 #-----------------------------------------------------------------------------
995 #-----------------------------------------------------------------------------
996 # Batch (PBS) system launchers
996 # Batch (PBS) system launchers
997 #-----------------------------------------------------------------------------
997 #-----------------------------------------------------------------------------
998
998
999 class BatchClusterAppMixin(ClusterAppMixin):
999 class BatchClusterAppMixin(ClusterAppMixin):
1000 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1000 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
1001 def _profile_dir_changed(self, name, old, new):
1001 def _profile_dir_changed(self, name, old, new):
1002 self.context[name] = new
1002 self.context[name] = new
1003 _cluster_id_changed = _profile_dir_changed
1003 _cluster_id_changed = _profile_dir_changed
1004
1004
1005 def _profile_dir_default(self):
1005 def _profile_dir_default(self):
1006 self.context['profile_dir'] = ''
1006 self.context['profile_dir'] = ''
1007 return ''
1007 return ''
1008 def _cluster_id_default(self):
1008 def _cluster_id_default(self):
1009 self.context['cluster_id'] = ''
1009 self.context['cluster_id'] = ''
1010 return ''
1010 return ''
1011
1011
1012
1012
1013 class BatchSystemLauncher(BaseLauncher):
1013 class BatchSystemLauncher(BaseLauncher):
1014 """Launch an external process using a batch system.
1014 """Launch an external process using a batch system.
1015
1015
1016 This class is designed to work with UNIX batch systems like PBS, LSF,
1016 This class is designed to work with UNIX batch systems like PBS, LSF,
1017 GridEngine, etc. The overall model is that there are different commands
1017 GridEngine, etc. The overall model is that there are different commands
1018 like qsub, qdel, etc. that handle the starting and stopping of the process.
1018 like qsub, qdel, etc. that handle the starting and stopping of the process.
1019
1019
1020 This class also has the notion of a batch script. The ``batch_template``
1020 This class also has the notion of a batch script. The ``batch_template``
1021 attribute can be set to a string that is a template for the batch script.
1021 attribute can be set to a string that is a template for the batch script.
1022 This template is instantiated using string formatting. Thus the template can
1022 This template is instantiated using string formatting. Thus the template can
1023 use {n} fot the number of instances. Subclasses can add additional variables
1023 use {n} fot the number of instances. Subclasses can add additional variables
1024 to the template dict.
1024 to the template dict.
1025 """
1025 """
1026
1026
1027 # Subclasses must fill these in. See PBSEngineSet
1027 # Subclasses must fill these in. See PBSEngineSet
1028 submit_command = List([''], config=True,
1028 submit_command = List([''], config=True,
1029 help="The name of the command line program used to submit jobs.")
1029 help="The name of the command line program used to submit jobs.")
1030 delete_command = List([''], config=True,
1030 delete_command = List([''], config=True,
1031 help="The name of the command line program used to delete jobs.")
1031 help="The name of the command line program used to delete jobs.")
1032 job_id_regexp = CRegExp('', config=True,
1032 job_id_regexp = CRegExp('', config=True,
1033 help="""A regular expression used to get the job id from the output of the
1033 help="""A regular expression used to get the job id from the output of the
1034 submit_command.""")
1034 submit_command.""")
1035 job_id_regexp_group = Integer(0, config=True,
1035 job_id_regexp_group = Integer(0, config=True,
1036 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1036 help="""The group we wish to match in job_id_regexp (0 to match all)""")
1037 batch_template = Unicode('', config=True,
1037 batch_template = Unicode('', config=True,
1038 help="The string that is the batch script template itself.")
1038 help="The string that is the batch script template itself.")
1039 batch_template_file = Unicode(u'', config=True,
1039 batch_template_file = Unicode(u'', config=True,
1040 help="The file that contains the batch template.")
1040 help="The file that contains the batch template.")
1041 batch_file_name = Unicode(u'batch_script', config=True,
1041 batch_file_name = Unicode(u'batch_script', config=True,
1042 help="The filename of the instantiated batch script.")
1042 help="The filename of the instantiated batch script.")
1043 queue = Unicode(u'', config=True,
1043 queue = Unicode(u'', config=True,
1044 help="The PBS Queue.")
1044 help="The PBS Queue.")
1045
1045
1046 def _queue_changed(self, name, old, new):
1046 def _queue_changed(self, name, old, new):
1047 self.context[name] = new
1047 self.context[name] = new
1048
1048
1049 n = Integer(1)
1049 n = Integer(1)
1050 _n_changed = _queue_changed
1050 _n_changed = _queue_changed
1051
1051
1052 # not configurable, override in subclasses
1052 # not configurable, override in subclasses
1053 # PBS Job Array regex
1053 # PBS Job Array regex
1054 job_array_regexp = CRegExp('')
1054 job_array_regexp = CRegExp('')
1055 job_array_template = Unicode('')
1055 job_array_template = Unicode('')
1056 # PBS Queue regex
1056 # PBS Queue regex
1057 queue_regexp = CRegExp('')
1057 queue_regexp = CRegExp('')
1058 queue_template = Unicode('')
1058 queue_template = Unicode('')
1059 # The default batch template, override in subclasses
1059 # The default batch template, override in subclasses
1060 default_template = Unicode('')
1060 default_template = Unicode('')
1061 # The full path to the instantiated batch script.
1061 # The full path to the instantiated batch script.
1062 batch_file = Unicode(u'')
1062 batch_file = Unicode(u'')
1063 # the format dict used with batch_template:
1063 # the format dict used with batch_template:
1064 context = Dict()
1064 context = Dict()
1065
1065
1066 def _context_default(self):
1066 def _context_default(self):
1067 """load the default context with the default values for the basic keys
1067 """load the default context with the default values for the basic keys
1068
1068
1069 because the _trait_changed methods only load the context if they
1069 because the _trait_changed methods only load the context if they
1070 are set to something other than the default value.
1070 are set to something other than the default value.
1071 """
1071 """
1072 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1072 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1073
1073
1074 # the Formatter instance for rendering the templates:
1074 # the Formatter instance for rendering the templates:
1075 formatter = Instance(EvalFormatter, (), {})
1075 formatter = Instance(EvalFormatter, (), {})
1076
1076
1077 def find_args(self):
1077 def find_args(self):
1078 return self.submit_command + [self.batch_file]
1078 return self.submit_command + [self.batch_file]
1079
1079
1080 def __init__(self, work_dir=u'.', config=None, **kwargs):
1080 def __init__(self, work_dir=u'.', config=None, **kwargs):
1081 super(BatchSystemLauncher, self).__init__(
1081 super(BatchSystemLauncher, self).__init__(
1082 work_dir=work_dir, config=config, **kwargs
1082 work_dir=work_dir, config=config, **kwargs
1083 )
1083 )
1084 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1084 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1085
1085
1086 def parse_job_id(self, output):
1086 def parse_job_id(self, output):
1087 """Take the output of the submit command and return the job id."""
1087 """Take the output of the submit command and return the job id."""
1088 m = self.job_id_regexp.search(output)
1088 m = self.job_id_regexp.search(output)
1089 if m is not None:
1089 if m is not None:
1090 job_id = m.group(self.job_id_regexp_group)
1090 job_id = m.group(self.job_id_regexp_group)
1091 else:
1091 else:
1092 raise LauncherError("Job id couldn't be determined: %s" % output)
1092 raise LauncherError("Job id couldn't be determined: %s" % output)
1093 self.job_id = job_id
1093 self.job_id = job_id
1094 self.log.info('Job submitted with job id: %r', job_id)
1094 self.log.info('Job submitted with job id: %r', job_id)
1095 return job_id
1095 return job_id
1096
1096
1097 def write_batch_script(self, n):
1097 def write_batch_script(self, n):
1098 """Instantiate and write the batch script to the work_dir."""
1098 """Instantiate and write the batch script to the work_dir."""
1099 self.n = n
1099 self.n = n
1100 # first priority is batch_template if set
1100 # first priority is batch_template if set
1101 if self.batch_template_file and not self.batch_template:
1101 if self.batch_template_file and not self.batch_template:
1102 # second priority is batch_template_file
1102 # second priority is batch_template_file
1103 with open(self.batch_template_file) as f:
1103 with open(self.batch_template_file) as f:
1104 self.batch_template = f.read()
1104 self.batch_template = f.read()
1105 if not self.batch_template:
1105 if not self.batch_template:
1106 # third (last) priority is default_template
1106 # third (last) priority is default_template
1107 self.batch_template = self.default_template
1107 self.batch_template = self.default_template
1108 # add jobarray or queue lines to user-specified template
1108 # add jobarray or queue lines to user-specified template
1109 # note that this is *only* when user did not specify a template.
1109 # note that this is *only* when user did not specify a template.
1110 self._insert_queue_in_script()
1110 self._insert_queue_in_script()
1111 self._insert_job_array_in_script()
1111 self._insert_job_array_in_script()
1112 script_as_string = self.formatter.format(self.batch_template, **self.context)
1112 script_as_string = self.formatter.format(self.batch_template, **self.context)
1113 self.log.debug('Writing batch script: %s', self.batch_file)
1113 self.log.debug('Writing batch script: %s', self.batch_file)
1114 with open(self.batch_file, 'w') as f:
1114 with open(self.batch_file, 'w') as f:
1115 f.write(script_as_string)
1115 f.write(script_as_string)
1116 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1116 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1117
1117
1118 def _insert_queue_in_script(self):
1118 def _insert_queue_in_script(self):
1119 """Inserts a queue if required into the batch script.
1119 """Inserts a queue if required into the batch script.
1120 """
1120 """
1121 if self.queue and not self.queue_regexp.search(self.batch_template):
1121 if self.queue and not self.queue_regexp.search(self.batch_template):
1122 self.log.debug("adding PBS queue settings to batch script")
1122 self.log.debug("adding PBS queue settings to batch script")
1123 firstline, rest = self.batch_template.split('\n',1)
1123 firstline, rest = self.batch_template.split('\n',1)
1124 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1124 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1125
1125
1126 def _insert_job_array_in_script(self):
1126 def _insert_job_array_in_script(self):
1127 """Inserts a job array if required into the batch script.
1127 """Inserts a job array if required into the batch script.
1128 """
1128 """
1129 if not self.job_array_regexp.search(self.batch_template):
1129 if not self.job_array_regexp.search(self.batch_template):
1130 self.log.debug("adding job array settings to batch script")
1130 self.log.debug("adding job array settings to batch script")
1131 firstline, rest = self.batch_template.split('\n',1)
1131 firstline, rest = self.batch_template.split('\n',1)
1132 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1132 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1133
1133
1134 def start(self, n):
1134 def start(self, n):
1135 """Start n copies of the process using a batch system."""
1135 """Start n copies of the process using a batch system."""
1136 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1136 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1137 # Here we save profile_dir in the context so they
1137 # Here we save profile_dir in the context so they
1138 # can be used in the batch script template as {profile_dir}
1138 # can be used in the batch script template as {profile_dir}
1139 self.write_batch_script(n)
1139 self.write_batch_script(n)
1140 output = check_output(self.args, env=os.environ)
1140 output = check_output(self.args, env=os.environ)
1141 output = output.decode(DEFAULT_ENCODING, 'replace')
1141 output = output.decode(DEFAULT_ENCODING, 'replace')
1142
1142
1143 job_id = self.parse_job_id(output)
1143 job_id = self.parse_job_id(output)
1144 self.notify_start(job_id)
1144 self.notify_start(job_id)
1145 return job_id
1145 return job_id
1146
1146
1147 def stop(self):
1147 def stop(self):
1148 try:
1148 try:
1149 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1149 p = Popen(self.delete_command+[self.job_id], env=os.environ,
1150 stdout=PIPE, stderr=PIPE)
1150 stdout=PIPE, stderr=PIPE)
1151 out, err = p.communicate()
1151 out, err = p.communicate()
1152 output = out + err
1152 output = out + err
1153 except:
1153 except:
1154 self.log.exception("Problem stopping cluster with command: %s" %
1154 self.log.exception("Problem stopping cluster with command: %s" %
1155 (self.delete_command + [self.job_id]))
1155 (self.delete_command + [self.job_id]))
1156 output = ""
1156 output = ""
1157 output = output.decode(DEFAULT_ENCODING, 'replace')
1157 output = output.decode(DEFAULT_ENCODING, 'replace')
1158 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1158 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1159 return output
1159 return output
1160
1160
1161
1161
1162 class PBSLauncher(BatchSystemLauncher):
1162 class PBSLauncher(BatchSystemLauncher):
1163 """A BatchSystemLauncher subclass for PBS."""
1163 """A BatchSystemLauncher subclass for PBS."""
1164
1164
1165 submit_command = List(['qsub'], config=True,
1165 submit_command = List(['qsub'], config=True,
1166 help="The PBS submit command ['qsub']")
1166 help="The PBS submit command ['qsub']")
1167 delete_command = List(['qdel'], config=True,
1167 delete_command = List(['qdel'], config=True,
1168 help="The PBS delete command ['qsub']")
1168 help="The PBS delete command ['qsub']")
1169 job_id_regexp = CRegExp(r'\d+', config=True,
1169 job_id_regexp = CRegExp(r'\d+', config=True,
1170 help="Regular expresion for identifying the job ID [r'\d+']")
1170 help="Regular expresion for identifying the job ID [r'\d+']")
1171
1171
1172 batch_file = Unicode(u'')
1172 batch_file = Unicode(u'')
1173 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1173 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1174 job_array_template = Unicode('#PBS -t 1-{n}')
1174 job_array_template = Unicode('#PBS -t 1-{n}')
1175 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1175 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1176 queue_template = Unicode('#PBS -q {queue}')
1176 queue_template = Unicode('#PBS -q {queue}')
1177
1177
1178
1178
1179 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1179 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1180 """Launch a controller using PBS."""
1180 """Launch a controller using PBS."""
1181
1181
1182 batch_file_name = Unicode(u'pbs_controller', config=True,
1182 batch_file_name = Unicode(u'pbs_controller', config=True,
1183 help="batch file name for the controller job.")
1183 help="batch file name for the controller job.")
1184 default_template= Unicode("""#!/bin/sh
1184 default_template= Unicode("""#!/bin/sh
1185 #PBS -V
1185 #PBS -V
1186 #PBS -N ipcontroller
1186 #PBS -N ipcontroller
1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1188 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1188 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1189
1189
1190 def start(self):
1190 def start(self):
1191 """Start the controller by profile or profile_dir."""
1191 """Start the controller by profile or profile_dir."""
1192 return super(PBSControllerLauncher, self).start(1)
1192 return super(PBSControllerLauncher, self).start(1)
1193
1193
1194
1194
1195 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1195 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1196 """Launch Engines using PBS"""
1196 """Launch Engines using PBS"""
1197 batch_file_name = Unicode(u'pbs_engines', config=True,
1197 batch_file_name = Unicode(u'pbs_engines', config=True,
1198 help="batch file name for the engine(s) job.")
1198 help="batch file name for the engine(s) job.")
1199 default_template= Unicode(u"""#!/bin/sh
1199 default_template= Unicode(u"""#!/bin/sh
1200 #PBS -V
1200 #PBS -V
1201 #PBS -N ipengine
1201 #PBS -N ipengine
1202 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1202 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1203 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1203 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1204
1204
1205
1205
1206 #SGE is very similar to PBS
1206 #SGE is very similar to PBS
1207
1207
1208 class SGELauncher(PBSLauncher):
1208 class SGELauncher(PBSLauncher):
1209 """Sun GridEngine is a PBS clone with slightly different syntax"""
1209 """Sun GridEngine is a PBS clone with slightly different syntax"""
1210 job_array_regexp = CRegExp('#\$\W+\-t')
1210 job_array_regexp = CRegExp('#\$\W+\-t')
1211 job_array_template = Unicode('#$ -t 1-{n}')
1211 job_array_template = Unicode('#$ -t 1-{n}')
1212 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1212 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1213 queue_template = Unicode('#$ -q {queue}')
1213 queue_template = Unicode('#$ -q {queue}')
1214
1214
1215
1215
1216 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1216 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1217 """Launch a controller using SGE."""
1217 """Launch a controller using SGE."""
1218
1218
1219 batch_file_name = Unicode(u'sge_controller', config=True,
1219 batch_file_name = Unicode(u'sge_controller', config=True,
1220 help="batch file name for the ipontroller job.")
1220 help="batch file name for the ipontroller job.")
1221 default_template= Unicode(u"""#$ -V
1221 default_template= Unicode(u"""#$ -V
1222 #$ -S /bin/sh
1222 #$ -S /bin/sh
1223 #$ -N ipcontroller
1223 #$ -N ipcontroller
1224 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1224 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1225 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1225 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1226
1226
1227 def start(self):
1227 def start(self):
1228 """Start the controller by profile or profile_dir."""
1228 """Start the controller by profile or profile_dir."""
1229 return super(SGEControllerLauncher, self).start(1)
1229 return super(SGEControllerLauncher, self).start(1)
1230
1230
1231
1231
1232 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1232 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1233 """Launch Engines with SGE"""
1233 """Launch Engines with SGE"""
1234 batch_file_name = Unicode(u'sge_engines', config=True,
1234 batch_file_name = Unicode(u'sge_engines', config=True,
1235 help="batch file name for the engine(s) job.")
1235 help="batch file name for the engine(s) job.")
1236 default_template = Unicode("""#$ -V
1236 default_template = Unicode("""#$ -V
1237 #$ -S /bin/sh
1237 #$ -S /bin/sh
1238 #$ -N ipengine
1238 #$ -N ipengine
1239 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1239 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1240 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1240 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1241
1241
1242
1242
1243 # LSF launchers
1243 # LSF launchers
1244
1244
1245 class LSFLauncher(BatchSystemLauncher):
1245 class LSFLauncher(BatchSystemLauncher):
1246 """A BatchSystemLauncher subclass for LSF."""
1246 """A BatchSystemLauncher subclass for LSF."""
1247
1247
1248 submit_command = List(['bsub'], config=True,
1248 submit_command = List(['bsub'], config=True,
1249 help="The PBS submit command ['bsub']")
1249 help="The PBS submit command ['bsub']")
1250 delete_command = List(['bkill'], config=True,
1250 delete_command = List(['bkill'], config=True,
1251 help="The PBS delete command ['bkill']")
1251 help="The PBS delete command ['bkill']")
1252 job_id_regexp = CRegExp(r'\d+', config=True,
1252 job_id_regexp = CRegExp(r'\d+', config=True,
1253 help="Regular expresion for identifying the job ID [r'\d+']")
1253 help="Regular expresion for identifying the job ID [r'\d+']")
1254
1254
1255 batch_file = Unicode(u'')
1255 batch_file = Unicode(u'')
1256 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1256 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1257 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1257 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1258 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1258 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1259 queue_template = Unicode('#BSUB -q {queue}')
1259 queue_template = Unicode('#BSUB -q {queue}')
1260
1260
1261 def start(self, n):
1261 def start(self, n):
1262 """Start n copies of the process using LSF batch system.
1262 """Start n copies of the process using LSF batch system.
1263 This cant inherit from the base class because bsub expects
1263 This cant inherit from the base class because bsub expects
1264 to be piped a shell script in order to honor the #BSUB directives :
1264 to be piped a shell script in order to honor the #BSUB directives :
1265 bsub < script
1265 bsub < script
1266 """
1266 """
1267 # Here we save profile_dir in the context so they
1267 # Here we save profile_dir in the context so they
1268 # can be used in the batch script template as {profile_dir}
1268 # can be used in the batch script template as {profile_dir}
1269 self.write_batch_script(n)
1269 self.write_batch_script(n)
1270 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1270 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1271 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1271 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1272 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1272 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1273 output,err = p.communicate()
1273 output,err = p.communicate()
1274 output = output.decode(DEFAULT_ENCODING, 'replace')
1274 output = output.decode(DEFAULT_ENCODING, 'replace')
1275 job_id = self.parse_job_id(output)
1275 job_id = self.parse_job_id(output)
1276 self.notify_start(job_id)
1276 self.notify_start(job_id)
1277 return job_id
1277 return job_id
1278
1278
1279
1279
1280 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1280 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1281 """Launch a controller using LSF."""
1281 """Launch a controller using LSF."""
1282
1282
1283 batch_file_name = Unicode(u'lsf_controller', config=True,
1283 batch_file_name = Unicode(u'lsf_controller', config=True,
1284 help="batch file name for the controller job.")
1284 help="batch file name for the controller job.")
1285 default_template= Unicode("""#!/bin/sh
1285 default_template= Unicode("""#!/bin/sh
1286 #BSUB -J ipcontroller
1286 #BSUB -J ipcontroller
1287 #BSUB -oo ipcontroller.o.%%J
1287 #BSUB -oo ipcontroller.o.%%J
1288 #BSUB -eo ipcontroller.e.%%J
1288 #BSUB -eo ipcontroller.e.%%J
1289 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1289 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1290 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1290 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1291
1291
1292 def start(self):
1292 def start(self):
1293 """Start the controller by profile or profile_dir."""
1293 """Start the controller by profile or profile_dir."""
1294 return super(LSFControllerLauncher, self).start(1)
1294 return super(LSFControllerLauncher, self).start(1)
1295
1295
1296
1296
1297 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1297 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1298 """Launch Engines using LSF"""
1298 """Launch Engines using LSF"""
1299 batch_file_name = Unicode(u'lsf_engines', config=True,
1299 batch_file_name = Unicode(u'lsf_engines', config=True,
1300 help="batch file name for the engine(s) job.")
1300 help="batch file name for the engine(s) job.")
1301 default_template= Unicode(u"""#!/bin/sh
1301 default_template= Unicode(u"""#!/bin/sh
1302 #BSUB -oo ipengine.o.%%J
1302 #BSUB -oo ipengine.o.%%J
1303 #BSUB -eo ipengine.e.%%J
1303 #BSUB -eo ipengine.e.%%J
1304 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1304 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1305 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1305 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1306
1306
1307
1307
1308
1308
1309 class HTCondorLauncher(BatchSystemLauncher):
1309 class HTCondorLauncher(BatchSystemLauncher):
1310 """A BatchSystemLauncher subclass for HTCondor.
1310 """A BatchSystemLauncher subclass for HTCondor.
1311
1311
1312 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1312 HTCondor requires that we launch the ipengine/ipcontroller scripts rather
1313 that the python instance but otherwise is very similar to PBS. This is because
1313 that the python instance but otherwise is very similar to PBS. This is because
1314 HTCondor destroys sys.executable when launching remote processes - a launched
1314 HTCondor destroys sys.executable when launching remote processes - a launched
1315 python process depends on sys.executable to effectively evaluate its
1315 python process depends on sys.executable to effectively evaluate its
1316 module search paths. Without it, regardless of which python interpreter you launch
1316 module search paths. Without it, regardless of which python interpreter you launch
1317 you will get the to built in module search paths.
1317 you will get the to built in module search paths.
1318
1318
1319 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1319 We use the ip{cluster, engine, controller} scripts as our executable to circumvent
1320 this - the mechanism of shebanged scripts means that the python binary will be
1320 this - the mechanism of shebanged scripts means that the python binary will be
1321 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1321 launched with argv[0] set to the *location of the ip{cluster, engine, controller}
1322 scripts on the remote node*. This means you need to take care that:
1322 scripts on the remote node*. This means you need to take care that:
1323
1323
1324 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1324 a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
1325 of the python environment you wish to execute code in having top precedence.
1325 of the python environment you wish to execute code in having top precedence.
1326 b. This functionality is untested on Windows.
1326 b. This functionality is untested on Windows.
1327
1327
1328 If you need different behavior, consider making you own template.
1328 If you need different behavior, consider making you own template.
1329 """
1329 """
1330
1330
1331 submit_command = List(['condor_submit'], config=True,
1331 submit_command = List(['condor_submit'], config=True,
1332 help="The HTCondor submit command ['condor_submit']")
1332 help="The HTCondor submit command ['condor_submit']")
1333 delete_command = List(['condor_rm'], config=True,
1333 delete_command = List(['condor_rm'], config=True,
1334 help="The HTCondor delete command ['condor_rm']")
1334 help="The HTCondor delete command ['condor_rm']")
1335 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1335 job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
1336 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1336 help="Regular expression for identifying the job ID [r'(\d+)\.$']")
1337 job_id_regexp_group = Integer(1, config=True,
1337 job_id_regexp_group = Integer(1, config=True,
1338 help="""The group we wish to match in job_id_regexp [1]""")
1338 help="""The group we wish to match in job_id_regexp [1]""")
1339
1339
1340 job_array_regexp = CRegExp('queue\W+\$')
1340 job_array_regexp = CRegExp('queue\W+\$')
1341 job_array_template = Unicode('queue {n}')
1341 job_array_template = Unicode('queue {n}')
1342
1342
1343
1343
1344 def _insert_job_array_in_script(self):
1344 def _insert_job_array_in_script(self):
1345 """Inserts a job array if required into the batch script.
1345 """Inserts a job array if required into the batch script.
1346 """
1346 """
1347 if not self.job_array_regexp.search(self.batch_template):
1347 if not self.job_array_regexp.search(self.batch_template):
1348 self.log.debug("adding job array settings to batch script")
1348 self.log.debug("adding job array settings to batch script")
1349 #HTCondor requires that the job array goes at the bottom of the script
1349 #HTCondor requires that the job array goes at the bottom of the script
1350 self.batch_template = '\n'.join([self.batch_template,
1350 self.batch_template = '\n'.join([self.batch_template,
1351 self.job_array_template])
1351 self.job_array_template])
1352
1352
1353 def _insert_queue_in_script(self):
1353 def _insert_queue_in_script(self):
1354 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1354 """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
1355 specified in the script.
1355 specified in the script.
1356 """
1356 """
1357 pass
1357 pass
1358
1358
1359
1359
1360 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1360 class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
1361 """Launch a controller using HTCondor."""
1361 """Launch a controller using HTCondor."""
1362
1362
1363 batch_file_name = Unicode(u'htcondor_controller', config=True,
1363 batch_file_name = Unicode(u'htcondor_controller', config=True,
1364 help="batch file name for the controller job.")
1364 help="batch file name for the controller job.")
1365 default_template = Unicode(r"""
1365 default_template = Unicode(r"""
1366 universe = vanilla
1366 universe = vanilla
1367 executable = ipcontroller
1367 executable = ipcontroller
1368 # by default we expect a shared file system
1368 # by default we expect a shared file system
1369 transfer_executable = False
1369 transfer_executable = False
1370 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1370 arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
1371 """)
1371 """)
1372
1372
1373 def start(self):
1373 def start(self):
1374 """Start the controller by profile or profile_dir."""
1374 """Start the controller by profile or profile_dir."""
1375 return super(HTCondorControllerLauncher, self).start(1)
1375 return super(HTCondorControllerLauncher, self).start(1)
1376
1376
1377
1377
1378 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1378 class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
1379 """Launch Engines using HTCondor"""
1379 """Launch Engines using HTCondor"""
1380 batch_file_name = Unicode(u'htcondor_engines', config=True,
1380 batch_file_name = Unicode(u'htcondor_engines', config=True,
1381 help="batch file name for the engine(s) job.")
1381 help="batch file name for the engine(s) job.")
1382 default_template = Unicode("""
1382 default_template = Unicode("""
1383 universe = vanilla
1383 universe = vanilla
1384 executable = ipengine
1384 executable = ipengine
1385 # by default we expect a shared file system
1385 # by default we expect a shared file system
1386 transfer_executable = False
1386 transfer_executable = False
1387 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1387 arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
1388 """)
1388 """)
1389
1389
1390
1390
1391 #-----------------------------------------------------------------------------
1391 #-----------------------------------------------------------------------------
1392 # A launcher for ipcluster itself!
1392 # A launcher for ipcluster itself!
1393 #-----------------------------------------------------------------------------
1393 #-----------------------------------------------------------------------------
1394
1394
1395
1395
1396 class IPClusterLauncher(LocalProcessLauncher):
1396 class IPClusterLauncher(LocalProcessLauncher):
1397 """Launch the ipcluster program in an external process."""
1397 """Launch the ipcluster program in an external process."""
1398
1398
1399 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1399 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1400 help="Popen command for ipcluster")
1400 help="Popen command for ipcluster")
1401 ipcluster_args = List(
1401 ipcluster_args = List(
1402 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1402 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1403 help="Command line arguments to pass to ipcluster.")
1403 help="Command line arguments to pass to ipcluster.")
1404 ipcluster_subcommand = Unicode('start')
1404 ipcluster_subcommand = Unicode('start')
1405 profile = Unicode('default')
1405 profile = Unicode('default')
1406 n = Integer(2)
1406 n = Integer(2)
1407
1407
1408 def find_args(self):
1408 def find_args(self):
1409 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1409 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1410 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1410 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1411 self.ipcluster_args
1411 self.ipcluster_args
1412
1412
1413 def start(self):
1413 def start(self):
1414 return super(IPClusterLauncher, self).start()
1414 return super(IPClusterLauncher, self).start()
1415
1415
1416 #-----------------------------------------------------------------------------
1416 #-----------------------------------------------------------------------------
1417 # Collections of launchers
1417 # Collections of launchers
1418 #-----------------------------------------------------------------------------
1418 #-----------------------------------------------------------------------------
1419
1419
1420 local_launchers = [
1420 local_launchers = [
1421 LocalControllerLauncher,
1421 LocalControllerLauncher,
1422 LocalEngineLauncher,
1422 LocalEngineLauncher,
1423 LocalEngineSetLauncher,
1423 LocalEngineSetLauncher,
1424 ]
1424 ]
1425 mpi_launchers = [
1425 mpi_launchers = [
1426 MPILauncher,
1426 MPILauncher,
1427 MPIControllerLauncher,
1427 MPIControllerLauncher,
1428 MPIEngineSetLauncher,
1428 MPIEngineSetLauncher,
1429 ]
1429 ]
1430 ssh_launchers = [
1430 ssh_launchers = [
1431 SSHLauncher,
1431 SSHLauncher,
1432 SSHControllerLauncher,
1432 SSHControllerLauncher,
1433 SSHEngineLauncher,
1433 SSHEngineLauncher,
1434 SSHEngineSetLauncher,
1434 SSHEngineSetLauncher,
1435 SSHProxyEngineSetLauncher,
1435 SSHProxyEngineSetLauncher,
1436 ]
1436 ]
1437 winhpc_launchers = [
1437 winhpc_launchers = [
1438 WindowsHPCLauncher,
1438 WindowsHPCLauncher,
1439 WindowsHPCControllerLauncher,
1439 WindowsHPCControllerLauncher,
1440 WindowsHPCEngineSetLauncher,
1440 WindowsHPCEngineSetLauncher,
1441 ]
1441 ]
1442 pbs_launchers = [
1442 pbs_launchers = [
1443 PBSLauncher,
1443 PBSLauncher,
1444 PBSControllerLauncher,
1444 PBSControllerLauncher,
1445 PBSEngineSetLauncher,
1445 PBSEngineSetLauncher,
1446 ]
1446 ]
1447 sge_launchers = [
1447 sge_launchers = [
1448 SGELauncher,
1448 SGELauncher,
1449 SGEControllerLauncher,
1449 SGEControllerLauncher,
1450 SGEEngineSetLauncher,
1450 SGEEngineSetLauncher,
1451 ]
1451 ]
1452 lsf_launchers = [
1452 lsf_launchers = [
1453 LSFLauncher,
1453 LSFLauncher,
1454 LSFControllerLauncher,
1454 LSFControllerLauncher,
1455 LSFEngineSetLauncher,
1455 LSFEngineSetLauncher,
1456 ]
1456 ]
1457 htcondor_launchers = [
1457 htcondor_launchers = [
1458 HTCondorLauncher,
1458 HTCondorLauncher,
1459 HTCondorControllerLauncher,
1459 HTCondorControllerLauncher,
1460 HTCondorEngineSetLauncher,
1460 HTCondorEngineSetLauncher,
1461 ]
1461 ]
1462 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1462 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1463 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
1463 + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
General Comments 0
You need to be logged in to leave comments. Login now