##// END OF EJS Templates
Move a few PBS launcher messages from info to debug
MinRK -
Show More
@@ -1,1221 +1,1221 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no subprocess.check_output; provide a minimal
    # stand-in built on Popen.
    def check_output(*args, **kwargs):
        """Run a command and return whatever it wrote to stdout.

        Positional and keyword arguments are forwarded to ``Popen``;
        ``stdout`` is always forced to a pipe.  The exit status of the
        command is not inspected.
        """
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        captured = proc.communicate()
        return captured[0]
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
60 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
62 )
63 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65
65
66 from .win32support import forward_read_events
66 from .win32support import forward_read_events
67
67
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
69
# True when running on a Windows host; selects the platform-specific
# code paths in the launchers below.
WINDOWS = (os.name == 'nt')

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# argv prefixes used to launch each of the IPython.parallel applications

ipcluster_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipclusterapp')
)

ipengine_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipengineapp')
)

ipcontroller_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipcontrollerapp')
)
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # Base launchers and errors
90 # Base launchers and errors
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92
92
93
93
class LauncherError(Exception):
    """Base class for the launcher-specific exceptions in this module."""
96
96
97
97
class ProcessStateError(LauncherError):
    """Raised when an operation is not valid for the launcher's current state."""
100
100
101
101
class UnknownStatus(LauncherError):
    """Launcher error for a status that is not recognized."""
