##// END OF EJS Templates
fix controller_args -> engine_args typo in WinHPCEngine
MinRK -
Show More
@@ -1,1184 +1,1184
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no subprocess.check_output; emulate it with Popen.
    def check_output(*args, **kwargs):
        """Run a command through ``Popen`` and return what it wrote to stdout.

        All positional and keyword arguments are forwarded to ``Popen``; the
        ``stdout`` keyword is always replaced with ``PIPE`` so the output can
        be captured.  The exit status of the command is not inspected.
        """
        kwargs['stdout'] = PIPE
        captured, _ = Popen(*args, **kwargs).communicate()
        return captured
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
60 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
62 )
63 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65
65
66 from .win32support import forward_read_events
66 from .win32support import forward_read_events
67
67
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
69
# True when running on a platform whose os.name is 'nt'.
WINDOWS = (os.name == 'nt')
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
73 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
# Command argv for each parallel app: the module's path is resolved with
# get_ipython_module_path() and converted to an argv list by pycmd2argv().
ipcluster_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipclusterapp')
)

ipengine_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipengineapp')
)

ipcontroller_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipcontrollerapp')
)
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # Base launchers and errors
90 # Base launchers and errors
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92
92
93
93
class LauncherError(Exception):
    """Base class for the launcher-specific errors defined in this module."""
96
96
97
97
class ProcessStateError(LauncherError):
    """Raised when an operation is not valid for the launcher's current state.

    For example, starting a launcher that has already been started.
    """
100
100
101
101
class UnknownStatus(LauncherError):
    """Launcher error signalling an unknown status.

    NOTE(review): presumably the status of a launched job or process --
    confirm against the code that raises it.
    """
