##// END OF EJS Templates
add delay configurable to EngineSetLaunchers...
MinRK -
Show More
@@ -1,1141 +1,1151
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27
27
28 # signal imports, handling various platforms, versions
28 # signal imports, handling various platforms, versions
29
29
30 from signal import SIGINT, SIGTERM
30 from signal import SIGINT, SIGTERM
31 try:
31 try:
32 from signal import SIGKILL
32 from signal import SIGKILL
33 except ImportError:
33 except ImportError:
34 # Windows
34 # Windows
35 SIGKILL=SIGTERM
35 SIGKILL=SIGTERM
36
36
37 try:
37 try:
38 # Windows >= 2.7, 3.2
38 # Windows >= 2.7, 3.2
39 from signal import CTRL_C_EVENT as SIGINT
39 from signal import CTRL_C_EVENT as SIGINT
40 except ImportError:
40 except ImportError:
41 pass
41 pass
42
42
43 from subprocess import Popen, PIPE, STDOUT
43 from subprocess import Popen, PIPE, STDOUT
44 try:
44 try:
45 from subprocess import check_output
45 from subprocess import check_output
46 except ImportError:
46 except ImportError:
47 # pre-2.7, define check_output with Popen
47 # pre-2.7, define check_output with Popen
48 def check_output(*args, **kwargs):
48 def check_output(*args, **kwargs):
49 kwargs.update(dict(stdout=PIPE))
49 kwargs.update(dict(stdout=PIPE))
50 p = Popen(*args, **kwargs)
50 p = Popen(*args, **kwargs)
51 out,err = p.communicate()
51 out,err = p.communicate()
52 return out
52 return out
53
53
54 from zmq.eventloop import ioloop
54 from zmq.eventloop import ioloop
55
55
56 from IPython.config.application import Application
56 from IPython.config.application import Application
57 from IPython.config.configurable import LoggingConfigurable
57 from IPython.config.configurable import LoggingConfigurable
58 from IPython.utils.text import EvalFormatter
58 from IPython.utils.text import EvalFormatter
59 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
59 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.path import get_ipython_module_path
60 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62
62
63 from .win32support import forward_read_events
63 from .win32support import forward_read_events
64
64
65 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
65 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66
66
67 WINDOWS = os.name == 'nt'
67 WINDOWS = os.name == 'nt'
68
68
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 # Paths to the kernel apps
70 # Paths to the kernel apps
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72
72
73
73
74 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipclusterapp'
75 'IPython.parallel.apps.ipclusterapp'
76 ))
76 ))
77
77
78 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
78 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
79 'IPython.parallel.apps.ipengineapp'
79 'IPython.parallel.apps.ipengineapp'
80 ))
80 ))
81
81
82 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
82 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 'IPython.parallel.apps.ipcontrollerapp'
83 'IPython.parallel.apps.ipcontrollerapp'
84 ))
84 ))
85
85
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Base launchers and errors
87 # Base launchers and errors
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90
90
91 class LauncherError(Exception):
91 class LauncherError(Exception):
92 pass
92 pass
93
93
94
94
95 class ProcessStateError(LauncherError):
95 class ProcessStateError(LauncherError):
96 pass
96 pass
97
97
98
98
99 class UnknownStatus(LauncherError):
99 class UnknownStatus(LauncherError):
100 pass
100 pass
101
101
102
102
103 class BaseLauncher(LoggingConfigurable):
103 class BaseLauncher(LoggingConfigurable):
104 """An asbtraction for starting, stopping and signaling a process."""
104 """An asbtraction for starting, stopping and signaling a process."""
105
105
106 # In all of the launchers, the work_dir is where child processes will be
106 # In all of the launchers, the work_dir is where child processes will be
107 # run. This will usually be the profile_dir, but may not be. any work_dir
107 # run. This will usually be the profile_dir, but may not be. any work_dir
108 # passed into the __init__ method will override the config value.
108 # passed into the __init__ method will override the config value.
109 # This should not be used to set the work_dir for the actual engine
109 # This should not be used to set the work_dir for the actual engine
110 # and controller. Instead, use their own config files or the
110 # and controller. Instead, use their own config files or the
111 # controller_args, engine_args attributes of the launchers to add
111 # controller_args, engine_args attributes of the launchers to add
112 # the work_dir option.
112 # the work_dir option.
113 work_dir = Unicode(u'.')
113 work_dir = Unicode(u'.')
114 loop = Instance('zmq.eventloop.ioloop.IOLoop')
114 loop = Instance('zmq.eventloop.ioloop.IOLoop')
115
115
116 start_data = Any()
116 start_data = Any()
117 stop_data = Any()
117 stop_data = Any()
118
118
119 def _loop_default(self):
119 def _loop_default(self):
120 return ioloop.IOLoop.instance()
120 return ioloop.IOLoop.instance()
121
121
122 def __init__(self, work_dir=u'.', config=None, **kwargs):
122 def __init__(self, work_dir=u'.', config=None, **kwargs):
123 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
123 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
124 self.state = 'before' # can be before, running, after
124 self.state = 'before' # can be before, running, after
125 self.stop_callbacks = []
125 self.stop_callbacks = []
126 self.start_data = None
126 self.start_data = None
127 self.stop_data = None
127 self.stop_data = None
128
128
129 @property
129 @property
130 def args(self):
130 def args(self):
131 """A list of cmd and args that will be used to start the process.
131 """A list of cmd and args that will be used to start the process.
132
132
133 This is what is passed to :func:`spawnProcess` and the first element
133 This is what is passed to :func:`spawnProcess` and the first element
134 will be the process name.
134 will be the process name.
135 """
135 """
136 return self.find_args()
136 return self.find_args()
137
137
138 def find_args(self):
138 def find_args(self):
139 """The ``.args`` property calls this to find the args list.
139 """The ``.args`` property calls this to find the args list.
140
140
141 Subcommand should implement this to construct the cmd and args.
141 Subcommand should implement this to construct the cmd and args.
142 """
142 """
143 raise NotImplementedError('find_args must be implemented in a subclass')
143 raise NotImplementedError('find_args must be implemented in a subclass')
144
144
145 @property
145 @property
146 def arg_str(self):
146 def arg_str(self):
147 """The string form of the program arguments."""
147 """The string form of the program arguments."""
148 return ' '.join(self.args)
148 return ' '.join(self.args)
149
149
150 @property
150 @property
151 def running(self):
151 def running(self):
152 """Am I running."""
152 """Am I running."""
153 if self.state == 'running':
153 if self.state == 'running':
154 return True
154 return True
155 else:
155 else:
156 return False
156 return False
157
157
158 def start(self):
158 def start(self):
159 """Start the process."""
159 """Start the process."""
160 raise NotImplementedError('start must be implemented in a subclass')
160 raise NotImplementedError('start must be implemented in a subclass')
161
161
162 def stop(self):
162 def stop(self):
163 """Stop the process and notify observers of stopping.
163 """Stop the process and notify observers of stopping.
164
164
165 This method will return None immediately.
165 This method will return None immediately.
166 To observe the actual process stopping, see :meth:`on_stop`.
166 To observe the actual process stopping, see :meth:`on_stop`.
167 """
167 """
168 raise NotImplementedError('stop must be implemented in a subclass')
168 raise NotImplementedError('stop must be implemented in a subclass')
169
169
170 def on_stop(self, f):
170 def on_stop(self, f):
171 """Register a callback to be called with this Launcher's stop_data
171 """Register a callback to be called with this Launcher's stop_data
172 when the process actually finishes.
172 when the process actually finishes.
173 """
173 """
174 if self.state=='after':
174 if self.state=='after':
175 return f(self.stop_data)
175 return f(self.stop_data)
176 else:
176 else:
177 self.stop_callbacks.append(f)
177 self.stop_callbacks.append(f)
178
178
179 def notify_start(self, data):
179 def notify_start(self, data):
180 """Call this to trigger startup actions.
180 """Call this to trigger startup actions.
181
181
182 This logs the process startup and sets the state to 'running'. It is
182 This logs the process startup and sets the state to 'running'. It is
183 a pass-through so it can be used as a callback.
183 a pass-through so it can be used as a callback.
184 """
184 """
185
185
186 self.log.info('Process %r started: %r' % (self.args[0], data))
186 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.start_data = data
187 self.start_data = data
188 self.state = 'running'
188 self.state = 'running'
189 return data
189 return data
190
190
191 def notify_stop(self, data):
191 def notify_stop(self, data):
192 """Call this to trigger process stop actions.
192 """Call this to trigger process stop actions.
193
193
194 This logs the process stopping and sets the state to 'after'. Call
194 This logs the process stopping and sets the state to 'after'. Call
195 this to trigger callbacks registered via :meth:`on_stop`."""
195 this to trigger callbacks registered via :meth:`on_stop`."""
196
196
197 self.log.info('Process %r stopped: %r' % (self.args[0], data))
197 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.stop_data = data
198 self.stop_data = data
199 self.state = 'after'
199 self.state = 'after'
200 for i in range(len(self.stop_callbacks)):
200 for i in range(len(self.stop_callbacks)):
201 d = self.stop_callbacks.pop()
201 d = self.stop_callbacks.pop()
202 d(data)
202 d(data)
203 return data
203 return data
204
204
205 def signal(self, sig):
205 def signal(self, sig):
206 """Signal the process.
206 """Signal the process.
207
207
208 Parameters
208 Parameters
209 ----------
209 ----------
210 sig : str or int
210 sig : str or int
211 'KILL', 'INT', etc., or any signal number
211 'KILL', 'INT', etc., or any signal number
212 """
212 """
213 raise NotImplementedError('signal must be implemented in a subclass')
213 raise NotImplementedError('signal must be implemented in a subclass')
214
214
215
215
216 #-----------------------------------------------------------------------------
216 #-----------------------------------------------------------------------------
217 # Local process launchers
217 # Local process launchers
218 #-----------------------------------------------------------------------------
218 #-----------------------------------------------------------------------------
219
219
220
220
221 class LocalProcessLauncher(BaseLauncher):
221 class LocalProcessLauncher(BaseLauncher):
222 """Start and stop an external process in an asynchronous manner.
222 """Start and stop an external process in an asynchronous manner.
223
223
224 This will launch the external process with a working directory of
224 This will launch the external process with a working directory of
225 ``self.work_dir``.
225 ``self.work_dir``.
226 """
226 """
227
227
228 # This is used to to construct self.args, which is passed to
228 # This is used to to construct self.args, which is passed to
229 # spawnProcess.
229 # spawnProcess.
230 cmd_and_args = List([])
230 cmd_and_args = List([])
231 poll_frequency = Int(100) # in ms
231 poll_frequency = Int(100) # in ms
232
232
233 def __init__(self, work_dir=u'.', config=None, **kwargs):
233 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 super(LocalProcessLauncher, self).__init__(
234 super(LocalProcessLauncher, self).__init__(
235 work_dir=work_dir, config=config, **kwargs
235 work_dir=work_dir, config=config, **kwargs
236 )
236 )
237 self.process = None
237 self.process = None
238 self.poller = None
238 self.poller = None
239
239
240 def find_args(self):
240 def find_args(self):
241 return self.cmd_and_args
241 return self.cmd_and_args
242
242
243 def start(self):
243 def start(self):
244 if self.state == 'before':
244 if self.state == 'before':
245 self.process = Popen(self.args,
245 self.process = Popen(self.args,
246 stdout=PIPE,stderr=PIPE,stdin=PIPE,
246 stdout=PIPE,stderr=PIPE,stdin=PIPE,
247 env=os.environ,
247 env=os.environ,
248 cwd=self.work_dir
248 cwd=self.work_dir
249 )
249 )
250 if WINDOWS:
250 if WINDOWS:
251 self.stdout = forward_read_events(self.process.stdout)
251 self.stdout = forward_read_events(self.process.stdout)
252 self.stderr = forward_read_events(self.process.stderr)
252 self.stderr = forward_read_events(self.process.stderr)
253 else:
253 else:
254 self.stdout = self.process.stdout.fileno()
254 self.stdout = self.process.stdout.fileno()
255 self.stderr = self.process.stderr.fileno()
255 self.stderr = self.process.stderr.fileno()
256 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
256 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
257 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
257 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
258 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
259 self.poller.start()
259 self.poller.start()
260 self.notify_start(self.process.pid)
260 self.notify_start(self.process.pid)
261 else:
261 else:
262 s = 'The process was already started and has state: %r' % self.state
262 s = 'The process was already started and has state: %r' % self.state
263 raise ProcessStateError(s)
263 raise ProcessStateError(s)
264
264
265 def stop(self):
265 def stop(self):
266 return self.interrupt_then_kill()
266 return self.interrupt_then_kill()
267
267
268 def signal(self, sig):
268 def signal(self, sig):
269 if self.state == 'running':
269 if self.state == 'running':
270 if WINDOWS and sig != SIGINT:
270 if WINDOWS and sig != SIGINT:
271 # use Windows tree-kill for better child cleanup
271 # use Windows tree-kill for better child cleanup
272 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
272 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
273 else:
273 else:
274 self.process.send_signal(sig)
274 self.process.send_signal(sig)
275
275
276 def interrupt_then_kill(self, delay=2.0):
276 def interrupt_then_kill(self, delay=2.0):
277 """Send INT, wait a delay and then send KILL."""
277 """Send INT, wait a delay and then send KILL."""
278 try:
278 try:
279 self.signal(SIGINT)
279 self.signal(SIGINT)
280 except Exception:
280 except Exception:
281 self.log.debug("interrupt failed")
281 self.log.debug("interrupt failed")
282 pass
282 pass
283 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
283 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
284 self.killer.start()
284 self.killer.start()
285
285
286 # callbacks, etc:
286 # callbacks, etc:
287
287
288 def handle_stdout(self, fd, events):
288 def handle_stdout(self, fd, events):
289 if WINDOWS:
289 if WINDOWS:
290 line = self.stdout.recv()
290 line = self.stdout.recv()
291 else:
291 else:
292 line = self.process.stdout.readline()
292 line = self.process.stdout.readline()
293 # a stopped process will be readable but return empty strings
293 # a stopped process will be readable but return empty strings
294 if line:
294 if line:
295 self.log.info(line[:-1])
295 self.log.info(line[:-1])
296 else:
296 else:
297 self.poll()
297 self.poll()
298
298
299 def handle_stderr(self, fd, events):
299 def handle_stderr(self, fd, events):
300 if WINDOWS:
300 if WINDOWS:
301 line = self.stderr.recv()
301 line = self.stderr.recv()
302 else:
302 else:
303 line = self.process.stderr.readline()
303 line = self.process.stderr.readline()
304 # a stopped process will be readable but return empty strings
304 # a stopped process will be readable but return empty strings
305 if line:
305 if line:
306 self.log.error(line[:-1])
306 self.log.error(line[:-1])
307 else:
307 else:
308 self.poll()
308 self.poll()
309
309
310 def poll(self):
310 def poll(self):
311 status = self.process.poll()
311 status = self.process.poll()
312 if status is not None:
312 if status is not None:
313 self.poller.stop()
313 self.poller.stop()
314 self.loop.remove_handler(self.stdout)
314 self.loop.remove_handler(self.stdout)
315 self.loop.remove_handler(self.stderr)
315 self.loop.remove_handler(self.stderr)
316 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
316 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
317 return status
317 return status
318
318
319 class LocalControllerLauncher(LocalProcessLauncher):
319 class LocalControllerLauncher(LocalProcessLauncher):
320 """Launch a controller as a regular external process."""
320 """Launch a controller as a regular external process."""
321
321
322 controller_cmd = List(ipcontroller_cmd_argv, config=True,
322 controller_cmd = List(ipcontroller_cmd_argv, config=True,
323 help="""Popen command to launch ipcontroller.""")
323 help="""Popen command to launch ipcontroller.""")
324 # Command line arguments to ipcontroller.
324 # Command line arguments to ipcontroller.
325 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
325 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
326 help="""command-line args to pass to ipcontroller""")
326 help="""command-line args to pass to ipcontroller""")
327
327
328 def find_args(self):
328 def find_args(self):
329 return self.controller_cmd + self.controller_args
329 return self.controller_cmd + self.controller_args
330
330
331 def start(self, profile_dir):
331 def start(self, profile_dir):
332 """Start the controller by profile_dir."""
332 """Start the controller by profile_dir."""
333 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
333 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
334 self.profile_dir = unicode(profile_dir)
334 self.profile_dir = unicode(profile_dir)
335 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
335 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
336 return super(LocalControllerLauncher, self).start()
336 return super(LocalControllerLauncher, self).start()
337
337
338
338
339 class LocalEngineLauncher(LocalProcessLauncher):
339 class LocalEngineLauncher(LocalProcessLauncher):
340 """Launch a single engine as a regular externall process."""
340 """Launch a single engine as a regular externall process."""
341
341
342 engine_cmd = List(ipengine_cmd_argv, config=True,
342 engine_cmd = List(ipengine_cmd_argv, config=True,
343 help="""command to launch the Engine.""")
343 help="""command to launch the Engine.""")
344 # Command line arguments for ipengine.
344 # Command line arguments for ipengine.
345 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
345 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
346 help="command-line arguments to pass to ipengine"
346 help="command-line arguments to pass to ipengine"
347 )
347 )
348
348
349 def find_args(self):
349 def find_args(self):
350 return self.engine_cmd + self.engine_args
350 return self.engine_cmd + self.engine_args
351
351
352 def start(self, profile_dir):
352 def start(self, profile_dir):
353 """Start the engine by profile_dir."""
353 """Start the engine by profile_dir."""
354 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
354 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
355 self.profile_dir = unicode(profile_dir)
355 self.profile_dir = unicode(profile_dir)
356 return super(LocalEngineLauncher, self).start()
356 return super(LocalEngineLauncher, self).start()
357
357
358
358
359 class LocalEngineSetLauncher(BaseLauncher):
359 class LocalEngineSetLauncher(BaseLauncher):
360 """Launch a set of engines as regular external processes."""
360 """Launch a set of engines as regular external processes."""
361
361
362 # Command line arguments for ipengine.
362 # Command line arguments for ipengine.
363 engine_args = List(
363 engine_args = List(
364 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
364 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 help="command-line arguments to pass to ipengine"
365 help="command-line arguments to pass to ipengine"
366 )
366 )
367 delay = CFloat(0.1, config=True,
368 help="""delay (in seconds) between starting each engine after the first.
369 This can help force the engines to get their ids in order, or limit
370 process flood when starting many engines."""
371 )
372
367 # launcher class
373 # launcher class
368 launcher_class = LocalEngineLauncher
374 launcher_class = LocalEngineLauncher
369
375
370 launchers = Dict()
376 launchers = Dict()
371 stop_data = Dict()
377 stop_data = Dict()
372
378
373 def __init__(self, work_dir=u'.', config=None, **kwargs):
379 def __init__(self, work_dir=u'.', config=None, **kwargs):
374 super(LocalEngineSetLauncher, self).__init__(
380 super(LocalEngineSetLauncher, self).__init__(
375 work_dir=work_dir, config=config, **kwargs
381 work_dir=work_dir, config=config, **kwargs
376 )
382 )
377 self.stop_data = {}
383 self.stop_data = {}
378
384
379 def start(self, n, profile_dir):
385 def start(self, n, profile_dir):
380 """Start n engines by profile or profile_dir."""
386 """Start n engines by profile or profile_dir."""
381 self.profile_dir = unicode(profile_dir)
387 self.profile_dir = unicode(profile_dir)
382 dlist = []
388 dlist = []
383 for i in range(n):
389 for i in range(n):
390 if i > 0:
391 time.sleep(self.delay)
384 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
385 # Copy the engine args over to each engine launcher.
393 # Copy the engine args over to each engine launcher.
386 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
387 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
388 d = el.start(profile_dir)
396 d = el.start(profile_dir)
389 if i==0:
397 if i==0:
390 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
391 self.launchers[i] = el
399 self.launchers[i] = el
392 dlist.append(d)
400 dlist.append(d)
393 self.notify_start(dlist)
401 self.notify_start(dlist)
394 # The consumeErrors here could be dangerous
402 # The consumeErrors here could be dangerous
395 # dfinal = gatherBoth(dlist, consumeErrors=True)
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
396 # dfinal.addCallback(self.notify_start)
404 # dfinal.addCallback(self.notify_start)
397 return dlist
405 return dlist
398
406
399 def find_args(self):
407 def find_args(self):
400 return ['engine set']
408 return ['engine set']
401
409
402 def signal(self, sig):
410 def signal(self, sig):
403 dlist = []
411 dlist = []
404 for el in self.launchers.itervalues():
412 for el in self.launchers.itervalues():
405 d = el.signal(sig)
413 d = el.signal(sig)
406 dlist.append(d)
414 dlist.append(d)
407 # dfinal = gatherBoth(dlist, consumeErrors=True)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
408 return dlist
416 return dlist
409
417
410 def interrupt_then_kill(self, delay=1.0):
418 def interrupt_then_kill(self, delay=1.0):
411 dlist = []
419 dlist = []
412 for el in self.launchers.itervalues():
420 for el in self.launchers.itervalues():
413 d = el.interrupt_then_kill(delay)
421 d = el.interrupt_then_kill(delay)
414 dlist.append(d)
422 dlist.append(d)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 return dlist
424 return dlist
417
425
418 def stop(self):
426 def stop(self):
419 return self.interrupt_then_kill()
427 return self.interrupt_then_kill()
420
428
421 def _notice_engine_stopped(self, data):
429 def _notice_engine_stopped(self, data):
422 pid = data['pid']
430 pid = data['pid']
423 for idx,el in self.launchers.iteritems():
431 for idx,el in self.launchers.iteritems():
424 if el.process.pid == pid:
432 if el.process.pid == pid:
425 break
433 break
426 self.launchers.pop(idx)
434 self.launchers.pop(idx)
427 self.stop_data[idx] = data
435 self.stop_data[idx] = data
428 if not self.launchers:
436 if not self.launchers:
429 self.notify_stop(self.stop_data)
437 self.notify_stop(self.stop_data)
430
438
431
439
432 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
433 # MPIExec launchers
441 # MPIExec launchers
434 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
435
443
436
444
437 class MPIExecLauncher(LocalProcessLauncher):
445 class MPIExecLauncher(LocalProcessLauncher):
438 """Launch an external process using mpiexec."""
446 """Launch an external process using mpiexec."""
439
447
440 mpi_cmd = List(['mpiexec'], config=True,
448 mpi_cmd = List(['mpiexec'], config=True,
441 help="The mpiexec command to use in starting the process."
449 help="The mpiexec command to use in starting the process."
442 )
450 )
443 mpi_args = List([], config=True,
451 mpi_args = List([], config=True,
444 help="The command line arguments to pass to mpiexec."
452 help="The command line arguments to pass to mpiexec."
445 )
453 )
446 program = List(['date'], config=True,
454 program = List(['date'], config=True,
447 help="The program to start via mpiexec.")
455 help="The program to start via mpiexec.")
448 program_args = List([], config=True,
456 program_args = List([], config=True,
449 help="The command line argument to the program."
457 help="The command line argument to the program."
450 )
458 )
451 n = Int(1)
459 n = Int(1)
452
460
453 def find_args(self):
461 def find_args(self):
454 """Build self.args using all the fields."""
462 """Build self.args using all the fields."""
455 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
456 self.program + self.program_args
464 self.program + self.program_args
457
465
458 def start(self, n):
466 def start(self, n):
459 """Start n instances of the program using mpiexec."""
467 """Start n instances of the program using mpiexec."""
460 self.n = n
468 self.n = n
461 return super(MPIExecLauncher, self).start()
469 return super(MPIExecLauncher, self).start()
462
470
463
471
464 class MPIExecControllerLauncher(MPIExecLauncher):
472 class MPIExecControllerLauncher(MPIExecLauncher):
465 """Launch a controller using mpiexec."""
473 """Launch a controller using mpiexec."""
466
474
467 controller_cmd = List(ipcontroller_cmd_argv, config=True,
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
468 help="Popen command to launch the Contropper"
476 help="Popen command to launch the Contropper"
469 )
477 )
470 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
478 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
471 help="Command line arguments to pass to ipcontroller."
479 help="Command line arguments to pass to ipcontroller."
472 )
480 )
473 n = Int(1)
481 n = Int(1)
474
482
475 def start(self, profile_dir):
483 def start(self, profile_dir):
476 """Start the controller by profile_dir."""
484 """Start the controller by profile_dir."""
477 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
485 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
478 self.profile_dir = unicode(profile_dir)
486 self.profile_dir = unicode(profile_dir)
479 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
480 return super(MPIExecControllerLauncher, self).start(1)
488 return super(MPIExecControllerLauncher, self).start(1)
481
489
482 def find_args(self):
490 def find_args(self):
483 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
491 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
484 self.controller_cmd + self.controller_args
492 self.controller_cmd + self.controller_args
485
493
486
494
487 class MPIExecEngineSetLauncher(MPIExecLauncher):
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
488
496
489 program = List(ipengine_cmd_argv, config=True,
497 program = List(ipengine_cmd_argv, config=True,
490 help="Popen command for ipengine"
498 help="Popen command for ipengine"
491 )
499 )
492 program_args = List(
500 program_args = List(
493 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
501 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
494 help="Command line arguments for ipengine."
502 help="Command line arguments for ipengine."
495 )
503 )
496 n = Int(1)
504 n = Int(1)
497
505
498 def start(self, n, profile_dir):
506 def start(self, n, profile_dir):
499 """Start n engines by profile or profile_dir."""
507 """Start n engines by profile or profile_dir."""
500 self.program_args.extend(['--profile-dir=%s'%profile_dir])
508 self.program_args.extend(['--profile-dir=%s'%profile_dir])
501 self.profile_dir = unicode(profile_dir)
509 self.profile_dir = unicode(profile_dir)
502 self.n = n
510 self.n = n
503 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
504 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
505
513
506 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
507 # SSH launchers
515 # SSH launchers
508 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
509
517
510 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
511
519
512 class SSHLauncher(LocalProcessLauncher):
520 class SSHLauncher(LocalProcessLauncher):
513 """A minimal launcher for ssh.
521 """A minimal launcher for ssh.
514
522
515 To be useful this will probably have to be extended to use the ``sshx``
523 To be useful this will probably have to be extended to use the ``sshx``
516 idea for environment variables. There could be other things this needs
524 idea for environment variables. There could be other things this needs
517 as well.
525 as well.
518 """
526 """
519
527
520 ssh_cmd = List(['ssh'], config=True,
528 ssh_cmd = List(['ssh'], config=True,
521 help="command for starting ssh")
529 help="command for starting ssh")
522 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
523 help="args to pass to ssh")
531 help="args to pass to ssh")
524 program = List(['date'], config=True,
532 program = List(['date'], config=True,
525 help="Program to launch via ssh")
533 help="Program to launch via ssh")
526 program_args = List([], config=True,
534 program_args = List([], config=True,
527 help="args to pass to remote program")
535 help="args to pass to remote program")
528 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
529 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
530 user = Unicode('', config=True,
538 user = Unicode('', config=True,
531 help="username for ssh")
539 help="username for ssh")
532 location = Unicode('', config=True,
540 location = Unicode('', config=True,
533 help="user@hostname location for ssh in one setting")
541 help="user@hostname location for ssh in one setting")
534
542
535 def _hostname_changed(self, name, old, new):
543 def _hostname_changed(self, name, old, new):
536 if self.user:
544 if self.user:
537 self.location = u'%s@%s' % (self.user, new)
545 self.location = u'%s@%s' % (self.user, new)
538 else:
546 else:
539 self.location = new
547 self.location = new
540
548
541 def _user_changed(self, name, old, new):
549 def _user_changed(self, name, old, new):
542 self.location = u'%s@%s' % (new, self.hostname)
550 self.location = u'%s@%s' % (new, self.hostname)
543
551
544 def find_args(self):
552 def find_args(self):
545 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
546 self.program + self.program_args
554 self.program + self.program_args
547
555
548 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, profile_dir, hostname=None, user=None):
549 self.profile_dir = unicode(profile_dir)
557 self.profile_dir = unicode(profile_dir)
550 if hostname is not None:
558 if hostname is not None:
551 self.hostname = hostname
559 self.hostname = hostname
552 if user is not None:
560 if user is not None:
553 self.user = user
561 self.user = user
554
562
555 return super(SSHLauncher, self).start()
563 return super(SSHLauncher, self).start()
556
564
557 def signal(self, sig):
565 def signal(self, sig):
558 if self.state == 'running':
566 if self.state == 'running':
559 # send escaped ssh connection-closer
567 # send escaped ssh connection-closer
560 self.process.stdin.write('~.')
568 self.process.stdin.write('~.')
561 self.process.stdin.flush()
569 self.process.stdin.flush()
562
570
563
571
564
572
565 class SSHControllerLauncher(SSHLauncher):
573 class SSHControllerLauncher(SSHLauncher):
566
574
567 program = List(ipcontroller_cmd_argv, config=True,
575 program = List(ipcontroller_cmd_argv, config=True,
568 help="remote ipcontroller command.")
576 help="remote ipcontroller command.")
569 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
577 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
570 help="Command line arguments to ipcontroller.")
578 help="Command line arguments to ipcontroller.")
571
579
572
580
573 class SSHEngineLauncher(SSHLauncher):
581 class SSHEngineLauncher(SSHLauncher):
574 program = List(ipengine_cmd_argv, config=True,
582 program = List(ipengine_cmd_argv, config=True,
575 help="remote ipengine command.")
583 help="remote ipengine command.")
576 # Command line arguments for ipengine.
584 # Command line arguments for ipengine.
577 program_args = List(
585 program_args = List(
578 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
586 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipengine."
587 help="Command line arguments to ipengine."
580 )
588 )
581
589
582 class SSHEngineSetLauncher(LocalEngineSetLauncher):
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
583 launcher_class = SSHEngineLauncher
591 launcher_class = SSHEngineLauncher
584 engines = Dict(config=True,
592 engines = Dict(config=True,
585 help="""dict of engines to launch. This is a dict by hostname of ints,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
586 corresponding to the number of engines to start on that host.""")
594 corresponding to the number of engines to start on that host.""")
587
595
588 def start(self, n, profile_dir):
596 def start(self, n, profile_dir):
589 """Start engines by profile or profile_dir.
597 """Start engines by profile or profile_dir.
590 `n` is ignored, and the `engines` config property is used instead.
598 `n` is ignored, and the `engines` config property is used instead.
591 """
599 """
592
600
593 self.profile_dir = unicode(profile_dir)
601 self.profile_dir = unicode(profile_dir)
594 dlist = []
602 dlist = []
595 for host, n in self.engines.iteritems():
603 for host, n in self.engines.iteritems():
596 if isinstance(n, (tuple, list)):
604 if isinstance(n, (tuple, list)):
597 n, args = n
605 n, args = n
598 else:
606 else:
599 args = copy.deepcopy(self.engine_args)
607 args = copy.deepcopy(self.engine_args)
600
608
601 if '@' in host:
609 if '@' in host:
602 user,host = host.split('@',1)
610 user,host = host.split('@',1)
603 else:
611 else:
604 user=None
612 user=None
605 for i in range(n):
613 for i in range(n):
614 if i > 0:
615 time.sleep(self.delay)
606 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
616 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
607
617
608 # Copy the engine args over to each engine launcher.
618 # Copy the engine args over to each engine launcher.
609 i
619 i
610 el.program_args = args
620 el.program_args = args
611 el.on_stop(self._notice_engine_stopped)
621 el.on_stop(self._notice_engine_stopped)
612 d = el.start(profile_dir, user=user, hostname=host)
622 d = el.start(profile_dir, user=user, hostname=host)
613 if i==0:
623 if i==0:
614 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
615 self.launchers[host+str(i)] = el
625 self.launchers[host+str(i)] = el
616 dlist.append(d)
626 dlist.append(d)
617 self.notify_start(dlist)
627 self.notify_start(dlist)
618 return dlist
628 return dlist
619
629
620
630
621
631
622 #-----------------------------------------------------------------------------
632 #-----------------------------------------------------------------------------
623 # Windows HPC Server 2008 scheduler launchers
633 # Windows HPC Server 2008 scheduler launchers
624 #-----------------------------------------------------------------------------
634 #-----------------------------------------------------------------------------
625
635
626
636
627 # This is only used on Windows.
637 # This is only used on Windows.
628 def find_job_cmd():
638 def find_job_cmd():
629 if WINDOWS:
639 if WINDOWS:
630 try:
640 try:
631 return find_cmd('job')
641 return find_cmd('job')
632 except (FindCmdError, ImportError):
642 except (FindCmdError, ImportError):
633 # ImportError will be raised if win32api is not installed
643 # ImportError will be raised if win32api is not installed
634 return 'job'
644 return 'job'
635 else:
645 else:
636 return 'job'
646 return 'job'
637
647
638
648
639 class WindowsHPCLauncher(BaseLauncher):
649 class WindowsHPCLauncher(BaseLauncher):
640
650
641 job_id_regexp = Unicode(r'\d+', config=True,
651 job_id_regexp = Unicode(r'\d+', config=True,
642 help="""A regular expression used to get the job id from the output of the
652 help="""A regular expression used to get the job id from the output of the
643 submit_command. """
653 submit_command. """
644 )
654 )
645 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 job_file_name = Unicode(u'ipython_job.xml', config=True,
646 help="The filename of the instantiated job script.")
656 help="The filename of the instantiated job script.")
647 # The full path to the instantiated job script. This gets made dynamically
657 # The full path to the instantiated job script. This gets made dynamically
648 # by combining the work_dir with the job_file_name.
658 # by combining the work_dir with the job_file_name.
649 job_file = Unicode(u'')
659 job_file = Unicode(u'')
650 scheduler = Unicode('', config=True,
660 scheduler = Unicode('', config=True,
651 help="The hostname of the scheduler to submit the job to.")
661 help="The hostname of the scheduler to submit the job to.")
652 job_cmd = Unicode(find_job_cmd(), config=True,
662 job_cmd = Unicode(find_job_cmd(), config=True,
653 help="The command for submitting jobs.")
663 help="The command for submitting jobs.")
654
664
655 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 def __init__(self, work_dir=u'.', config=None, **kwargs):
656 super(WindowsHPCLauncher, self).__init__(
666 super(WindowsHPCLauncher, self).__init__(
657 work_dir=work_dir, config=config, **kwargs
667 work_dir=work_dir, config=config, **kwargs
658 )
668 )
659
669
660 @property
670 @property
661 def job_file(self):
671 def job_file(self):
662 return os.path.join(self.work_dir, self.job_file_name)
672 return os.path.join(self.work_dir, self.job_file_name)
663
673
664 def write_job_file(self, n):
674 def write_job_file(self, n):
665 raise NotImplementedError("Implement write_job_file in a subclass.")
675 raise NotImplementedError("Implement write_job_file in a subclass.")
666
676
667 def find_args(self):
677 def find_args(self):
668 return [u'job.exe']
678 return [u'job.exe']
669
679
670 def parse_job_id(self, output):
680 def parse_job_id(self, output):
671 """Take the output of the submit command and return the job id."""
681 """Take the output of the submit command and return the job id."""
672 m = re.search(self.job_id_regexp, output)
682 m = re.search(self.job_id_regexp, output)
673 if m is not None:
683 if m is not None:
674 job_id = m.group()
684 job_id = m.group()
675 else:
685 else:
676 raise LauncherError("Job id couldn't be determined: %s" % output)
686 raise LauncherError("Job id couldn't be determined: %s" % output)
677 self.job_id = job_id
687 self.job_id = job_id
678 self.log.info('Job started with job id: %r' % job_id)
688 self.log.info('Job started with job id: %r' % job_id)
679 return job_id
689 return job_id
680
690
681 def start(self, n):
691 def start(self, n):
682 """Start n copies of the process using the Win HPC job scheduler."""
692 """Start n copies of the process using the Win HPC job scheduler."""
683 self.write_job_file(n)
693 self.write_job_file(n)
684 args = [
694 args = [
685 'submit',
695 'submit',
686 '/jobfile:%s' % self.job_file,
696 '/jobfile:%s' % self.job_file,
687 '/scheduler:%s' % self.scheduler
697 '/scheduler:%s' % self.scheduler
688 ]
698 ]
689 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
690
700
691 output = check_output([self.job_cmd]+args,
701 output = check_output([self.job_cmd]+args,
692 env=os.environ,
702 env=os.environ,
693 cwd=self.work_dir,
703 cwd=self.work_dir,
694 stderr=STDOUT
704 stderr=STDOUT
695 )
705 )
696 job_id = self.parse_job_id(output)
706 job_id = self.parse_job_id(output)
697 self.notify_start(job_id)
707 self.notify_start(job_id)
698 return job_id
708 return job_id
699
709
700 def stop(self):
710 def stop(self):
701 args = [
711 args = [
702 'cancel',
712 'cancel',
703 self.job_id,
713 self.job_id,
704 '/scheduler:%s' % self.scheduler
714 '/scheduler:%s' % self.scheduler
705 ]
715 ]
706 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
707 try:
717 try:
708 output = check_output([self.job_cmd]+args,
718 output = check_output([self.job_cmd]+args,
709 env=os.environ,
719 env=os.environ,
710 cwd=self.work_dir,
720 cwd=self.work_dir,
711 stderr=STDOUT
721 stderr=STDOUT
712 )
722 )
713 except:
723 except:
714 output = 'The job already appears to be stoppped: %r' % self.job_id
724 output = 'The job already appears to be stoppped: %r' % self.job_id
715 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
716 return output
726 return output
717
727
718
728
719 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
720
730
721 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
722 help="WinHPC xml job file.")
732 help="WinHPC xml job file.")
723 extra_args = List([], config=False,
733 extra_args = List([], config=False,
724 help="extra args to pass to ipcontroller")
734 help="extra args to pass to ipcontroller")
725
735
726 def write_job_file(self, n):
736 def write_job_file(self, n):
727 job = IPControllerJob(config=self.config)
737 job = IPControllerJob(config=self.config)
728
738
729 t = IPControllerTask(config=self.config)
739 t = IPControllerTask(config=self.config)
730 # The tasks work directory is *not* the actual work directory of
740 # The tasks work directory is *not* the actual work directory of
731 # the controller. It is used as the base path for the stdout/stderr
741 # the controller. It is used as the base path for the stdout/stderr
732 # files that the scheduler redirects to.
742 # files that the scheduler redirects to.
733 t.work_directory = self.profile_dir
743 t.work_directory = self.profile_dir
734 # Add the profile_dir and from self.start().
744 # Add the profile_dir and from self.start().
735 t.controller_args.extend(self.extra_args)
745 t.controller_args.extend(self.extra_args)
736 job.add_task(t)
746 job.add_task(t)
737
747
738 self.log.info("Writing job description file: %s" % self.job_file)
748 self.log.info("Writing job description file: %s" % self.job_file)
739 job.write(self.job_file)
749 job.write(self.job_file)
740
750
741 @property
751 @property
742 def job_file(self):
752 def job_file(self):
743 return os.path.join(self.profile_dir, self.job_file_name)
753 return os.path.join(self.profile_dir, self.job_file_name)
744
754
745 def start(self, profile_dir):
755 def start(self, profile_dir):
746 """Start the controller by profile_dir."""
756 """Start the controller by profile_dir."""
747 self.extra_args = ['--profile-dir=%s'%profile_dir]
757 self.extra_args = ['--profile-dir=%s'%profile_dir]
748 self.profile_dir = unicode(profile_dir)
758 self.profile_dir = unicode(profile_dir)
749 return super(WindowsHPCControllerLauncher, self).start(1)
759 return super(WindowsHPCControllerLauncher, self).start(1)
750
760
751
761
752 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
753
763
754 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
755 help="jobfile for ipengines job")
765 help="jobfile for ipengines job")
756 extra_args = List([], config=False,
766 extra_args = List([], config=False,
757 help="extra args to pas to ipengine")
767 help="extra args to pas to ipengine")
758
768
759 def write_job_file(self, n):
769 def write_job_file(self, n):
760 job = IPEngineSetJob(config=self.config)
770 job = IPEngineSetJob(config=self.config)
761
771
762 for i in range(n):
772 for i in range(n):
763 t = IPEngineTask(config=self.config)
773 t = IPEngineTask(config=self.config)
764 # The tasks work directory is *not* the actual work directory of
774 # The tasks work directory is *not* the actual work directory of
765 # the engine. It is used as the base path for the stdout/stderr
775 # the engine. It is used as the base path for the stdout/stderr
766 # files that the scheduler redirects to.
776 # files that the scheduler redirects to.
767 t.work_directory = self.profile_dir
777 t.work_directory = self.profile_dir
768 # Add the profile_dir and from self.start().
778 # Add the profile_dir and from self.start().
769 t.engine_args.extend(self.extra_args)
779 t.engine_args.extend(self.extra_args)
770 job.add_task(t)
780 job.add_task(t)
771
781
772 self.log.info("Writing job description file: %s" % self.job_file)
782 self.log.info("Writing job description file: %s" % self.job_file)
773 job.write(self.job_file)
783 job.write(self.job_file)
774
784
775 @property
785 @property
776 def job_file(self):
786 def job_file(self):
777 return os.path.join(self.profile_dir, self.job_file_name)
787 return os.path.join(self.profile_dir, self.job_file_name)
778
788
779 def start(self, n, profile_dir):
789 def start(self, n, profile_dir):
780 """Start the controller by profile_dir."""
790 """Start the controller by profile_dir."""
781 self.extra_args = ['--profile-dir=%s'%profile_dir]
791 self.extra_args = ['--profile-dir=%s'%profile_dir]
782 self.profile_dir = unicode(profile_dir)
792 self.profile_dir = unicode(profile_dir)
783 return super(WindowsHPCEngineSetLauncher, self).start(n)
793 return super(WindowsHPCEngineSetLauncher, self).start(n)
784
794
785
795
786 #-----------------------------------------------------------------------------
796 #-----------------------------------------------------------------------------
787 # Batch (PBS) system launchers
797 # Batch (PBS) system launchers
788 #-----------------------------------------------------------------------------
798 #-----------------------------------------------------------------------------
789
799
790 class BatchSystemLauncher(BaseLauncher):
800 class BatchSystemLauncher(BaseLauncher):
791 """Launch an external process using a batch system.
801 """Launch an external process using a batch system.
792
802
793 This class is designed to work with UNIX batch systems like PBS, LSF,
803 This class is designed to work with UNIX batch systems like PBS, LSF,
794 GridEngine, etc. The overall model is that there are different commands
804 GridEngine, etc. The overall model is that there are different commands
795 like qsub, qdel, etc. that handle the starting and stopping of the process.
805 like qsub, qdel, etc. that handle the starting and stopping of the process.
796
806
797 This class also has the notion of a batch script. The ``batch_template``
807 This class also has the notion of a batch script. The ``batch_template``
798 attribute can be set to a string that is a template for the batch script.
808 attribute can be set to a string that is a template for the batch script.
799 This template is instantiated using string formatting. Thus the template can
809 This template is instantiated using string formatting. Thus the template can
800 use {n} fot the number of instances. Subclasses can add additional variables
810 use {n} fot the number of instances. Subclasses can add additional variables
801 to the template dict.
811 to the template dict.
802 """
812 """
803
813
804 # Subclasses must fill these in. See PBSEngineSet
814 # Subclasses must fill these in. See PBSEngineSet
805 submit_command = List([''], config=True,
815 submit_command = List([''], config=True,
806 help="The name of the command line program used to submit jobs.")
816 help="The name of the command line program used to submit jobs.")
807 delete_command = List([''], config=True,
817 delete_command = List([''], config=True,
808 help="The name of the command line program used to delete jobs.")
818 help="The name of the command line program used to delete jobs.")
809 job_id_regexp = Unicode('', config=True,
819 job_id_regexp = Unicode('', config=True,
810 help="""A regular expression used to get the job id from the output of the
820 help="""A regular expression used to get the job id from the output of the
811 submit_command.""")
821 submit_command.""")
812 batch_template = Unicode('', config=True,
822 batch_template = Unicode('', config=True,
813 help="The string that is the batch script template itself.")
823 help="The string that is the batch script template itself.")
814 batch_template_file = Unicode(u'', config=True,
824 batch_template_file = Unicode(u'', config=True,
815 help="The file that contains the batch template.")
825 help="The file that contains the batch template.")
816 batch_file_name = Unicode(u'batch_script', config=True,
826 batch_file_name = Unicode(u'batch_script', config=True,
817 help="The filename of the instantiated batch script.")
827 help="The filename of the instantiated batch script.")
818 queue = Unicode(u'', config=True,
828 queue = Unicode(u'', config=True,
819 help="The PBS Queue.")
829 help="The PBS Queue.")
820
830
821 # not configurable, override in subclasses
831 # not configurable, override in subclasses
822 # PBS Job Array regex
832 # PBS Job Array regex
823 job_array_regexp = Unicode('')
833 job_array_regexp = Unicode('')
824 job_array_template = Unicode('')
834 job_array_template = Unicode('')
825 # PBS Queue regex
835 # PBS Queue regex
826 queue_regexp = Unicode('')
836 queue_regexp = Unicode('')
827 queue_template = Unicode('')
837 queue_template = Unicode('')
828 # The default batch template, override in subclasses
838 # The default batch template, override in subclasses
829 default_template = Unicode('')
839 default_template = Unicode('')
830 # The full path to the instantiated batch script.
840 # The full path to the instantiated batch script.
831 batch_file = Unicode(u'')
841 batch_file = Unicode(u'')
832 # the format dict used with batch_template:
842 # the format dict used with batch_template:
833 context = Dict()
843 context = Dict()
834 # the Formatter instance for rendering the templates:
844 # the Formatter instance for rendering the templates:
835 formatter = Instance(EvalFormatter, (), {})
845 formatter = Instance(EvalFormatter, (), {})
836
846
837
847
838 def find_args(self):
848 def find_args(self):
839 return self.submit_command + [self.batch_file]
849 return self.submit_command + [self.batch_file]
840
850
841 def __init__(self, work_dir=u'.', config=None, **kwargs):
851 def __init__(self, work_dir=u'.', config=None, **kwargs):
842 super(BatchSystemLauncher, self).__init__(
852 super(BatchSystemLauncher, self).__init__(
843 work_dir=work_dir, config=config, **kwargs
853 work_dir=work_dir, config=config, **kwargs
844 )
854 )
845 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
855 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
846
856
847 def parse_job_id(self, output):
857 def parse_job_id(self, output):
848 """Take the output of the submit command and return the job id."""
858 """Take the output of the submit command and return the job id."""
849 m = re.search(self.job_id_regexp, output)
859 m = re.search(self.job_id_regexp, output)
850 if m is not None:
860 if m is not None:
851 job_id = m.group()
861 job_id = m.group()
852 else:
862 else:
853 raise LauncherError("Job id couldn't be determined: %s" % output)
863 raise LauncherError("Job id couldn't be determined: %s" % output)
854 self.job_id = job_id
864 self.job_id = job_id
855 self.log.info('Job submitted with job id: %r' % job_id)
865 self.log.info('Job submitted with job id: %r' % job_id)
856 return job_id
866 return job_id
857
867
858 def write_batch_script(self, n):
868 def write_batch_script(self, n):
859 """Instantiate and write the batch script to the work_dir."""
869 """Instantiate and write the batch script to the work_dir."""
860 self.context['n'] = n
870 self.context['n'] = n
861 self.context['queue'] = self.queue
871 self.context['queue'] = self.queue
862 # first priority is batch_template if set
872 # first priority is batch_template if set
863 if self.batch_template_file and not self.batch_template:
873 if self.batch_template_file and not self.batch_template:
864 # second priority is batch_template_file
874 # second priority is batch_template_file
865 with open(self.batch_template_file) as f:
875 with open(self.batch_template_file) as f:
866 self.batch_template = f.read()
876 self.batch_template = f.read()
867 if not self.batch_template:
877 if not self.batch_template:
868 # third (last) priority is default_template
878 # third (last) priority is default_template
869 self.batch_template = self.default_template
879 self.batch_template = self.default_template
870
880
871 # add jobarray or queue lines to user-specified template
881 # add jobarray or queue lines to user-specified template
872 # note that this is *only* when user did not specify a template.
882 # note that this is *only* when user did not specify a template.
873 regex = re.compile(self.job_array_regexp)
883 regex = re.compile(self.job_array_regexp)
874 # print regex.search(self.batch_template)
884 # print regex.search(self.batch_template)
875 if not regex.search(self.batch_template):
885 if not regex.search(self.batch_template):
876 self.log.info("adding job array settings to batch script")
886 self.log.info("adding job array settings to batch script")
877 firstline, rest = self.batch_template.split('\n',1)
887 firstline, rest = self.batch_template.split('\n',1)
878 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
888 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
879
889
880 regex = re.compile(self.queue_regexp)
890 regex = re.compile(self.queue_regexp)
881 # print regex.search(self.batch_template)
891 # print regex.search(self.batch_template)
882 if self.queue and not regex.search(self.batch_template):
892 if self.queue and not regex.search(self.batch_template):
883 self.log.info("adding PBS queue settings to batch script")
893 self.log.info("adding PBS queue settings to batch script")
884 firstline, rest = self.batch_template.split('\n',1)
894 firstline, rest = self.batch_template.split('\n',1)
885 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
895 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
886
896
887 script_as_string = self.formatter.format(self.batch_template, **self.context)
897 script_as_string = self.formatter.format(self.batch_template, **self.context)
888 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
898 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
889
899
890 with open(self.batch_file, 'w') as f:
900 with open(self.batch_file, 'w') as f:
891 f.write(script_as_string)
901 f.write(script_as_string)
892 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
902 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
893
903
894 def start(self, n, profile_dir):
904 def start(self, n, profile_dir):
895 """Start n copies of the process using a batch system."""
905 """Start n copies of the process using a batch system."""
896 # Here we save profile_dir in the context so they
906 # Here we save profile_dir in the context so they
897 # can be used in the batch script template as {profile_dir}
907 # can be used in the batch script template as {profile_dir}
898 self.context['profile_dir'] = profile_dir
908 self.context['profile_dir'] = profile_dir
899 self.profile_dir = unicode(profile_dir)
909 self.profile_dir = unicode(profile_dir)
900 self.write_batch_script(n)
910 self.write_batch_script(n)
901 output = check_output(self.args, env=os.environ)
911 output = check_output(self.args, env=os.environ)
902
912
903 job_id = self.parse_job_id(output)
913 job_id = self.parse_job_id(output)
904 self.notify_start(job_id)
914 self.notify_start(job_id)
905 return job_id
915 return job_id
906
916
907 def stop(self):
917 def stop(self):
908 output = check_output(self.delete_command+[self.job_id], env=os.environ)
918 output = check_output(self.delete_command+[self.job_id], env=os.environ)
909 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
919 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
910 return output
920 return output
911
921
912
922
913 class PBSLauncher(BatchSystemLauncher):
923 class PBSLauncher(BatchSystemLauncher):
914 """A BatchSystemLauncher subclass for PBS."""
924 """A BatchSystemLauncher subclass for PBS."""
915
925
916 submit_command = List(['qsub'], config=True,
926 submit_command = List(['qsub'], config=True,
917 help="The PBS submit command ['qsub']")
927 help="The PBS submit command ['qsub']")
918 delete_command = List(['qdel'], config=True,
928 delete_command = List(['qdel'], config=True,
919 help="The PBS delete command ['qsub']")
929 help="The PBS delete command ['qsub']")
920 job_id_regexp = Unicode(r'\d+', config=True,
930 job_id_regexp = Unicode(r'\d+', config=True,
921 help="Regular expresion for identifying the job ID [r'\d+']")
931 help="Regular expresion for identifying the job ID [r'\d+']")
922
932
923 batch_file = Unicode(u'')
933 batch_file = Unicode(u'')
924 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
934 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
925 job_array_template = Unicode('#PBS -t 1-{n}')
935 job_array_template = Unicode('#PBS -t 1-{n}')
926 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
936 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
927 queue_template = Unicode('#PBS -q {queue}')
937 queue_template = Unicode('#PBS -q {queue}')
928
938
929
939
930 class PBSControllerLauncher(PBSLauncher):
940 class PBSControllerLauncher(PBSLauncher):
931 """Launch a controller using PBS."""
941 """Launch a controller using PBS."""
932
942
933 batch_file_name = Unicode(u'pbs_controller', config=True,
943 batch_file_name = Unicode(u'pbs_controller', config=True,
934 help="batch file name for the controller job.")
944 help="batch file name for the controller job.")
935 default_template= Unicode("""#!/bin/sh
945 default_template= Unicode("""#!/bin/sh
936 #PBS -V
946 #PBS -V
937 #PBS -N ipcontroller
947 #PBS -N ipcontroller
938 %s --log-to-file --profile-dir={profile_dir}
948 %s --log-to-file --profile-dir={profile_dir}
939 """%(' '.join(ipcontroller_cmd_argv)))
949 """%(' '.join(ipcontroller_cmd_argv)))
940
950
941 def start(self, profile_dir):
951 def start(self, profile_dir):
942 """Start the controller by profile or profile_dir."""
952 """Start the controller by profile or profile_dir."""
943 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
953 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
944 return super(PBSControllerLauncher, self).start(1, profile_dir)
954 return super(PBSControllerLauncher, self).start(1, profile_dir)
945
955
946
956
947 class PBSEngineSetLauncher(PBSLauncher):
957 class PBSEngineSetLauncher(PBSLauncher):
948 """Launch Engines using PBS"""
958 """Launch Engines using PBS"""
949 batch_file_name = Unicode(u'pbs_engines', config=True,
959 batch_file_name = Unicode(u'pbs_engines', config=True,
950 help="batch file name for the engine(s) job.")
960 help="batch file name for the engine(s) job.")
951 default_template= Unicode(u"""#!/bin/sh
961 default_template= Unicode(u"""#!/bin/sh
952 #PBS -V
962 #PBS -V
953 #PBS -N ipengine
963 #PBS -N ipengine
954 %s --profile-dir={profile_dir}
964 %s --profile-dir={profile_dir}
955 """%(' '.join(ipengine_cmd_argv)))
965 """%(' '.join(ipengine_cmd_argv)))
956
966
957 def start(self, n, profile_dir):
967 def start(self, n, profile_dir):
958 """Start n engines by profile or profile_dir."""
968 """Start n engines by profile or profile_dir."""
959 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
969 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
960 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
970 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
961
971
962 #SGE is very similar to PBS
972 #SGE is very similar to PBS
963
973
964 class SGELauncher(PBSLauncher):
974 class SGELauncher(PBSLauncher):
965 """Sun GridEngine is a PBS clone with slightly different syntax"""
975 """Sun GridEngine is a PBS clone with slightly different syntax"""
966 job_array_regexp = Unicode('#\$\W+\-t')
976 job_array_regexp = Unicode('#\$\W+\-t')
967 job_array_template = Unicode('#$ -t 1-{n}')
977 job_array_template = Unicode('#$ -t 1-{n}')
968 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
978 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
969 queue_template = Unicode('#$ -q {queue}')
979 queue_template = Unicode('#$ -q {queue}')
970
980
971 class SGEControllerLauncher(SGELauncher):
981 class SGEControllerLauncher(SGELauncher):
972 """Launch a controller using SGE."""
982 """Launch a controller using SGE."""
973
983
974 batch_file_name = Unicode(u'sge_controller', config=True,
984 batch_file_name = Unicode(u'sge_controller', config=True,
975 help="batch file name for the ipontroller job.")
985 help="batch file name for the ipontroller job.")
976 default_template= Unicode(u"""#$ -V
986 default_template= Unicode(u"""#$ -V
977 #$ -S /bin/sh
987 #$ -S /bin/sh
978 #$ -N ipcontroller
988 #$ -N ipcontroller
979 %s --log-to-file --profile-dir={profile_dir}
989 %s --log-to-file --profile-dir={profile_dir}
980 """%(' '.join(ipcontroller_cmd_argv)))
990 """%(' '.join(ipcontroller_cmd_argv)))
981
991
982 def start(self, profile_dir):
992 def start(self, profile_dir):
983 """Start the controller by profile or profile_dir."""
993 """Start the controller by profile or profile_dir."""
984 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
994 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
985 return super(SGEControllerLauncher, self).start(1, profile_dir)
995 return super(SGEControllerLauncher, self).start(1, profile_dir)
986
996
987 class SGEEngineSetLauncher(SGELauncher):
997 class SGEEngineSetLauncher(SGELauncher):
988 """Launch Engines with SGE"""
998 """Launch Engines with SGE"""
989 batch_file_name = Unicode(u'sge_engines', config=True,
999 batch_file_name = Unicode(u'sge_engines', config=True,
990 help="batch file name for the engine(s) job.")
1000 help="batch file name for the engine(s) job.")
991 default_template = Unicode("""#$ -V
1001 default_template = Unicode("""#$ -V
992 #$ -S /bin/sh
1002 #$ -S /bin/sh
993 #$ -N ipengine
1003 #$ -N ipengine
994 %s --profile-dir={profile_dir}
1004 %s --profile-dir={profile_dir}
995 """%(' '.join(ipengine_cmd_argv)))
1005 """%(' '.join(ipengine_cmd_argv)))
996
1006
997 def start(self, n, profile_dir):
1007 def start(self, n, profile_dir):
998 """Start n engines by profile or profile_dir."""
1008 """Start n engines by profile or profile_dir."""
999 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1009 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1000 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1010 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1001
1011
1002
1012
1003 # LSF launchers
1013 # LSF launchers
1004
1014
1005 class LSFLauncher(BatchSystemLauncher):
1015 class LSFLauncher(BatchSystemLauncher):
1006 """A BatchSystemLauncher subclass for LSF."""
1016 """A BatchSystemLauncher subclass for LSF."""
1007
1017
1008 submit_command = List(['bsub'], config=True,
1018 submit_command = List(['bsub'], config=True,
1009 help="The PBS submit command ['bsub']")
1019 help="The PBS submit command ['bsub']")
1010 delete_command = List(['bkill'], config=True,
1020 delete_command = List(['bkill'], config=True,
1011 help="The PBS delete command ['bkill']")
1021 help="The PBS delete command ['bkill']")
1012 job_id_regexp = Unicode(r'\d+', config=True,
1022 job_id_regexp = Unicode(r'\d+', config=True,
1013 help="Regular expresion for identifying the job ID [r'\d+']")
1023 help="Regular expresion for identifying the job ID [r'\d+']")
1014
1024
1015 batch_file = Unicode(u'')
1025 batch_file = Unicode(u'')
1016 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1026 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1017 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1027 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1018 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1028 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1019 queue_template = Unicode('#BSUB -q {queue}')
1029 queue_template = Unicode('#BSUB -q {queue}')
1020
1030
1021 def start(self, n, profile_dir):
1031 def start(self, n, profile_dir):
1022 """Start n copies of the process using LSF batch system.
1032 """Start n copies of the process using LSF batch system.
1023 This cant inherit from the base class because bsub expects
1033 This cant inherit from the base class because bsub expects
1024 to be piped a shell script in order to honor the #BSUB directives :
1034 to be piped a shell script in order to honor the #BSUB directives :
1025 bsub < script
1035 bsub < script
1026 """
1036 """
1027 # Here we save profile_dir in the context so they
1037 # Here we save profile_dir in the context so they
1028 # can be used in the batch script template as {profile_dir}
1038 # can be used in the batch script template as {profile_dir}
1029 self.context['profile_dir'] = profile_dir
1039 self.context['profile_dir'] = profile_dir
1030 self.profile_dir = unicode(profile_dir)
1040 self.profile_dir = unicode(profile_dir)
1031 self.write_batch_script(n)
1041 self.write_batch_script(n)
1032 #output = check_output(self.args, env=os.environ)
1042 #output = check_output(self.args, env=os.environ)
1033 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1043 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1034 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1044 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1035 output,err = p.communicate()
1045 output,err = p.communicate()
1036 job_id = self.parse_job_id(output)
1046 job_id = self.parse_job_id(output)
1037 self.notify_start(job_id)
1047 self.notify_start(job_id)
1038 return job_id
1048 return job_id
1039
1049
1040
1050
1041 class LSFControllerLauncher(LSFLauncher):
1051 class LSFControllerLauncher(LSFLauncher):
1042 """Launch a controller using LSF."""
1052 """Launch a controller using LSF."""
1043
1053
1044 batch_file_name = Unicode(u'lsf_controller', config=True,
1054 batch_file_name = Unicode(u'lsf_controller', config=True,
1045 help="batch file name for the controller job.")
1055 help="batch file name for the controller job.")
1046 default_template= Unicode("""#!/bin/sh
1056 default_template= Unicode("""#!/bin/sh
1047 #BSUB -J ipcontroller
1057 #BSUB -J ipcontroller
1048 #BSUB -oo ipcontroller.o.%%J
1058 #BSUB -oo ipcontroller.o.%%J
1049 #BSUB -eo ipcontroller.e.%%J
1059 #BSUB -eo ipcontroller.e.%%J
1050 %s --log-to-file --profile-dir={profile_dir}
1060 %s --log-to-file --profile-dir={profile_dir}
1051 """%(' '.join(ipcontroller_cmd_argv)))
1061 """%(' '.join(ipcontroller_cmd_argv)))
1052
1062
1053 def start(self, profile_dir):
1063 def start(self, profile_dir):
1054 """Start the controller by profile or profile_dir."""
1064 """Start the controller by profile or profile_dir."""
1055 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1065 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1056 return super(LSFControllerLauncher, self).start(1, profile_dir)
1066 return super(LSFControllerLauncher, self).start(1, profile_dir)
1057
1067
1058
1068
1059 class LSFEngineSetLauncher(LSFLauncher):
1069 class LSFEngineSetLauncher(LSFLauncher):
1060 """Launch Engines using LSF"""
1070 """Launch Engines using LSF"""
1061 batch_file_name = Unicode(u'lsf_engines', config=True,
1071 batch_file_name = Unicode(u'lsf_engines', config=True,
1062 help="batch file name for the engine(s) job.")
1072 help="batch file name for the engine(s) job.")
1063 default_template= Unicode(u"""#!/bin/sh
1073 default_template= Unicode(u"""#!/bin/sh
1064 #BSUB -oo ipengine.o.%%J
1074 #BSUB -oo ipengine.o.%%J
1065 #BSUB -eo ipengine.e.%%J
1075 #BSUB -eo ipengine.e.%%J
1066 %s --profile-dir={profile_dir}
1076 %s --profile-dir={profile_dir}
1067 """%(' '.join(ipengine_cmd_argv)))
1077 """%(' '.join(ipengine_cmd_argv)))
1068
1078
1069 def start(self, n, profile_dir):
1079 def start(self, n, profile_dir):
1070 """Start n engines by profile or profile_dir."""
1080 """Start n engines by profile or profile_dir."""
1071 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1081 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1072 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1082 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1073
1083
1074
1084
1075 #-----------------------------------------------------------------------------
1085 #-----------------------------------------------------------------------------
1076 # A launcher for ipcluster itself!
1086 # A launcher for ipcluster itself!
1077 #-----------------------------------------------------------------------------
1087 #-----------------------------------------------------------------------------
1078
1088
1079
1089
1080 class IPClusterLauncher(LocalProcessLauncher):
1090 class IPClusterLauncher(LocalProcessLauncher):
1081 """Launch the ipcluster program in an external process."""
1091 """Launch the ipcluster program in an external process."""
1082
1092
1083 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1093 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1084 help="Popen command for ipcluster")
1094 help="Popen command for ipcluster")
1085 ipcluster_args = List(
1095 ipcluster_args = List(
1086 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1096 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1087 help="Command line arguments to pass to ipcluster.")
1097 help="Command line arguments to pass to ipcluster.")
1088 ipcluster_subcommand = Unicode('start')
1098 ipcluster_subcommand = Unicode('start')
1089 ipcluster_n = Int(2)
1099 ipcluster_n = Int(2)
1090
1100
1091 def find_args(self):
1101 def find_args(self):
1092 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1102 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1093 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1103 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1094
1104
1095 def start(self):
1105 def start(self):
1096 self.log.info("Starting ipcluster: %r" % self.args)
1106 self.log.info("Starting ipcluster: %r" % self.args)
1097 return super(IPClusterLauncher, self).start()
1107 return super(IPClusterLauncher, self).start()
1098
1108
1099 #-----------------------------------------------------------------------------
1109 #-----------------------------------------------------------------------------
1100 # Collections of launchers
1110 # Collections of launchers
1101 #-----------------------------------------------------------------------------
1111 #-----------------------------------------------------------------------------
1102
1112
1103 local_launchers = [
1113 local_launchers = [
1104 LocalControllerLauncher,
1114 LocalControllerLauncher,
1105 LocalEngineLauncher,
1115 LocalEngineLauncher,
1106 LocalEngineSetLauncher,
1116 LocalEngineSetLauncher,
1107 ]
1117 ]
1108 mpi_launchers = [
1118 mpi_launchers = [
1109 MPIExecLauncher,
1119 MPIExecLauncher,
1110 MPIExecControllerLauncher,
1120 MPIExecControllerLauncher,
1111 MPIExecEngineSetLauncher,
1121 MPIExecEngineSetLauncher,
1112 ]
1122 ]
1113 ssh_launchers = [
1123 ssh_launchers = [
1114 SSHLauncher,
1124 SSHLauncher,
1115 SSHControllerLauncher,
1125 SSHControllerLauncher,
1116 SSHEngineLauncher,
1126 SSHEngineLauncher,
1117 SSHEngineSetLauncher,
1127 SSHEngineSetLauncher,
1118 ]
1128 ]
1119 winhpc_launchers = [
1129 winhpc_launchers = [
1120 WindowsHPCLauncher,
1130 WindowsHPCLauncher,
1121 WindowsHPCControllerLauncher,
1131 WindowsHPCControllerLauncher,
1122 WindowsHPCEngineSetLauncher,
1132 WindowsHPCEngineSetLauncher,
1123 ]
1133 ]
1124 pbs_launchers = [
1134 pbs_launchers = [
1125 PBSLauncher,
1135 PBSLauncher,
1126 PBSControllerLauncher,
1136 PBSControllerLauncher,
1127 PBSEngineSetLauncher,
1137 PBSEngineSetLauncher,
1128 ]
1138 ]
1129 sge_launchers = [
1139 sge_launchers = [
1130 SGELauncher,
1140 SGELauncher,
1131 SGEControllerLauncher,
1141 SGEControllerLauncher,
1132 SGEEngineSetLauncher,
1142 SGEEngineSetLauncher,
1133 ]
1143 ]
1134 lsf_launchers = [
1144 lsf_launchers = [
1135 LSFLauncher,
1145 LSFLauncher,
1136 LSFControllerLauncher,
1146 LSFControllerLauncher,
1137 LSFEngineSetLauncher,
1147 LSFEngineSetLauncher,
1138 ]
1148 ]
1139 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1149 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1140 + pbs_launchers + sge_launchers + lsf_launchers
1150 + pbs_launchers + sge_launchers + lsf_launchers
1141
1151
General Comments 0
You need to be logged in to leave comments. Login now