104
104
105
105
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # data handed to notify_start / notify_stop, kept for later inspection
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Default for the ``loop`` trait: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        The first element is used as the process name in log messages.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.

        If the process has already finished, ``f`` is called immediately and
        its result is returned.
        """
        if self.state == 'after':
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'.  It is
        a pass-through so it can be used as a callback.
        """

        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`."""

        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Drain the callback list, most recently registered first, so that
        # each callback fires at most once.
        while self.stop_callbacks:
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
217
217
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    cluster_args = List([])
    profile_dir=Unicode('')
    cluster_id=Unicode('')

    def _profile_dir_changed(self, name, old, new):
        """Rebuild ``cluster_args`` from ``profile_dir`` and ``cluster_id``.

        The list is assembled locally and assigned once, so the trait
        receives its final value through a normal assignment instead of
        being mutated in place after being reset to an empty list.
        """
        cluster_args = []
        if self.profile_dir:
            cluster_args.extend(['--profile-dir', self.profile_dir])
        if self.cluster_id:
            cluster_args.extend(['--cluster-id', self.cluster_id])
        self.cluster_args = cluster_args

    # a change to either trait rebuilds the full argument list
    _cluster_id_changed = _profile_dir_changed
230
230
class ControllerMixin(ClusterAppMixin):
    """MixIn holding the ipcontroller command and its arguments as traits."""

    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file','--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
237
237
class EngineMixin(ClusterAppMixin):
    """MixIn holding the ipengine command and its arguments as traits."""

    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
245
245
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247 # Local process launchers
247 # Local process launchers
248 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
249
249
250
250
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return the configured command and arguments."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and register its pipes with the event loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the 'before' state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE,stderr=PIPE,stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # on Windows the pipes are wrapped by forward_read_events; the
            # returned objects are what gets registered and read from
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        # also poll periodically so that exit is noticed without pipe activity
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process: interrupt it, then kill it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        ``delay`` is in seconds.  A failure to deliver the interrupt is only
        logged; the delayed kill is scheduled regardless.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _handle_stream(self, name):
        """Log one line from the 'stdout' or 'stderr' stream of the process."""
        if WINDOWS:
            line = getattr(self, name).recv()
        else:
            line = getattr(self.process, name).readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """Event-loop handler for readable stdout."""
        self._handle_stream('stdout')

    def handle_stderr(self, fd, events):
        """Event-loop handler for readable stderr."""
        self._handle_stream('stderr')

    def poll(self):
        """Check whether the process has exited and clean up if so.

        Returns the exit status, or None while the process is still running.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
349
349
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Controller command, then cluster args, then controller args."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
359
359
360
360
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Engine command, then cluster args, then engine args."""
        argv = list(self.engine_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.engine_args)
        return argv
366
366
367
367
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # index -> launcher for engines that have not stopped yet
    launchers = Dict()
    # index -> stop data for engines that have stopped
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Returns the list of values returned by each engine launcher's
        ``start``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Placeholder argv; its first element names the set in log output."""
        return ['engine set']

    def signal(self, sig):
        """Forward ``sig`` to every engine launcher still tracked."""
        return [el.signal(sig) for el in self.launchers.values()]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill after ``delay`` seconds, every tracked engine."""
        return [el.interrupt_then_kill(delay) for el in self.launchers.values()]

    def stop(self):
        """Stop all engines."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for one engine; ``data`` carries its pid.

        Moves the matching launcher out of ``launchers`` and records its
        stop data.  Once no launchers remain, the whole set is reported as
        stopped.
        """
        pid = data['pid']
        for idx, el in self.launchers.items():
            if el.process.pid == pid:
                break
        else:
            # No tracked engine has this pid: do not remove an unrelated
            # launcher (or fail on an unbound index when the dict is empty).
            self.log.warn("Stop notification for unknown engine pid %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
438
438
439
439
440 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
441 # MPI launchers
441 # MPI launchers
442 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
443
443
444
444
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of instances requested from mpiexec; set by start()
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Initialize, honoring config sections under the old MPIExec names.

        Any non-empty ``MPIExec*`` config section is merged into the
        matching ``MPI*`` section before the base initializer reads the
        config, and a deprecation warning is logged for it.
        """
        # deprecation for old MPIExec names:
        # an explicit config=None is treated the same as no config at all
        config = kwargs.get('config') or {}
        renamed = []
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                renamed.append((oldname, newname))

        super(MPILauncher, self).__init__(*args, **kwargs)

        # Warn only after initialization, so that a ``log`` passed in
        # kwargs is the logger that receives the message.
        for oldname, newname in renamed:
            self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
482
482
483
483
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The controller command, exposed under the generic name."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster args followed by controller args."""
        combined = list(self.cluster_args)
        combined.extend(self.controller_args)
        return combined

    def start(self):
        """Start the controller by profile_dir."""
        # a controller is always launched as a single instance
        return super(MPIControllerLauncher, self).start(1)
501
501
502
502
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The engine command, exposed under the generic name."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster args followed by engine args."""
        combined = list(self.cluster_args)
        combined.extend(self.engine_args)
        return combined

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
521
521
522 # deprecated MPIExec names
522 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a deprecation warning for the old ``MPIExec*`` names.

    The class it is mixed into must provide a ``log`` attribute.
    """

    def warn(self):
        """Log that this class name is deprecated and suggest the new name.

        The suggested name is the current class name with ``'MPIExec'``
        replaced by ``'MPI'``.
        """
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warning is the supported spelling; Logger.warn is a
        # deprecated alias of it.
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
528
528
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        """Initialize as the parent launcher, then log the deprecation warning."""
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        self.warn()
534
534
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialize as the parent launcher, then log the deprecation warning."""
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        self.warn()
540
540
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialize as the parent launcher, then log the deprecation warning."""
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        self.warn()
546
546
547
547
548 #-----------------------------------------------------------------------------
548 #-----------------------------------------------------------------------------
549 # SSH launchers
549 # SSH launchers
550 #-----------------------------------------------------------------------------
550 #-----------------------------------------------------------------------------
551
551
552 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
552 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
553
553
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        """Rebuild ``location`` when ``hostname`` changes.

        Uses ``user@hostname`` when a user is set, otherwise the bare hostname.
        """
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Rebuild ``location`` when ``user`` changes.

        Mirrors ``_hostname_changed``: an empty user yields the bare hostname
        rather than a malformed ``@hostname`` location.
        """
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            self.location = self.hostname

    def find_args(self):
        """Return the argv list: ssh command, ssh args, location, program, program args."""
        return (self.ssh_cmd + self.ssh_args + [self.location] +
                self.program + self.program_args)

    def start(self, hostname=None, user=None):
        """Start the program over ssh.

        ``hostname`` and ``user`` override the corresponding traits when they
        are not None; the actual launch is delegated to the parent ``start()``.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        ``sig`` is not used: whatever signal is requested, the ssh escape
        sequence ``~.`` is written to the process's stdin.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
603
603
604
604
605
605
class SSHControllerLauncher(SSHLauncher, ControllerMixin):
    """Launch a controller over ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The program to launch: this launcher's ``controller_cmd``."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Program arguments: ``cluster_args`` followed by ``controller_args``."""
        return self.cluster_args + self.controller_args
618
618
619
619
class SSHEngineLauncher(SSHLauncher, EngineMixin):
    """Launch a single engine over ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The program to launch: this launcher's ``engine_cmd``."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Program arguments: ``cluster_args`` followed by ``engine_args``."""
        return self.cluster_args + self.engine_args
632
632
633
633
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines over ssh, as described by the ``engines`` dict."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for spec in self.engines.values():
            # a value is either a bare count or a (count, args) pair
            if isinstance(spec, (tuple, list)):
                spec, _args = spec
            count += spec
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Each key of ``engines`` is ``host`` or ``user@host``; each value is
        either a count or a ``(count, args)`` pair. Without explicit args a
        deep copy of ``self.engine_args`` is used. Returns the list of values
        returned by each engine launcher's ``start()``.
        """

        dlist = []
        for host, spec in self.engines.items():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(count):
                # pause between successive launches on the same host
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(
                    work_dir=self.work_dir, config=self.config, log=self.log,
                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
682
682
683
683
684
684
685 #-----------------------------------------------------------------------------
685 #-----------------------------------------------------------------------------
686 # Windows HPC Server 2008 scheduler launchers
686 # Windows HPC Server 2008 scheduler launchers
687 #-----------------------------------------------------------------------------
687 #-----------------------------------------------------------------------------
688
688
689
689
690 # This is only used on Windows.
690 # This is only used on Windows.
def find_job_cmd():
    """Return the command to use for the ``job`` program.

    On Windows the result of ``find_cmd('job')`` is returned; if that lookup
    fails, or on any other platform, the bare name ``'job'`` is returned.
    """
    if WINDOWS:
        try:
            return find_cmd('job')
        except (FindCmdError, ImportError):
            # ImportError will be raised if win32api is not installed
            return 'job'
    else:
        return 'job'
700
700
701
701
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs with the ``job`` command.

    Subclasses must implement ``write_job_file(n)``.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE: the `job_file` property defined further down rebinds this name
    # in the class body.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Forward ``work_dir``, ``config`` and any extra kwargs to the parent."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job file: ``work_dir`` joined with ``job_file_name``."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for ``n`` instances (abstract)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the fixed argument list ``[u'job.exe']``."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` in ``output`` is stored on
        ``self.job_id`` and returned. Raises LauncherError if nothing matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, runs ``<job_cmd> submit`` against it, parses the
        job id from the command output, calls ``notify_start`` with it and
        returns it.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and return the output of the cancel command.

        If the cancel command fails, a fixed "already stopped" message is used
        as the output instead. ``notify_stop`` is called either way with a
        dict holding ``job_id`` and ``output``.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Best effort: any failure of the cancel command is treated as
            # "already stopped", but KeyboardInterrupt/SystemExit propagate.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