104
104
105
105
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Provide the default ``loop``: ``ioloop.IOLoop.instance()``."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # Lifecycle state: one of 'before', 'running' or 'after'.
        self.state = 'before'
        # Callables invoked with stop_data once the process has finished.
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The list of cmd and args that will be used to start the process.

        The value is whatever :meth:`find_args` returns; the first element
        is used as the process name in log messages.
        """
        return self.find_args()

    def find_args(self):
        """Return the args list exposed through the ``.args`` property.

        Subclasses must override this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into a single space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True exactly when the launcher state is 'running'."""
        return self.state == 'running'

    def start(self):
        """Start the process.  Must be implemented by subclasses."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        Must be implemented by subclasses.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with this Launcher's stop_data
        when the process actually finishes.

        If the launcher is already in the 'after' state, ``f`` is called
        right away and its result is returned.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        Logs the process startup, stores ``data`` as ``start_data`` and sets
        the state to 'running'.  ``data`` is returned unchanged, so this can
        be used as a pass-through callback.
        """
        message = 'Process %r started: %r' % (self.args[0], data)
        self.log.info(message)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        Logs the process stopping, stores ``data`` as ``stop_data``, sets the
        state to 'after' and then runs the callbacks registered via
        :meth:`on_stop`.  ``data`` is returned unchanged.
        """
        message = 'Process %r stopped: %r' % (self.args[0], data)
        self.log.info(message)
        self.stop_data = data
        self.state = 'after'
        # Drain the callbacks registered so far, most recently added first.
        for _ in range(len(self.stop_callbacks)):
            self.stop_callbacks.pop()(data)
        return data

    def signal(self, sig):
        """Signal the process.  Must be implemented by subclasses.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
217
217
class ClusterAppMixin(HasTraits):
    """MixIn for cluster args as traits"""
    cluster_args = List([])
    profile_dir = Unicode('')
    cluster_id = Unicode('')

    def _profile_dir_changed(self, name, old, new):
        """Rebuild ``cluster_args`` from the current profile_dir and cluster_id.

        Empty values contribute nothing; the same handler is also bound as
        the change handler for ``cluster_id``.
        """
        rebuilt = []
        if self.profile_dir:
            rebuilt = rebuilt + ['--profile-dir', self.profile_dir]
        if self.cluster_id:
            rebuilt = rebuilt + ['--cluster-id', self.cluster_id]
        self.cluster_args = rebuilt
    _cluster_id_changed = _profile_dir_changed
230
230
class ControllerMixin(ClusterAppMixin):
    """MixIn holding the configurable ipcontroller command and arguments."""

    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
237
237
class EngineMixin(ClusterAppMixin):
    """MixIn holding the configurable ipengine command and arguments."""

    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
245
245
246 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
247 # Local process launchers
247 # Local process launchers
248 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
249
249
250
250
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # Popen object of the child; None until start() is called.
        self.process = None
        # PeriodicCallback that polls the child for exit; None until start().
        self.poller = None

    def find_args(self):
        """Return ``cmd_and_args`` as the command line to launch."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output and exit into the event loop.

        stdout and stderr of the child are registered as read handlers on
        ``self.loop`` and a periodic callback polls for process exit every
        ``poll_frequency`` milliseconds.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the 'before' state.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # On Windows the pipes are wrapped by forward_read_events(), and
            # the returned objects are what gets registered and read from.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process: interrupt it, then kill it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless state is 'running'.

        On Windows every signal other than SIGINT is delivered by running
        ``taskkill`` on the process tree instead of ``send_signal``.
        """
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        Parameters
        ----------
        delay : float
            Seconds to wait between the interrupt and the kill.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort only: the delayed KILL below still runs.
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _forward_line(self, line, log_func):
        """Log one line of child output with ``log_func``, or poll on EOF."""
        if not line:
            # a stopped process will be readable but return empty strings
            self.poll()
            return
        # Strip only a real trailing newline; a final line that is not
        # newline-terminated must not lose its last character.
        if line[-1:] in ('\n', b'\n'):
            line = line[:-1]
        log_func(line)

    def handle_stdout(self, fd, events):
        """Event-loop read handler: forward a line of stdout to log.info."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        self._forward_line(line, self.log.info)

    def handle_stderr(self, fd, events):
        """Event-loop read handler: forward a line of stderr to log.error."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        self._forward_line(line, self.log.error)

    def poll(self):
        """Check whether the process has exited and clean up if it has.

        When the process has an exit status, the poller is stopped, both
        output handlers are removed from the loop and :meth:`notify_stop` is
        called with a dict holding ``exit_code`` and ``pid``.

        Returns
        -------
        The value of ``self.process.poll()`` (None while still running).
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
348
348
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Return controller_cmd followed by cluster_args and controller_args."""
        argv = self.controller_cmd + self.cluster_args
        return argv + self.controller_args

    def start(self):
        """Start the controller by profile_dir, logging the command line first."""
        launch_argv = self.args
        self.log.info("Starting LocalControllerLauncher: %r" % launch_argv)
        return super(LocalControllerLauncher, self).start()
359
359
360
360
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Return engine_cmd followed by cluster_args and engine_args."""
        argv = self.engine_cmd + self.cluster_args
        return argv + self.engine_args
366
366
367
367
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # Running engine launchers, keyed by their start index.
    launchers = Dict()
    # Stop data of finished engines, keyed by the same start index.
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        One ``launcher_class`` instance is created and started per engine,
        sleeping ``delay`` seconds before every engine after the first.

        Returns
        -------
        list
            The return value of each engine launcher's ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            if i == 0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Return a placeholder args list; the set has no command of its own."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher and return their results."""
        # Iterate over a snapshot so the dict may change while signalling.
        return [el.signal(sig) for el in list(self.launchers.values())]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt and then kill every engine; return the per-engine results."""
        # Iterate over a snapshot so the dict may change while stopping.
        return [
            el.interrupt_then_kill(delay)
            for el in list(self.launchers.values())
        ]

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for one engine: record its data and drop its launcher.

        ``data`` is the engine launcher's stop data and must contain ``pid``.
        Once the last launcher is gone, :meth:`notify_stop` is called with
        the collected per-engine stop data.
        """
        pid = data['pid']
        for idx, el in list(self.launchers.items()):
            if el.process.pid == pid:
                break
        else:
            # No launcher owns this pid.  Without this guard an unrelated
            # launcher would be dropped (or a NameError raised when the
            # dict is empty), so ignore the notification instead.
            self.log.warning("Ignoring stop notification for unknown engine pid: %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
440
440
441
441
442 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
443 # MPIExec launchers
443 # MPIExec launchers
444 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
445
445
446
446
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(
        ['mpiexec'],
        config=True,
        help="The mpiexec command to use in starting the process.",
    )
    mpi_args = List(
        [],
        config=True,
        help="The command line arguments to pass to mpiexec.",
    )
    program = List(
        ['date'],
        help="The program to start via mpiexec.",
    )
    program_args = List(
        [],
        help="The command line argument to the program.",
    )
    n = Integer(1)

    def find_args(self):
        """Build self.args using all the fields.

        The order is: mpi_cmd, ``-n <n>``, mpi_args, program, program_args.
        """
        return (
            self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args
            + self.program + self.program_args
        )

    def start(self, n):
        """Start n instances of the program using mpiexec.

        ``n`` is stored on the launcher first, because the command line
        built by :meth:`find_args` reads it.
        """
        self.n = n
        return super(MPIExecLauncher, self).start()
472
472
473
473
class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The program to run under mpiexec: ``controller_cmd``."""
        return self.controller_cmd

    @property
    def program_args(self):
        """The program's arguments: cluster_args followed by controller_args."""
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir, as a single mpiexec instance."""
        launch_argv = self.args
        self.log.info("Starting MPIExecControllerLauncher: %r" % launch_argv)
        return super(MPIExecControllerLauncher, self).start(1)
492
492
493
493
class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The program to run under mpiexec: ``engine_cmd``."""
        return self.engine_cmd

    @property
    def program_args(self):
        """The program's arguments: cluster_args followed by engine_args."""
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        # Set n before logging: the logged command line is built from it.
        self.n = n
        launch_argv = self.args
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % launch_argv)
        return super(MPIExecEngineSetLauncher, self).start(n)
513
513
514 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
515 # SSH launchers
515 # SSH launchers
516 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
517
517
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519
519
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    # NOTE(review): the two `_<trait>_changed` methods below look like trait
    # change handlers that keep `location` in sync -- confirm against the
    # traitlets machinery used by the base class.
    def _hostname_changed(self, name, old, new):
        """Rebuild `location` from the current user and the new hostname."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Rebuild `location` from the new user and the current hostname.

        An empty user yields the bare hostname (matching
        `_hostname_changed`) rather than the malformed ``@hostname``.
        """
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            self.location = self.hostname

    def find_args(self):
        """Return the full command line: ssh command, ssh args, location,
        then the remote program and its arguments."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, hostname=None, user=None):
        """Optionally override hostname and/or user, then start the process.

        Arguments left as None keep the currently configured values.
        Returns the result of the parent class's ``start()``.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        `sig` is not used: whatever signal is requested, the ssh escape
        sequence ``~.`` is written to the process's stdin.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
569
569
570
570
571
571
class SSHControllerLauncher(SSHLauncher, ControllerMixin):
    """SSH launcher whose remote program is the controller command."""

    # `program` and `program_args` are read-only aliases of the controller
    # command and arguments, for use in find_args().  Exposing them under
    # these names gives every Controller/EngineSetLauncher the same form,
    # instead of some having `program_args` and others `controller_args`.
    @property
    def program(self):
        """Return the controller command, i.e. ``controller_cmd``."""
        controller_cmd = self.controller_cmd
        return controller_cmd

    @property
    def program_args(self):
        """Return ``cluster_args`` followed by ``controller_args``."""
        cluster, controller = self.cluster_args, self.controller_args
        return cluster + controller
584
584
585
585
class SSHEngineLauncher(SSHLauncher, EngineMixin):
    """SSH launcher whose remote program is the engine command."""

    # `program` and `program_args` are read-only aliases of the engine
    # command and arguments, for use in find_args().  Exposing them under
    # these names gives every Controller/EngineSetLauncher the same form,
    # instead of some having `program_args` and others `engine_args`.
    @property
    def program(self):
        """Return the engine command, i.e. ``engine_cmd``."""
        engine_cmd = self.engine_cmd
        return engine_cmd

    @property
    def program_args(self):
        """Return ``cluster_args`` followed by ``engine_args``."""
        cluster, engine = self.cluster_args, self.engine_args
        return cluster + engine