780
780
781
781
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a controller as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file containing a single controller task.

        ``n`` is not used here; exactly one task is added to the job.
        """
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the cluster args, then any extra controller args.
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file: ``profile_dir`` joined with ``job_file_name``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir.

        Delegates to the parent ``start(n)`` with ``n`` fixed at 1.
        """
        return super(WindowsHPCControllerLauncher, self).start(1)
812
812
813
813
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as a single Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job file containing ``n`` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the cluster args, then any extra engine args.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file: ``profile_dir`` joined with ``job_file_name``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir.

        Delegates to the parent ``start(n)``.
        """
        return super(WindowsHPCEngineSetLauncher, self).start(n)
845
845
846
846
847 #-----------------------------------------------------------------------------
847 #-----------------------------------------------------------------------------
848 # Batch (PBS) system launchers
848 # Batch (PBS) system launchers
849 #-----------------------------------------------------------------------------
849 #-----------------------------------------------------------------------------
850
850
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        """Store the new trait value in ``self.context`` under the trait name."""
        self.context[name] = new
    # cluster_id changes are handled the same way
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        """Default ``profile_dir`` is ``''``; also recorded in ``self.context``."""
        self.context['profile_dir'] = ''
        return ''

    def _cluster_id_default(self):
        """Default ``cluster_id`` is ``''``; also recorded in ``self.context``."""
        self.context['cluster_id'] = ''
        return ''
863
863
864
864
865 class BatchSystemLauncher(BaseLauncher):
865 class BatchSystemLauncher(BaseLauncher):
866 """Launch an external process using a batch system.
866 """Launch an external process using a batch system.
867
867
868 This class is designed to work with UNIX batch systems like PBS, LSF,
868 This class is designed to work with UNIX batch systems like PBS, LSF,
869 GridEngine, etc. The overall model is that there are different commands
869 GridEngine, etc. The overall model is that there are different commands
870 like qsub, qdel, etc. that handle the starting and stopping of the process.
870 like qsub, qdel, etc. that handle the starting and stopping of the process.
871
871
872 This class also has the notion of a batch script. The ``batch_template``
872 This class also has the notion of a batch script. The ``batch_template``
873 attribute can be set to a string that is a template for the batch script.
873 attribute can be set to a string that is a template for the batch script.
874 This template is instantiated using string formatting. Thus the template can
874 This template is instantiated using string formatting. Thus the template can
875 use {n} fot the number of instances. Subclasses can add additional variables
875 use {n} fot the number of instances. Subclasses can add additional variables
876 to the template dict.
876 to the template dict.
877 """
877 """
878
878
879 # Subclasses must fill these in. See PBSEngineSet
879 # Subclasses must fill these in. See PBSEngineSet
880 submit_command = List([''], config=True,
880 submit_command = List([''], config=True,
881 help="The name of the command line program used to submit jobs.")
881 help="The name of the command line program used to submit jobs.")
882 delete_command = List([''], config=True,
882 delete_command = List([''], config=True,
883 help="The name of the command line program used to delete jobs.")
883 help="The name of the command line program used to delete jobs.")
884 job_id_regexp = Unicode('', config=True,
884 job_id_regexp = Unicode('', config=True,
885 help="""A regular expression used to get the job id from the output of the
885 help="""A regular expression used to get the job id from the output of the
886 submit_command.""")
886 submit_command.""")
887 batch_template = Unicode('', config=True,
887 batch_template = Unicode('', config=True,
888 help="The string that is the batch script template itself.")
888 help="The string that is the batch script template itself.")
889 batch_template_file = Unicode(u'', config=True,
889 batch_template_file = Unicode(u'', config=True,
890 help="The file that contains the batch template.")
890 help="The file that contains the batch template.")
891 batch_file_name = Unicode(u'batch_script', config=True,
891 batch_file_name = Unicode(u'batch_script', config=True,
892 help="The filename of the instantiated batch script.")
892 help="The filename of the instantiated batch script.")
893 queue = Unicode(u'', config=True,
893 queue = Unicode(u'', config=True,
894 help="The PBS Queue.")
894 help="The PBS Queue.")
895
895
896 def _queue_changed(self, name, old, new):
896 def _queue_changed(self, name, old, new):
897 self.context[name] = new
897 self.context[name] = new
898
898
899 n = Integer(1)
899 n = Integer(1)
900 _n_changed = _queue_changed
900 _n_changed = _queue_changed
901
901
902 # not configurable, override in subclasses
902 # not configurable, override in subclasses
903 # PBS Job Array regex
903 # PBS Job Array regex
904 job_array_regexp = Unicode('')
904 job_array_regexp = Unicode('')
905 job_array_template = Unicode('')
905 job_array_template = Unicode('')
906 # PBS Queue regex
906 # PBS Queue regex
907 queue_regexp = Unicode('')
907 queue_regexp = Unicode('')
908 queue_template = Unicode('')
908 queue_template = Unicode('')
909 # The default batch template, override in subclasses
909 # The default batch template, override in subclasses
910 default_template = Unicode('')
910 default_template = Unicode('')
911 # The full path to the instantiated batch script.
911 # The full path to the instantiated batch script.
912 batch_file = Unicode(u'')
912 batch_file = Unicode(u'')
913 # the format dict used with batch_template:
913 # the format dict used with batch_template:
914 context = Dict()
914 context = Dict()
915 def _context_default(self):
915 def _context_default(self):
916 """load the default context with the default values for the basic keys
916 """load the default context with the default values for the basic keys
917
917
918 because the _trait_changed methods only load the context if they
918 because the _trait_changed methods only load the context if they
919 are set to something other than the default value.
919 are set to something other than the default value.
920 """
920 """
921 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
921 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
922
922
923 # the Formatter instance for rendering the templates:
923 # the Formatter instance for rendering the templates:
924 formatter = Instance(EvalFormatter, (), {})
924 formatter = Instance(EvalFormatter, (), {})
925
925
926
926
def find_args(self):
    """Return the submit argv: ``submit_command`` followed by ``batch_file``."""
    return self.submit_command + [self.batch_file]
929
929
def __init__(self, work_dir=u'.', config=None, **kwargs):
    """Initialize the parent launcher, then compute ``batch_file``.

    ``batch_file`` is ``work_dir`` joined with ``batch_file_name``.
    """
    super(BatchSystemLauncher, self).__init__(
        work_dir=work_dir, config=config, **kwargs
    )
    self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
935
935
def parse_job_id(self, output):
    """Take the output of the submit command and return the job id.

    The first match of ``job_id_regexp`` in ``output`` is stored on
    ``self.job_id`` and returned. Raises LauncherError if nothing matches.
    """
    m = re.search(self.job_id_regexp, output)
    if m is not None:
        job_id = m.group()
    else:
        raise LauncherError("Job id couldn't be determined: %s" % output)
    self.job_id = job_id
    self.log.info('Job submitted with job id: %r', job_id)
    return job_id
946
946
def write_batch_script(self, n):
    """Instantiate and write the batch script to the work_dir.

    The template is chosen in priority order: ``batch_template``, then the
    contents of ``batch_template_file``, then ``default_template``. When the
    default template is used, the job-array line (and the queue line, if a
    queue is set) is inserted after the first line unless the template
    already matches the corresponding regexp. The template is then rendered
    with ``self.context``, written to ``self.batch_file`` and made
    readable, writable and executable by the owner only.
    """
    self.n = n
    # first priority is batch_template if set
    if self.batch_template_file and not self.batch_template:
        # second priority is batch_template_file
        with open(self.batch_template_file) as f:
            self.batch_template = f.read()
    if not self.batch_template:
        # third (last) priority is default_template
        self.batch_template = self.default_template

        # add jobarray or queue lines to user-specified template
        # note that this is *only* when user did not specify a template.
        regex = re.compile(self.job_array_regexp)
        if not regex.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            # partition (unlike split('\n', 1)) also copes with a template
            # that has no newline: `rest` is then simply empty.
            firstline, _sep, rest = self.batch_template.partition('\n')
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

        regex = re.compile(self.queue_regexp)
        if self.queue and not regex.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            firstline, _sep, rest = self.batch_template.partition('\n')
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    script_as_string = self.formatter.format(self.batch_template, **self.context)
    self.log.debug('Writing batch script: %s', self.batch_file)

    with open(self.batch_file, 'w') as f:
        f.write(script_as_string)
    os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
981
981
def start(self, n):
    """Write the batch script for `n` processes and submit it.

    The batch script is generated by ``write_batch_script(n)``, then the
    command in ``self.args`` is run with the current process environment.
    The job id is extracted from the command's output with
    ``parse_job_id`` and passed to ``notify_start``.

    Returns
    -------
    The job id as returned by ``parse_job_id``.
    """
    launcher_name = self.__class__.__name__
    self.log.debug("Starting %s: %r", launcher_name, self.args)

    # The script must exist on disk before the submit command is run.
    self.write_batch_script(n)
    submit_output = check_output(self.args, env=os.environ)

    job_id = self.parse_job_id(submit_output)
    self.notify_start(job_id)
    return job_id
993
993
def stop(self):
    """Run the batch system's delete command for this launcher's job.

    The command is ``self.delete_command`` followed by ``self.job_id``,
    run with the current process environment.  The job id and the
    command's output are reported through ``notify_stop``.

    Returns
    -------
    The output of the delete command.
    """
    kill_cmd = self.delete_command + [self.job_id]
    output = check_output(kill_cmd, env=os.environ)
    # Pass the output of the kill command along with the job id.
    self.notify_stop({'job_id': self.job_id, 'output': output})
    return output
998
998
999
999
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS.

    Supplies the PBS-specific submit/delete commands and the directive
    patterns/templates for job arrays (``#PBS -t``) and queues
    (``#PBS -q``).
    """

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Regular expressions are written as raw strings so that sequences such
    # as \W and \d reach the `re` module untouched instead of being treated
    # as (invalid) string escapes.  The pattern values are unchanged.
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1015
1015
1016
1016
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(
        u'pbs_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # Default batch script; the ipcontroller command line is substituted
    # at class-definition time, while {profile_dir} and {cluster_id} are
    # left as format fields to be filled in later.
    default_template = Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipcontroller_cmd_argv))

    def start(self):
        """Submit the controller job; exactly one copy is requested."""
        controller_count = 1
        return super(PBSControllerLauncher, self).start(controller_count)