598
598
599
599
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Start engines over ssh, one `launcher_class` instance per engine."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Each value in `engines` is either an engine count or a
        ``(count, args)`` pair; a key may be ``user@host``.  Returns the
        list of results of the individual launchers' ``start`` calls.
        """
        started = []
        for host, spec in self.engines.iteritems():
            if isinstance(spec, (tuple, list)):
                # per-host override: (number of engines, engine args)
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            user = None
            if '@' in host:
                user, host = host.split('@', 1)

            for index in range(count):
                # pause between consecutive launches on the same host
                if index > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(
                    work_dir=self.work_dir,
                    config=self.config,
                    log=self.log,
                    profile_dir=self.profile_dir,
                    cluster_id=self.cluster_id,
                )

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                result = el.start(user=user, hostname=host)
                if index == 0:
                    # log the command line once per host
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers["%s/%i" % (host, index)] = el
                started.append(result)
        self.notify_start(started)
        return started
640
640
641
641
642
642
643 #-----------------------------------------------------------------------------
643 #-----------------------------------------------------------------------------
644 # Windows HPC Server 2008 scheduler launchers
644 # Windows HPC Server 2008 scheduler launchers
645 #-----------------------------------------------------------------------------
645 #-----------------------------------------------------------------------------
646
646
647
647
648 # This is only used on Windows.
648 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to submit Windows HPC jobs.

    On Windows the command is looked up with ``find_cmd('job')``; if that
    lookup fails, or on any other platform, the bare name ``'job'`` is
    returned.
    """
    fallback = 'job'
    if not WINDOWS:
        return fallback
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return fallback
658
658
659
659
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through `job_cmd`.

    Subclasses must implement `write_job_file` to produce the job
    description file that `start` submits.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # (This name is rebound by the `job_file` property defined below.)
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Forward all arguments unchanged to the parent constructor."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Path of the job file: `job_file_name` inside `work_dir`."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for `n` processes (abstract)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of `job_id_regexp` in `output` is stored on
        ``self.job_id`` and returned.  Raises LauncherError when nothing
        matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with `job_cmd`, and returns the job
        id parsed from the command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job and return the cancel command's output.

        Cancelling is best-effort: if the cancel command fails, a message
        saying the job already appears to be stopped is used as the output.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Only ordinary errors count as "already stopped";
            # KeyboardInterrupt and SystemExit are allowed to propagate.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
738
738
739
739
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Submit a single ipcontroller task as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file holding one controller task; `n` is not used."""
        controller_job = IPControllerJob(config=self.config)
        task = IPControllerTask(config=self.config)

        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir

        # The task gets the cluster args first, then the extra
        # controller args.
        for extra in (self.cluster_args, self.controller_args):
            task.controller_args.extend(extra)
        controller_job.add_task(task)

        path = self.job_file
        self.log.info("Writing job description file: %s" % path)
        controller_job.write(path)

    @property
    def job_file(self):
        """Path of the job file: `job_file_name` inside `profile_dir`."""
        base_dir = self.profile_dir
        return os.path.join(base_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
770
770
771
771
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Submit a set of ipengine tasks as a single Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job file containing `n` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Engine tasks take their arguments through `engine_args`
            # (not `controller_args`): cluster args first, then the extra
            # engine args.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file: `job_file_name` inside `profile_dir`."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
803
803
804
804
805 #-----------------------------------------------------------------------------
805 #-----------------------------------------------------------------------------
806 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
807 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
808
808
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        """Store the new value in the context under the changed name."""
        self.context[name] = new

    # cluster_id changes are recorded in exactly the same way
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        """Put an empty profile_dir into the context and return it."""
        empty = ''
        self.context['profile_dir'] = empty
        return empty

    def _cluster_id_default(self):
        """Put an empty cluster_id into the context and return it."""
        empty = ''
        self.context['cluster_id'] = empty
        return empty
821
821
822
822
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = Unicode('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        """Mirror the changed value into the template context."""
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = Unicode('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = Unicode('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})


    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the launcher and derive `batch_file` from `work_dir`."""
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of `job_id_regexp` in `output` is stored on
        ``self.job_id`` and returned.  Raises LauncherError when nothing
        matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r' % job_id)
        return job_id

    def _insert_after_first_line(self, line):
        """Insert `line` directly after the first line of `batch_template`.

        `partition` is used instead of unpacking ``split('\\n', 1)`` so that
        a template consisting of a single line (no newline at all) does not
        raise ValueError.
        """
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir.

        The template is taken from `batch_template`, else read from
        `batch_template_file`, else `default_template`.  It is rendered with
        `context` and written to `batch_file`, which is made readable,
        writable and executable by the owner only.
        """
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # Add job array and queue lines to whichever template is in effect,
        # but only when the template does not already contain a line
        # matching the corresponding regexp.
        regex = re.compile(self.job_array_regexp)
        if not regex.search(self.batch_template):
            self.log.info("adding job array settings to batch script")
            self._insert_after_first_line(self.job_array_template)

        regex = re.compile(self.queue_regexp)
        if self.queue and not regex.search(self.batch_template):
            self.log.info("adding PBS queue settings to batch script")
            self._insert_after_first_line(self.queue_template)

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command, and returns the
        job id parsed from its output.
        """
        # The values held in self.context (e.g. profile_dir) are available
        # to the batch script template as {profile_dir} etc.
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on the job and return its output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
955
955
956
956
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # The patterns are raw strings so the regex escapes (\W, \w, \d, \$)
    # are not treated as string-literal escapes; the values are unchanged.
    # Matches an existing "#PBS -t ..." job array directive.
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # Matches an existing "#PBS -q ..." queue directive.
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
972
972
973
973
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # The ipcontroller command line is substituted for %s when the class is
    # defined; {profile_dir} and {cluster_id} are left as placeholders.
    default_template = Unicode(
        "#!/bin/sh\n"
        "#PBS -V\n"
        "#PBS -N ipcontroller\n"
        '%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipcontroller_cmd_argv))

    def start(self):
        """Log the launch command, then submit a single controller job."""
        launch_msg = "Starting PBSControllerLauncher: %r" % self.args
        self.log.info(launch_msg)
        return super(PBSControllerLauncher, self).start(1)
990
990
991
991
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The ipengine command line is substituted for %s when the class is
    # defined; {profile_dir} and {cluster_id} are left as placeholders.
    default_template = Unicode(
        u"#!/bin/sh\n"
        u"#PBS -V\n"
        u"#PBS -N ipengine\n"
        u'%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Log the launch command, then submit a job for n engines."""
        launch_msg = 'Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)
        self.log.info(launch_msg)
        return super(PBSEngineSetLauncher, self).start(n)
1006
1006
1007 #SGE is very similar to PBS
1007 #SGE is very similar to PBS
1008
1008
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax

    Overrides only the job-array and queue patterns/templates of
    PBSLauncher, using the ``#$`` directive prefix.
    """

    # Raw strings: same pattern text, with the regex escapes kept literal.
    job_array_regexp = Unicode(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')

    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1015
1015
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # The ipcontroller command line is substituted for %s when the class is
    # defined; {profile_dir} and {cluster_id} are left as placeholders.
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (' '.join(ipcontroller_cmd_argv)))

    def start(self):
        """Start the controller by profile or profile_dir.

        Logs the launch command and submits a single job through the
        parent class's ``start``; returns whatever that call returns.
        """
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        return super(SGEControllerLauncher, self).start(1)
1031
1031
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""

    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The ipengine command line is substituted for %s when the class is
    # defined; {profile_dir} and {cluster_id} are left as placeholders.
    default_template = Unicode(
        "#$ -V\n"
        "#$ -S /bin/sh\n"
        "#$ -N ipengine\n"
        '%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Log the launch command, then submit a job for n engines."""
        launch_msg = 'Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)
        self.log.info(launch_msg)
        return super(SGEEngineSetLauncher, self).start(n)
1046
1046
1047
1047
1048 # LSF launchers
1048 # LSF launchers
1049
1049
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # The previous pattern ('-J+\w+') required the job name to follow '-J'
    # with no whitespace, so it could not match this class's own
    # job_array_template below.  Optional whitespace is now accepted after
    # '-J'; everything the old pattern matched still matches.
    # In these raw strings '\t' is interpreted by the regex engine as a tab.
    job_array_regexp = Unicode(r'#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = Unicode(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
        bsub < script

        Returns the job id parsed from the submit command's output.
        """
        self.write_batch_script(n)
        # Feed the script on stdin through shell redirection:
        # self.args[0] is used as the command, self.args[1] as the script.
        piped_cmd = self.args[0] + '<"' + self.args[1] + '"'
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        # stderr is not piped, so the second item of the pair is None.
        output, err = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1082
1082
1083
1083
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # The ipcontroller command line is substituted for %s when the class is
    # defined ('%%J' collapses to a literal '%J'); {profile_dir} and
    # {cluster_id} are left as placeholders.
    default_template = Unicode(
        "#!/bin/sh\n"
        "#BSUB -J ipcontroller\n"
        "#BSUB -oo ipcontroller.o.%%J\n"
        "#BSUB -eo ipcontroller.e.%%J\n"
        '%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipcontroller_cmd_argv))

    def start(self):
        """Log the launch command, then submit a single controller job."""
        launch_msg = "Starting LSFControllerLauncher: %r" % self.args
        self.log.info(launch_msg)
        return super(LSFControllerLauncher, self).start(1)
1100
1100
1101
1101
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""

    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The ipengine command line is substituted for %s when the class is
    # defined ('%%J' collapses to a literal '%J'); {profile_dir} and
    # {cluster_id} are left as placeholders.
    default_template = Unicode(
        u"#!/bin/sh\n"
        u"#BSUB -oo ipengine.o.%%J\n"
        u"#BSUB -eo ipengine.e.%%J\n"
        u'%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Log the launch command, then submit a job for n engines."""
        launch_msg = 'Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args)
        self.log.info(launch_msg)
        return super(LSFEngineSetLauncher, self).start(n)
1116
1116
1117
1117
1118 #-----------------------------------------------------------------------------
1118 #-----------------------------------------------------------------------------
1119 # A launcher for ipcluster itself!
1119 # A launcher for ipcluster itself!
1120 #-----------------------------------------------------------------------------
1120 #-----------------------------------------------------------------------------
1121
1121
1122
1122
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program as an external local process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.")
    ipcluster_subcommand = Unicode('start')
    ipcluster_n = Integer(2)

    def find_args(self):
        """Build the argv: command, subcommand, ``--n=<count>``, extra args."""
        # Concatenation creates a new list, so the trait value is not mutated.
        argv = self.ipcluster_cmd + [self.ipcluster_subcommand]
        argv.append('--n=%i' % self.ipcluster_n)
        return argv + self.ipcluster_args

    def start(self):
        """Log the command line, then start the process via the parent class."""
        launch_msg = "Starting ipcluster: %r" % self.args
        self.log.info(launch_msg)
        return super(IPClusterLauncher, self).start()
1141
1141
1142 #-----------------------------------------------------------------------------
1142 #-----------------------------------------------------------------------------
1143 # Collections of launchers
1143 # Collections of launchers
1144 #-----------------------------------------------------------------------------
1144 #-----------------------------------------------------------------------------
1145
1145
# Launcher classes grouped by the backend they target.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]

mpi_launchers = [
    MPIExecLauncher,
    MPIExecControllerLauncher,
    MPIExecEngineSetLauncher,
]

ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]

winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]

pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]

sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]

lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]

# Flat list of every launcher class, in the same group order as above.
all_launchers = (
    local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers
    + pbs_launchers + sge_launchers + lsf_launchers
)
1184
1184
General Comments 0
You need to be logged in to leave comments. Login now