1032
1032
1033
1033
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(
        u'pbs_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # Default batch script; the ipengine command line is substituted at
    # class-definition time, while {profile_dir} and {cluster_id} are left
    # as format fields to be filled in later.
    default_template = Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit a PBS job for `n` engines and return the parent's result."""
        parent = super(PBSEngineSetLauncher, self)
        return parent.start(n)
1047
1047
1048 #SGE is very similar to PBS
1048 #SGE is very similar to PBS
1049
1049
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax.

    Only the directive patterns and templates are overridden: SGE
    directives are introduced by ``#$`` rather than ``#PBS``.
    """

    # Raw strings keep the regex escapes (\$, \W, \w) intact for the `re`
    # module; the pattern values are identical to the non-raw originals.
    job_array_regexp = Unicode(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1056
1056
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # Default batch script; the ipcontroller command line is substituted
    # at class-definition time, while {profile_dir} and {cluster_id} are
    # left as format fields to be filled in later.
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (' '.join(ipcontroller_cmd_argv)))

    def start(self):
        """Start the controller by profile or profile_dir.

        Exactly one copy of the controller job is requested.
        """
        return super(SGEControllerLauncher, self).start(1)
1071
1071
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""

    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # Default batch script; the ipengine command line is substituted at
    # class-definition time, while {profile_dir} and {cluster_id} are left
    # as format fields to be filled in later.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit an SGE job for `n` engines and return the parent's result."""
        parent = super(SGEEngineSetLauncher, self)
        return parent.start(n)
1085
1085
1086
1086
1087 # LSF launchers
1087 # LSF launchers
1088
1088
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF.

    Supplies the LSF submit/delete commands and the ``#BSUB`` directive
    patterns/templates, and overrides `start` because the batch script is
    fed to the submit command on stdin rather than passed as an argument.
    """

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Matches an existing job array directive such as
    # "#BSUB -J name[1-10]".  Whitespace is required between "#BSUB", "-J"
    # and the job name; the previous pattern ("-J+\w+") allowed none after
    # "-J" and therefore never matched a directive written like
    # job_array_template.
    job_array_regexp = Unicode(r'#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = Unicode(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:

            bsub < script

        Returns
        -------
        The job id parsed from the submit command's standard output.
        """
        self.write_batch_script(n)
        # NOTE(review): only args[0] and args[1] are used; this presumably
        # relies on args being [submit command, batch file] -- confirm
        # against find_args when submit_command has extra elements.
        piped_cmd = self.args[0] + '<"' + self.args[1] + '"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        # shell=True is needed for the "<" redirection in piped_cmd.
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        # stderr is not piped, so only stdout carries data here.
        output, _ = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1122
1122
1123
1123
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(
        u'lsf_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # Default batch script.  "%%J" collapses to a literal "%J" when the
    # ipcontroller command line is substituted at class-definition time;
    # {profile_dir} and {cluster_id} stay as format fields for later.
    default_template = Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipcontroller_cmd_argv))

    def start(self):
        """Submit the controller job; exactly one copy is requested."""
        controller_count = 1
        return super(LSFControllerLauncher, self).start(controller_count)
1139
1139
1140
1140
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""

    batch_file_name = Unicode(
        u'lsf_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # Default batch script.  "%%J" collapses to a literal "%J" when the
    # ipengine command line is substituted at class-definition time;
    # {profile_dir} and {cluster_id} stay as format fields for later.
    default_template = Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit an LSF job for `n` engines and return the parent's result."""
        parent = super(LSFEngineSetLauncher, self)
        return parent.start(n)
1154
1154
1155
1155
1156 #-----------------------------------------------------------------------------
1156 #-----------------------------------------------------------------------------
1157 # A launcher for ipcluster itself!
1157 # A launcher for ipcluster itself!
1158 #-----------------------------------------------------------------------------
1158 #-----------------------------------------------------------------------------
1159
1159
1160
1160
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster",
    )
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.",
    )
    ipcluster_subcommand = Unicode('start')
    ipcluster_n = Integer(2)

    def find_args(self):
        """Assemble the ipcluster command line.

        The order is: base command, subcommand, ``--n=<ipcluster_n>``,
        then the extra arguments from ``ipcluster_args``.
        """
        count_flag = '--n=%i' % self.ipcluster_n
        head = self.ipcluster_cmd + [self.ipcluster_subcommand]
        return head + [count_flag] + self.ipcluster_args

    def start(self):
        """Start ipcluster by delegating to the parent class."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1178
1178
1179 #-----------------------------------------------------------------------------
1179 #-----------------------------------------------------------------------------
1180 # Collections of launchers
1180 # Collections of launchers
1181 #-----------------------------------------------------------------------------
1181 #-----------------------------------------------------------------------------
1182
1182
# One list of launcher classes per backend, combined into all_launchers.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]

mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]

ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]

winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]

pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]

sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]

lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]

# Concatenation order matches the definitions above.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
)
1221
1221
General Comments 0
You need to be logged in to leave comments. Login now