##// END OF EJS Templates
Added import time to IPython/parallel/apps/launcher.py...
Ben Edwards -
Show More
@@ -1,1151 +1,1152
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27
28
28 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
29
30
30 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
31 try:
32 try:
32 from signal import SIGKILL
33 from signal import SIGKILL
33 except ImportError:
34 except ImportError:
34 # Windows
35 # Windows
35 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
36
37
37 try:
38 try:
38 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
39 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
40 except ImportError:
41 except ImportError:
41 pass
42 pass
42
43
43 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
44 try:
45 try:
45 from subprocess import check_output
46 from subprocess import check_output
46 except ImportError:
47 except ImportError:
47 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
48 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
49 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
50 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
51 out,err = p.communicate()
52 out,err = p.communicate()
52 return out
53 return out
53
54
54 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
55
56
56 from IPython.config.application import Application
57 from IPython.config.application import Application
57 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62
63
63 from .win32support import forward_read_events
64 from .win32support import forward_read_events
64
65
65 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66
67
67 WINDOWS = os.name == 'nt'
68 WINDOWS = os.name == 'nt'
68
69
69 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
70 # Paths to the kernel apps
71 # Paths to the kernel apps
71 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
72
73
73
74
74 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipclusterapp'
76 'IPython.parallel.apps.ipclusterapp'
76 ))
77 ))
77
78
78 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
79 'IPython.parallel.apps.ipengineapp'
80 'IPython.parallel.apps.ipengineapp'
80 ))
81 ))
81
82
82 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 'IPython.parallel.apps.ipcontrollerapp'
84 'IPython.parallel.apps.ipcontrollerapp'
84 ))
85 ))
85
86
86 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
87 # Base launchers and errors
88 # Base launchers and errors
88 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
89
90
90
91
91 class LauncherError(Exception):
92 class LauncherError(Exception):
92 pass
93 pass
93
94
94
95
95 class ProcessStateError(LauncherError):
96 class ProcessStateError(LauncherError):
96 pass
97 pass
97
98
98
99
99 class UnknownStatus(LauncherError):
100 class UnknownStatus(LauncherError):
100 pass
101 pass
101
102
102
103
103 class BaseLauncher(LoggingConfigurable):
104 class BaseLauncher(LoggingConfigurable):
104 """An asbtraction for starting, stopping and signaling a process."""
105 """An asbtraction for starting, stopping and signaling a process."""
105
106
106 # In all of the launchers, the work_dir is where child processes will be
107 # In all of the launchers, the work_dir is where child processes will be
107 # run. This will usually be the profile_dir, but may not be. any work_dir
108 # run. This will usually be the profile_dir, but may not be. any work_dir
108 # passed into the __init__ method will override the config value.
109 # passed into the __init__ method will override the config value.
109 # This should not be used to set the work_dir for the actual engine
110 # This should not be used to set the work_dir for the actual engine
110 # and controller. Instead, use their own config files or the
111 # and controller. Instead, use their own config files or the
111 # controller_args, engine_args attributes of the launchers to add
112 # controller_args, engine_args attributes of the launchers to add
112 # the work_dir option.
113 # the work_dir option.
113 work_dir = Unicode(u'.')
114 work_dir = Unicode(u'.')
114 loop = Instance('zmq.eventloop.ioloop.IOLoop')
115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
115
116
116 start_data = Any()
117 start_data = Any()
117 stop_data = Any()
118 stop_data = Any()
118
119
119 def _loop_default(self):
120 def _loop_default(self):
120 return ioloop.IOLoop.instance()
121 return ioloop.IOLoop.instance()
121
122
122 def __init__(self, work_dir=u'.', config=None, **kwargs):
123 def __init__(self, work_dir=u'.', config=None, **kwargs):
123 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
124 self.state = 'before' # can be before, running, after
125 self.state = 'before' # can be before, running, after
125 self.stop_callbacks = []
126 self.stop_callbacks = []
126 self.start_data = None
127 self.start_data = None
127 self.stop_data = None
128 self.stop_data = None
128
129
129 @property
130 @property
130 def args(self):
131 def args(self):
131 """A list of cmd and args that will be used to start the process.
132 """A list of cmd and args that will be used to start the process.
132
133
133 This is what is passed to :func:`spawnProcess` and the first element
134 This is what is passed to :func:`spawnProcess` and the first element
134 will be the process name.
135 will be the process name.
135 """
136 """
136 return self.find_args()
137 return self.find_args()
137
138
138 def find_args(self):
139 def find_args(self):
139 """The ``.args`` property calls this to find the args list.
140 """The ``.args`` property calls this to find the args list.
140
141
141 Subcommand should implement this to construct the cmd and args.
142 Subcommand should implement this to construct the cmd and args.
142 """
143 """
143 raise NotImplementedError('find_args must be implemented in a subclass')
144 raise NotImplementedError('find_args must be implemented in a subclass')
144
145
145 @property
146 @property
146 def arg_str(self):
147 def arg_str(self):
147 """The string form of the program arguments."""
148 """The string form of the program arguments."""
148 return ' '.join(self.args)
149 return ' '.join(self.args)
149
150
150 @property
151 @property
151 def running(self):
152 def running(self):
152 """Am I running."""
153 """Am I running."""
153 if self.state == 'running':
154 if self.state == 'running':
154 return True
155 return True
155 else:
156 else:
156 return False
157 return False
157
158
158 def start(self):
159 def start(self):
159 """Start the process."""
160 """Start the process."""
160 raise NotImplementedError('start must be implemented in a subclass')
161 raise NotImplementedError('start must be implemented in a subclass')
161
162
162 def stop(self):
163 def stop(self):
163 """Stop the process and notify observers of stopping.
164 """Stop the process and notify observers of stopping.
164
165
165 This method will return None immediately.
166 This method will return None immediately.
166 To observe the actual process stopping, see :meth:`on_stop`.
167 To observe the actual process stopping, see :meth:`on_stop`.
167 """
168 """
168 raise NotImplementedError('stop must be implemented in a subclass')
169 raise NotImplementedError('stop must be implemented in a subclass')
169
170
170 def on_stop(self, f):
171 def on_stop(self, f):
171 """Register a callback to be called with this Launcher's stop_data
172 """Register a callback to be called with this Launcher's stop_data
172 when the process actually finishes.
173 when the process actually finishes.
173 """
174 """
174 if self.state=='after':
175 if self.state=='after':
175 return f(self.stop_data)
176 return f(self.stop_data)
176 else:
177 else:
177 self.stop_callbacks.append(f)
178 self.stop_callbacks.append(f)
178
179
179 def notify_start(self, data):
180 def notify_start(self, data):
180 """Call this to trigger startup actions.
181 """Call this to trigger startup actions.
181
182
182 This logs the process startup and sets the state to 'running'. It is
183 This logs the process startup and sets the state to 'running'. It is
183 a pass-through so it can be used as a callback.
184 a pass-through so it can be used as a callback.
184 """
185 """
185
186
186 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.start_data = data
188 self.start_data = data
188 self.state = 'running'
189 self.state = 'running'
189 return data
190 return data
190
191
191 def notify_stop(self, data):
192 def notify_stop(self, data):
192 """Call this to trigger process stop actions.
193 """Call this to trigger process stop actions.
193
194
194 This logs the process stopping and sets the state to 'after'. Call
195 This logs the process stopping and sets the state to 'after'. Call
195 this to trigger callbacks registered via :meth:`on_stop`."""
196 this to trigger callbacks registered via :meth:`on_stop`."""
196
197
197 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.stop_data = data
199 self.stop_data = data
199 self.state = 'after'
200 self.state = 'after'
200 for i in range(len(self.stop_callbacks)):
201 for i in range(len(self.stop_callbacks)):
201 d = self.stop_callbacks.pop()
202 d = self.stop_callbacks.pop()
202 d(data)
203 d(data)
203 return data
204 return data
204
205
205 def signal(self, sig):
206 def signal(self, sig):
206 """Signal the process.
207 """Signal the process.
207
208
208 Parameters
209 Parameters
209 ----------
210 ----------
210 sig : str or int
211 sig : str or int
211 'KILL', 'INT', etc., or any signal number
212 'KILL', 'INT', etc., or any signal number
212 """
213 """
213 raise NotImplementedError('signal must be implemented in a subclass')
214 raise NotImplementedError('signal must be implemented in a subclass')
214
215
215
216
216 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
217 # Local process launchers
218 # Local process launchers
218 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
219
220
220
221
221 class LocalProcessLauncher(BaseLauncher):
222 class LocalProcessLauncher(BaseLauncher):
222 """Start and stop an external process in an asynchronous manner.
223 """Start and stop an external process in an asynchronous manner.
223
224
224 This will launch the external process with a working directory of
225 This will launch the external process with a working directory of
225 ``self.work_dir``.
226 ``self.work_dir``.
226 """
227 """
227
228
228 # This is used to to construct self.args, which is passed to
229 # This is used to to construct self.args, which is passed to
229 # spawnProcess.
230 # spawnProcess.
230 cmd_and_args = List([])
231 cmd_and_args = List([])
231 poll_frequency = Int(100) # in ms
232 poll_frequency = Int(100) # in ms
232
233
233 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 super(LocalProcessLauncher, self).__init__(
235 super(LocalProcessLauncher, self).__init__(
235 work_dir=work_dir, config=config, **kwargs
236 work_dir=work_dir, config=config, **kwargs
236 )
237 )
237 self.process = None
238 self.process = None
238 self.poller = None
239 self.poller = None
239
240
240 def find_args(self):
241 def find_args(self):
241 return self.cmd_and_args
242 return self.cmd_and_args
242
243
243 def start(self):
244 def start(self):
244 if self.state == 'before':
245 if self.state == 'before':
245 self.process = Popen(self.args,
246 self.process = Popen(self.args,
246 stdout=PIPE,stderr=PIPE,stdin=PIPE,
247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
247 env=os.environ,
248 env=os.environ,
248 cwd=self.work_dir
249 cwd=self.work_dir
249 )
250 )
250 if WINDOWS:
251 if WINDOWS:
251 self.stdout = forward_read_events(self.process.stdout)
252 self.stdout = forward_read_events(self.process.stdout)
252 self.stderr = forward_read_events(self.process.stderr)
253 self.stderr = forward_read_events(self.process.stderr)
253 else:
254 else:
254 self.stdout = self.process.stdout.fileno()
255 self.stdout = self.process.stdout.fileno()
255 self.stderr = self.process.stderr.fileno()
256 self.stderr = self.process.stderr.fileno()
256 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
257 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
258 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
259 self.poller.start()
260 self.poller.start()
260 self.notify_start(self.process.pid)
261 self.notify_start(self.process.pid)
261 else:
262 else:
262 s = 'The process was already started and has state: %r' % self.state
263 s = 'The process was already started and has state: %r' % self.state
263 raise ProcessStateError(s)
264 raise ProcessStateError(s)
264
265
265 def stop(self):
266 def stop(self):
266 return self.interrupt_then_kill()
267 return self.interrupt_then_kill()
267
268
268 def signal(self, sig):
269 def signal(self, sig):
269 if self.state == 'running':
270 if self.state == 'running':
270 if WINDOWS and sig != SIGINT:
271 if WINDOWS and sig != SIGINT:
271 # use Windows tree-kill for better child cleanup
272 # use Windows tree-kill for better child cleanup
272 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
273 else:
274 else:
274 self.process.send_signal(sig)
275 self.process.send_signal(sig)
275
276
276 def interrupt_then_kill(self, delay=2.0):
277 def interrupt_then_kill(self, delay=2.0):
277 """Send INT, wait a delay and then send KILL."""
278 """Send INT, wait a delay and then send KILL."""
278 try:
279 try:
279 self.signal(SIGINT)
280 self.signal(SIGINT)
280 except Exception:
281 except Exception:
281 self.log.debug("interrupt failed")
282 self.log.debug("interrupt failed")
282 pass
283 pass
283 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
284 self.killer.start()
285 self.killer.start()
285
286
286 # callbacks, etc:
287 # callbacks, etc:
287
288
288 def handle_stdout(self, fd, events):
289 def handle_stdout(self, fd, events):
289 if WINDOWS:
290 if WINDOWS:
290 line = self.stdout.recv()
291 line = self.stdout.recv()
291 else:
292 else:
292 line = self.process.stdout.readline()
293 line = self.process.stdout.readline()
293 # a stopped process will be readable but return empty strings
294 # a stopped process will be readable but return empty strings
294 if line:
295 if line:
295 self.log.info(line[:-1])
296 self.log.info(line[:-1])
296 else:
297 else:
297 self.poll()
298 self.poll()
298
299
299 def handle_stderr(self, fd, events):
300 def handle_stderr(self, fd, events):
300 if WINDOWS:
301 if WINDOWS:
301 line = self.stderr.recv()
302 line = self.stderr.recv()
302 else:
303 else:
303 line = self.process.stderr.readline()
304 line = self.process.stderr.readline()
304 # a stopped process will be readable but return empty strings
305 # a stopped process will be readable but return empty strings
305 if line:
306 if line:
306 self.log.error(line[:-1])
307 self.log.error(line[:-1])
307 else:
308 else:
308 self.poll()
309 self.poll()
309
310
310 def poll(self):
311 def poll(self):
311 status = self.process.poll()
312 status = self.process.poll()
312 if status is not None:
313 if status is not None:
313 self.poller.stop()
314 self.poller.stop()
314 self.loop.remove_handler(self.stdout)
315 self.loop.remove_handler(self.stdout)
315 self.loop.remove_handler(self.stderr)
316 self.loop.remove_handler(self.stderr)
316 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
317 return status
318 return status
318
319
319 class LocalControllerLauncher(LocalProcessLauncher):
320 class LocalControllerLauncher(LocalProcessLauncher):
320 """Launch a controller as a regular external process."""
321 """Launch a controller as a regular external process."""
321
322
322 controller_cmd = List(ipcontroller_cmd_argv, config=True,
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
323 help="""Popen command to launch ipcontroller.""")
324 help="""Popen command to launch ipcontroller.""")
324 # Command line arguments to ipcontroller.
325 # Command line arguments to ipcontroller.
325 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
326 help="""command-line args to pass to ipcontroller""")
327 help="""command-line args to pass to ipcontroller""")
327
328
328 def find_args(self):
329 def find_args(self):
329 return self.controller_cmd + self.controller_args
330 return self.controller_cmd + self.controller_args
330
331
331 def start(self, profile_dir):
332 def start(self, profile_dir):
332 """Start the controller by profile_dir."""
333 """Start the controller by profile_dir."""
333 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
334 self.profile_dir = unicode(profile_dir)
335 self.profile_dir = unicode(profile_dir)
335 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
336 return super(LocalControllerLauncher, self).start()
337 return super(LocalControllerLauncher, self).start()
337
338
338
339
339 class LocalEngineLauncher(LocalProcessLauncher):
340 class LocalEngineLauncher(LocalProcessLauncher):
340 """Launch a single engine as a regular externall process."""
341 """Launch a single engine as a regular externall process."""
341
342
342 engine_cmd = List(ipengine_cmd_argv, config=True,
343 engine_cmd = List(ipengine_cmd_argv, config=True,
343 help="""command to launch the Engine.""")
344 help="""command to launch the Engine.""")
344 # Command line arguments for ipengine.
345 # Command line arguments for ipengine.
345 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
346 help="command-line arguments to pass to ipengine"
347 help="command-line arguments to pass to ipengine"
347 )
348 )
348
349
349 def find_args(self):
350 def find_args(self):
350 return self.engine_cmd + self.engine_args
351 return self.engine_cmd + self.engine_args
351
352
352 def start(self, profile_dir):
353 def start(self, profile_dir):
353 """Start the engine by profile_dir."""
354 """Start the engine by profile_dir."""
354 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
355 self.profile_dir = unicode(profile_dir)
356 self.profile_dir = unicode(profile_dir)
356 return super(LocalEngineLauncher, self).start()
357 return super(LocalEngineLauncher, self).start()
357
358
358
359
359 class LocalEngineSetLauncher(BaseLauncher):
360 class LocalEngineSetLauncher(BaseLauncher):
360 """Launch a set of engines as regular external processes."""
361 """Launch a set of engines as regular external processes."""
361
362
362 # Command line arguments for ipengine.
363 # Command line arguments for ipengine.
363 engine_args = List(
364 engine_args = List(
364 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 help="command-line arguments to pass to ipengine"
366 help="command-line arguments to pass to ipengine"
366 )
367 )
367 delay = CFloat(0.1, config=True,
368 delay = CFloat(0.1, config=True,
368 help="""delay (in seconds) between starting each engine after the first.
369 help="""delay (in seconds) between starting each engine after the first.
369 This can help force the engines to get their ids in order, or limit
370 This can help force the engines to get their ids in order, or limit
370 process flood when starting many engines."""
371 process flood when starting many engines."""
371 )
372 )
372
373
373 # launcher class
374 # launcher class
374 launcher_class = LocalEngineLauncher
375 launcher_class = LocalEngineLauncher
375
376
376 launchers = Dict()
377 launchers = Dict()
377 stop_data = Dict()
378 stop_data = Dict()
378
379
379 def __init__(self, work_dir=u'.', config=None, **kwargs):
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
380 super(LocalEngineSetLauncher, self).__init__(
381 super(LocalEngineSetLauncher, self).__init__(
381 work_dir=work_dir, config=config, **kwargs
382 work_dir=work_dir, config=config, **kwargs
382 )
383 )
383 self.stop_data = {}
384 self.stop_data = {}
384
385
385 def start(self, n, profile_dir):
386 def start(self, n, profile_dir):
386 """Start n engines by profile or profile_dir."""
387 """Start n engines by profile or profile_dir."""
387 self.profile_dir = unicode(profile_dir)
388 self.profile_dir = unicode(profile_dir)
388 dlist = []
389 dlist = []
389 for i in range(n):
390 for i in range(n):
390 if i > 0:
391 if i > 0:
391 time.sleep(self.delay)
392 time.sleep(self.delay)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
393 # Copy the engine args over to each engine launcher.
394 # Copy the engine args over to each engine launcher.
394 el.engine_args = copy.deepcopy(self.engine_args)
395 el.engine_args = copy.deepcopy(self.engine_args)
395 el.on_stop(self._notice_engine_stopped)
396 el.on_stop(self._notice_engine_stopped)
396 d = el.start(profile_dir)
397 d = el.start(profile_dir)
397 if i==0:
398 if i==0:
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.launchers[i] = el
400 self.launchers[i] = el
400 dlist.append(d)
401 dlist.append(d)
401 self.notify_start(dlist)
402 self.notify_start(dlist)
402 # The consumeErrors here could be dangerous
403 # The consumeErrors here could be dangerous
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal.addCallback(self.notify_start)
405 # dfinal.addCallback(self.notify_start)
405 return dlist
406 return dlist
406
407
407 def find_args(self):
408 def find_args(self):
408 return ['engine set']
409 return ['engine set']
409
410
410 def signal(self, sig):
411 def signal(self, sig):
411 dlist = []
412 dlist = []
412 for el in self.launchers.itervalues():
413 for el in self.launchers.itervalues():
413 d = el.signal(sig)
414 d = el.signal(sig)
414 dlist.append(d)
415 dlist.append(d)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 return dlist
417 return dlist
417
418
418 def interrupt_then_kill(self, delay=1.0):
419 def interrupt_then_kill(self, delay=1.0):
419 dlist = []
420 dlist = []
420 for el in self.launchers.itervalues():
421 for el in self.launchers.itervalues():
421 d = el.interrupt_then_kill(delay)
422 d = el.interrupt_then_kill(delay)
422 dlist.append(d)
423 dlist.append(d)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 return dlist
425 return dlist
425
426
426 def stop(self):
427 def stop(self):
427 return self.interrupt_then_kill()
428 return self.interrupt_then_kill()
428
429
429 def _notice_engine_stopped(self, data):
430 def _notice_engine_stopped(self, data):
430 pid = data['pid']
431 pid = data['pid']
431 for idx,el in self.launchers.iteritems():
432 for idx,el in self.launchers.iteritems():
432 if el.process.pid == pid:
433 if el.process.pid == pid:
433 break
434 break
434 self.launchers.pop(idx)
435 self.launchers.pop(idx)
435 self.stop_data[idx] = data
436 self.stop_data[idx] = data
436 if not self.launchers:
437 if not self.launchers:
437 self.notify_stop(self.stop_data)
438 self.notify_stop(self.stop_data)
438
439
439
440
440 #-----------------------------------------------------------------------------
441 #-----------------------------------------------------------------------------
441 # MPIExec launchers
442 # MPIExec launchers
442 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
443
444
444
445
445 class MPIExecLauncher(LocalProcessLauncher):
446 class MPIExecLauncher(LocalProcessLauncher):
446 """Launch an external process using mpiexec."""
447 """Launch an external process using mpiexec."""
447
448
448 mpi_cmd = List(['mpiexec'], config=True,
449 mpi_cmd = List(['mpiexec'], config=True,
449 help="The mpiexec command to use in starting the process."
450 help="The mpiexec command to use in starting the process."
450 )
451 )
451 mpi_args = List([], config=True,
452 mpi_args = List([], config=True,
452 help="The command line arguments to pass to mpiexec."
453 help="The command line arguments to pass to mpiexec."
453 )
454 )
454 program = List(['date'], config=True,
455 program = List(['date'], config=True,
455 help="The program to start via mpiexec.")
456 help="The program to start via mpiexec.")
456 program_args = List([], config=True,
457 program_args = List([], config=True,
457 help="The command line argument to the program."
458 help="The command line argument to the program."
458 )
459 )
459 n = Int(1)
460 n = Int(1)
460
461
461 def find_args(self):
462 def find_args(self):
462 """Build self.args using all the fields."""
463 """Build self.args using all the fields."""
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 self.program + self.program_args
465 self.program + self.program_args
465
466
466 def start(self, n):
467 def start(self, n):
467 """Start n instances of the program using mpiexec."""
468 """Start n instances of the program using mpiexec."""
468 self.n = n
469 self.n = n
469 return super(MPIExecLauncher, self).start()
470 return super(MPIExecLauncher, self).start()
470
471
471
472
472 class MPIExecControllerLauncher(MPIExecLauncher):
473 class MPIExecControllerLauncher(MPIExecLauncher):
473 """Launch a controller using mpiexec."""
474 """Launch a controller using mpiexec."""
474
475
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 help="Popen command to launch the Contropper"
477 help="Popen command to launch the Contropper"
477 )
478 )
478 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
479 help="Command line arguments to pass to ipcontroller."
480 help="Command line arguments to pass to ipcontroller."
480 )
481 )
481 n = Int(1)
482 n = Int(1)
482
483
483 def start(self, profile_dir):
484 def start(self, profile_dir):
484 """Start the controller by profile_dir."""
485 """Start the controller by profile_dir."""
485 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
486 self.profile_dir = unicode(profile_dir)
487 self.profile_dir = unicode(profile_dir)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 return super(MPIExecControllerLauncher, self).start(1)
489 return super(MPIExecControllerLauncher, self).start(1)
489
490
490 def find_args(self):
491 def find_args(self):
491 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
492 self.controller_cmd + self.controller_args
493 self.controller_cmd + self.controller_args
493
494
494
495
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
496
497
497 program = List(ipengine_cmd_argv, config=True,
498 program = List(ipengine_cmd_argv, config=True,
498 help="Popen command for ipengine"
499 help="Popen command for ipengine"
499 )
500 )
500 program_args = List(
501 program_args = List(
501 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
502 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
502 help="Command line arguments for ipengine."
503 help="Command line arguments for ipengine."
503 )
504 )
504 n = Int(1)
505 n = Int(1)
505
506
506 def start(self, n, profile_dir):
507 def start(self, n, profile_dir):
507 """Start n engines by profile or profile_dir."""
508 """Start n engines by profile or profile_dir."""
508 self.program_args.extend(['--profile-dir=%s'%profile_dir])
509 self.program_args.extend(['--profile-dir=%s'%profile_dir])
509 self.profile_dir = unicode(profile_dir)
510 self.profile_dir = unicode(profile_dir)
510 self.n = n
511 self.n = n
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 return super(MPIExecEngineSetLauncher, self).start(n)
513 return super(MPIExecEngineSetLauncher, self).start(n)
513
514
514 #-----------------------------------------------------------------------------
515 #-----------------------------------------------------------------------------
515 # SSH launchers
516 # SSH launchers
516 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
517
518
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
519
520
520 class SSHLauncher(LocalProcessLauncher):
521 class SSHLauncher(LocalProcessLauncher):
521 """A minimal launcher for ssh.
522 """A minimal launcher for ssh.
522
523
523 To be useful this will probably have to be extended to use the ``sshx``
524 To be useful this will probably have to be extended to use the ``sshx``
524 idea for environment variables. There could be other things this needs
525 idea for environment variables. There could be other things this needs
525 as well.
526 as well.
526 """
527 """
527
528
528 ssh_cmd = List(['ssh'], config=True,
529 ssh_cmd = List(['ssh'], config=True,
529 help="command for starting ssh")
530 help="command for starting ssh")
530 ssh_args = List(['-tt'], config=True,
531 ssh_args = List(['-tt'], config=True,
531 help="args to pass to ssh")
532 help="args to pass to ssh")
532 program = List(['date'], config=True,
533 program = List(['date'], config=True,
533 help="Program to launch via ssh")
534 help="Program to launch via ssh")
534 program_args = List([], config=True,
535 program_args = List([], config=True,
535 help="args to pass to remote program")
536 help="args to pass to remote program")
536 hostname = Unicode('', config=True,
537 hostname = Unicode('', config=True,
537 help="hostname on which to launch the program")
538 help="hostname on which to launch the program")
538 user = Unicode('', config=True,
539 user = Unicode('', config=True,
539 help="username for ssh")
540 help="username for ssh")
540 location = Unicode('', config=True,
541 location = Unicode('', config=True,
541 help="user@hostname location for ssh in one setting")
542 help="user@hostname location for ssh in one setting")
542
543
543 def _hostname_changed(self, name, old, new):
544 def _hostname_changed(self, name, old, new):
544 if self.user:
545 if self.user:
545 self.location = u'%s@%s' % (self.user, new)
546 self.location = u'%s@%s' % (self.user, new)
546 else:
547 else:
547 self.location = new
548 self.location = new
548
549
549 def _user_changed(self, name, old, new):
550 def _user_changed(self, name, old, new):
550 self.location = u'%s@%s' % (new, self.hostname)
551 self.location = u'%s@%s' % (new, self.hostname)
551
552
552 def find_args(self):
553 def find_args(self):
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 self.program + self.program_args
555 self.program + self.program_args
555
556
556 def start(self, profile_dir, hostname=None, user=None):
557 def start(self, profile_dir, hostname=None, user=None):
557 self.profile_dir = unicode(profile_dir)
558 self.profile_dir = unicode(profile_dir)
558 if hostname is not None:
559 if hostname is not None:
559 self.hostname = hostname
560 self.hostname = hostname
560 if user is not None:
561 if user is not None:
561 self.user = user
562 self.user = user
562
563
563 return super(SSHLauncher, self).start()
564 return super(SSHLauncher, self).start()
564
565
565 def signal(self, sig):
566 def signal(self, sig):
566 if self.state == 'running':
567 if self.state == 'running':
567 # send escaped ssh connection-closer
568 # send escaped ssh connection-closer
568 self.process.stdin.write('~.')
569 self.process.stdin.write('~.')
569 self.process.stdin.flush()
570 self.process.stdin.flush()
570
571
571
572
572
573
573 class SSHControllerLauncher(SSHLauncher):
574 class SSHControllerLauncher(SSHLauncher):
574
575
575 program = List(ipcontroller_cmd_argv, config=True,
576 program = List(ipcontroller_cmd_argv, config=True,
576 help="remote ipcontroller command.")
577 help="remote ipcontroller command.")
577 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
578 program_args = List(['--reuse-files', '--log-to-file','--log-level=%i'%logging.INFO], config=True,
578 help="Command line arguments to ipcontroller.")
579 help="Command line arguments to ipcontroller.")
579
580
580
581
581 class SSHEngineLauncher(SSHLauncher):
582 class SSHEngineLauncher(SSHLauncher):
582 program = List(ipengine_cmd_argv, config=True,
583 program = List(ipengine_cmd_argv, config=True,
583 help="remote ipengine command.")
584 help="remote ipengine command.")
584 # Command line arguments for ipengine.
585 # Command line arguments for ipengine.
585 program_args = List(
586 program_args = List(
586 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
587 ['--log-to-file','--log_level=%i'%logging.INFO], config=True,
587 help="Command line arguments to ipengine."
588 help="Command line arguments to ipengine."
588 )
589 )
589
590
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 launcher_class = SSHEngineLauncher
592 launcher_class = SSHEngineLauncher
592 engines = Dict(config=True,
593 engines = Dict(config=True,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
594 corresponding to the number of engines to start on that host.""")
595 corresponding to the number of engines to start on that host.""")
595
596
596 def start(self, n, profile_dir):
597 def start(self, n, profile_dir):
597 """Start engines by profile or profile_dir.
598 """Start engines by profile or profile_dir.
598 `n` is ignored, and the `engines` config property is used instead.
599 `n` is ignored, and the `engines` config property is used instead.
599 """
600 """
600
601
601 self.profile_dir = unicode(profile_dir)
602 self.profile_dir = unicode(profile_dir)
602 dlist = []
603 dlist = []
603 for host, n in self.engines.iteritems():
604 for host, n in self.engines.iteritems():
604 if isinstance(n, (tuple, list)):
605 if isinstance(n, (tuple, list)):
605 n, args = n
606 n, args = n
606 else:
607 else:
607 args = copy.deepcopy(self.engine_args)
608 args = copy.deepcopy(self.engine_args)
608
609
609 if '@' in host:
610 if '@' in host:
610 user,host = host.split('@',1)
611 user,host = host.split('@',1)
611 else:
612 else:
612 user=None
613 user=None
613 for i in range(n):
614 for i in range(n):
614 if i > 0:
615 if i > 0:
615 time.sleep(self.delay)
616 time.sleep(self.delay)
616 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
617
618
618 # Copy the engine args over to each engine launcher.
619 # Copy the engine args over to each engine launcher.
619 i
620 i
620 el.program_args = args
621 el.program_args = args
621 el.on_stop(self._notice_engine_stopped)
622 el.on_stop(self._notice_engine_stopped)
622 d = el.start(profile_dir, user=user, hostname=host)
623 d = el.start(profile_dir, user=user, hostname=host)
623 if i==0:
624 if i==0:
624 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
625 self.launchers[host+str(i)] = el
626 self.launchers[host+str(i)] = el
626 dlist.append(d)
627 dlist.append(d)
627 self.notify_start(dlist)
628 self.notify_start(dlist)
628 return dlist
629 return dlist
629
630
630
631
631
632
632 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
633 # Windows HPC Server 2008 scheduler launchers
634 # Windows HPC Server 2008 scheduler launchers
634 #-----------------------------------------------------------------------------
635 #-----------------------------------------------------------------------------
635
636
636
637
637 # This is only used on Windows.
638 # This is only used on Windows.
638 def find_job_cmd():
639 def find_job_cmd():
639 if WINDOWS:
640 if WINDOWS:
640 try:
641 try:
641 return find_cmd('job')
642 return find_cmd('job')
642 except (FindCmdError, ImportError):
643 except (FindCmdError, ImportError):
643 # ImportError will be raised if win32api is not installed
644 # ImportError will be raised if win32api is not installed
644 return 'job'
645 return 'job'
645 else:
646 else:
646 return 'job'
647 return 'job'
647
648
648
649
649 class WindowsHPCLauncher(BaseLauncher):
650 class WindowsHPCLauncher(BaseLauncher):
650
651
651 job_id_regexp = Unicode(r'\d+', config=True,
652 job_id_regexp = Unicode(r'\d+', config=True,
652 help="""A regular expression used to get the job id from the output of the
653 help="""A regular expression used to get the job id from the output of the
653 submit_command. """
654 submit_command. """
654 )
655 )
655 job_file_name = Unicode(u'ipython_job.xml', config=True,
656 job_file_name = Unicode(u'ipython_job.xml', config=True,
656 help="The filename of the instantiated job script.")
657 help="The filename of the instantiated job script.")
657 # The full path to the instantiated job script. This gets made dynamically
658 # The full path to the instantiated job script. This gets made dynamically
658 # by combining the work_dir with the job_file_name.
659 # by combining the work_dir with the job_file_name.
659 job_file = Unicode(u'')
660 job_file = Unicode(u'')
660 scheduler = Unicode('', config=True,
661 scheduler = Unicode('', config=True,
661 help="The hostname of the scheduler to submit the job to.")
662 help="The hostname of the scheduler to submit the job to.")
662 job_cmd = Unicode(find_job_cmd(), config=True,
663 job_cmd = Unicode(find_job_cmd(), config=True,
663 help="The command for submitting jobs.")
664 help="The command for submitting jobs.")
664
665
665 def __init__(self, work_dir=u'.', config=None, **kwargs):
666 def __init__(self, work_dir=u'.', config=None, **kwargs):
666 super(WindowsHPCLauncher, self).__init__(
667 super(WindowsHPCLauncher, self).__init__(
667 work_dir=work_dir, config=config, **kwargs
668 work_dir=work_dir, config=config, **kwargs
668 )
669 )
669
670
670 @property
671 @property
671 def job_file(self):
672 def job_file(self):
672 return os.path.join(self.work_dir, self.job_file_name)
673 return os.path.join(self.work_dir, self.job_file_name)
673
674
674 def write_job_file(self, n):
675 def write_job_file(self, n):
675 raise NotImplementedError("Implement write_job_file in a subclass.")
676 raise NotImplementedError("Implement write_job_file in a subclass.")
676
677
677 def find_args(self):
678 def find_args(self):
678 return [u'job.exe']
679 return [u'job.exe']
679
680
680 def parse_job_id(self, output):
681 def parse_job_id(self, output):
681 """Take the output of the submit command and return the job id."""
682 """Take the output of the submit command and return the job id."""
682 m = re.search(self.job_id_regexp, output)
683 m = re.search(self.job_id_regexp, output)
683 if m is not None:
684 if m is not None:
684 job_id = m.group()
685 job_id = m.group()
685 else:
686 else:
686 raise LauncherError("Job id couldn't be determined: %s" % output)
687 raise LauncherError("Job id couldn't be determined: %s" % output)
687 self.job_id = job_id
688 self.job_id = job_id
688 self.log.info('Job started with job id: %r' % job_id)
689 self.log.info('Job started with job id: %r' % job_id)
689 return job_id
690 return job_id
690
691
691 def start(self, n):
692 def start(self, n):
692 """Start n copies of the process using the Win HPC job scheduler."""
693 """Start n copies of the process using the Win HPC job scheduler."""
693 self.write_job_file(n)
694 self.write_job_file(n)
694 args = [
695 args = [
695 'submit',
696 'submit',
696 '/jobfile:%s' % self.job_file,
697 '/jobfile:%s' % self.job_file,
697 '/scheduler:%s' % self.scheduler
698 '/scheduler:%s' % self.scheduler
698 ]
699 ]
699 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
700 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
700
701
701 output = check_output([self.job_cmd]+args,
702 output = check_output([self.job_cmd]+args,
702 env=os.environ,
703 env=os.environ,
703 cwd=self.work_dir,
704 cwd=self.work_dir,
704 stderr=STDOUT
705 stderr=STDOUT
705 )
706 )
706 job_id = self.parse_job_id(output)
707 job_id = self.parse_job_id(output)
707 self.notify_start(job_id)
708 self.notify_start(job_id)
708 return job_id
709 return job_id
709
710
710 def stop(self):
711 def stop(self):
711 args = [
712 args = [
712 'cancel',
713 'cancel',
713 self.job_id,
714 self.job_id,
714 '/scheduler:%s' % self.scheduler
715 '/scheduler:%s' % self.scheduler
715 ]
716 ]
716 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
717 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
717 try:
718 try:
718 output = check_output([self.job_cmd]+args,
719 output = check_output([self.job_cmd]+args,
719 env=os.environ,
720 env=os.environ,
720 cwd=self.work_dir,
721 cwd=self.work_dir,
721 stderr=STDOUT
722 stderr=STDOUT
722 )
723 )
723 except:
724 except:
724 output = 'The job already appears to be stoppped: %r' % self.job_id
725 output = 'The job already appears to be stoppped: %r' % self.job_id
725 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
726 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
726 return output
727 return output
727
728
728
729
729 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
730
731
731 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
732 help="WinHPC xml job file.")
733 help="WinHPC xml job file.")
733 extra_args = List([], config=False,
734 extra_args = List([], config=False,
734 help="extra args to pass to ipcontroller")
735 help="extra args to pass to ipcontroller")
735
736
736 def write_job_file(self, n):
737 def write_job_file(self, n):
737 job = IPControllerJob(config=self.config)
738 job = IPControllerJob(config=self.config)
738
739
739 t = IPControllerTask(config=self.config)
740 t = IPControllerTask(config=self.config)
740 # The tasks work directory is *not* the actual work directory of
741 # The tasks work directory is *not* the actual work directory of
741 # the controller. It is used as the base path for the stdout/stderr
742 # the controller. It is used as the base path for the stdout/stderr
742 # files that the scheduler redirects to.
743 # files that the scheduler redirects to.
743 t.work_directory = self.profile_dir
744 t.work_directory = self.profile_dir
744 # Add the profile_dir and from self.start().
745 # Add the profile_dir and from self.start().
745 t.controller_args.extend(self.extra_args)
746 t.controller_args.extend(self.extra_args)
746 job.add_task(t)
747 job.add_task(t)
747
748
748 self.log.info("Writing job description file: %s" % self.job_file)
749 self.log.info("Writing job description file: %s" % self.job_file)
749 job.write(self.job_file)
750 job.write(self.job_file)
750
751
751 @property
752 @property
752 def job_file(self):
753 def job_file(self):
753 return os.path.join(self.profile_dir, self.job_file_name)
754 return os.path.join(self.profile_dir, self.job_file_name)
754
755
755 def start(self, profile_dir):
756 def start(self, profile_dir):
756 """Start the controller by profile_dir."""
757 """Start the controller by profile_dir."""
757 self.extra_args = ['--profile-dir=%s'%profile_dir]
758 self.extra_args = ['--profile-dir=%s'%profile_dir]
758 self.profile_dir = unicode(profile_dir)
759 self.profile_dir = unicode(profile_dir)
759 return super(WindowsHPCControllerLauncher, self).start(1)
760 return super(WindowsHPCControllerLauncher, self).start(1)
760
761
761
762
762 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
763
764
764 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
765 help="jobfile for ipengines job")
766 help="jobfile for ipengines job")
766 extra_args = List([], config=False,
767 extra_args = List([], config=False,
767 help="extra args to pas to ipengine")
768 help="extra args to pas to ipengine")
768
769
769 def write_job_file(self, n):
770 def write_job_file(self, n):
770 job = IPEngineSetJob(config=self.config)
771 job = IPEngineSetJob(config=self.config)
771
772
772 for i in range(n):
773 for i in range(n):
773 t = IPEngineTask(config=self.config)
774 t = IPEngineTask(config=self.config)
774 # The tasks work directory is *not* the actual work directory of
775 # The tasks work directory is *not* the actual work directory of
775 # the engine. It is used as the base path for the stdout/stderr
776 # the engine. It is used as the base path for the stdout/stderr
776 # files that the scheduler redirects to.
777 # files that the scheduler redirects to.
777 t.work_directory = self.profile_dir
778 t.work_directory = self.profile_dir
778 # Add the profile_dir and from self.start().
779 # Add the profile_dir and from self.start().
779 t.engine_args.extend(self.extra_args)
780 t.engine_args.extend(self.extra_args)
780 job.add_task(t)
781 job.add_task(t)
781
782
782 self.log.info("Writing job description file: %s" % self.job_file)
783 self.log.info("Writing job description file: %s" % self.job_file)
783 job.write(self.job_file)
784 job.write(self.job_file)
784
785
785 @property
786 @property
786 def job_file(self):
787 def job_file(self):
787 return os.path.join(self.profile_dir, self.job_file_name)
788 return os.path.join(self.profile_dir, self.job_file_name)
788
789
789 def start(self, n, profile_dir):
790 def start(self, n, profile_dir):
790 """Start the controller by profile_dir."""
791 """Start the controller by profile_dir."""
791 self.extra_args = ['--profile-dir=%s'%profile_dir]
792 self.extra_args = ['--profile-dir=%s'%profile_dir]
792 self.profile_dir = unicode(profile_dir)
793 self.profile_dir = unicode(profile_dir)
793 return super(WindowsHPCEngineSetLauncher, self).start(n)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
794
795
795
796
796 #-----------------------------------------------------------------------------
797 #-----------------------------------------------------------------------------
797 # Batch (PBS) system launchers
798 # Batch (PBS) system launchers
798 #-----------------------------------------------------------------------------
799 #-----------------------------------------------------------------------------
799
800
800 class BatchSystemLauncher(BaseLauncher):
801 class BatchSystemLauncher(BaseLauncher):
801 """Launch an external process using a batch system.
802 """Launch an external process using a batch system.
802
803
803 This class is designed to work with UNIX batch systems like PBS, LSF,
804 This class is designed to work with UNIX batch systems like PBS, LSF,
804 GridEngine, etc. The overall model is that there are different commands
805 GridEngine, etc. The overall model is that there are different commands
805 like qsub, qdel, etc. that handle the starting and stopping of the process.
806 like qsub, qdel, etc. that handle the starting and stopping of the process.
806
807
807 This class also has the notion of a batch script. The ``batch_template``
808 This class also has the notion of a batch script. The ``batch_template``
808 attribute can be set to a string that is a template for the batch script.
809 attribute can be set to a string that is a template for the batch script.
809 This template is instantiated using string formatting. Thus the template can
810 This template is instantiated using string formatting. Thus the template can
810 use {n} fot the number of instances. Subclasses can add additional variables
811 use {n} fot the number of instances. Subclasses can add additional variables
811 to the template dict.
812 to the template dict.
812 """
813 """
813
814
814 # Subclasses must fill these in. See PBSEngineSet
815 # Subclasses must fill these in. See PBSEngineSet
815 submit_command = List([''], config=True,
816 submit_command = List([''], config=True,
816 help="The name of the command line program used to submit jobs.")
817 help="The name of the command line program used to submit jobs.")
817 delete_command = List([''], config=True,
818 delete_command = List([''], config=True,
818 help="The name of the command line program used to delete jobs.")
819 help="The name of the command line program used to delete jobs.")
819 job_id_regexp = Unicode('', config=True,
820 job_id_regexp = Unicode('', config=True,
820 help="""A regular expression used to get the job id from the output of the
821 help="""A regular expression used to get the job id from the output of the
821 submit_command.""")
822 submit_command.""")
822 batch_template = Unicode('', config=True,
823 batch_template = Unicode('', config=True,
823 help="The string that is the batch script template itself.")
824 help="The string that is the batch script template itself.")
824 batch_template_file = Unicode(u'', config=True,
825 batch_template_file = Unicode(u'', config=True,
825 help="The file that contains the batch template.")
826 help="The file that contains the batch template.")
826 batch_file_name = Unicode(u'batch_script', config=True,
827 batch_file_name = Unicode(u'batch_script', config=True,
827 help="The filename of the instantiated batch script.")
828 help="The filename of the instantiated batch script.")
828 queue = Unicode(u'', config=True,
829 queue = Unicode(u'', config=True,
829 help="The PBS Queue.")
830 help="The PBS Queue.")
830
831
831 # not configurable, override in subclasses
832 # not configurable, override in subclasses
832 # PBS Job Array regex
833 # PBS Job Array regex
833 job_array_regexp = Unicode('')
834 job_array_regexp = Unicode('')
834 job_array_template = Unicode('')
835 job_array_template = Unicode('')
835 # PBS Queue regex
836 # PBS Queue regex
836 queue_regexp = Unicode('')
837 queue_regexp = Unicode('')
837 queue_template = Unicode('')
838 queue_template = Unicode('')
838 # The default batch template, override in subclasses
839 # The default batch template, override in subclasses
839 default_template = Unicode('')
840 default_template = Unicode('')
840 # The full path to the instantiated batch script.
841 # The full path to the instantiated batch script.
841 batch_file = Unicode(u'')
842 batch_file = Unicode(u'')
842 # the format dict used with batch_template:
843 # the format dict used with batch_template:
843 context = Dict()
844 context = Dict()
844 # the Formatter instance for rendering the templates:
845 # the Formatter instance for rendering the templates:
845 formatter = Instance(EvalFormatter, (), {})
846 formatter = Instance(EvalFormatter, (), {})
846
847
847
848
848 def find_args(self):
849 def find_args(self):
849 return self.submit_command + [self.batch_file]
850 return self.submit_command + [self.batch_file]
850
851
851 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 super(BatchSystemLauncher, self).__init__(
853 super(BatchSystemLauncher, self).__init__(
853 work_dir=work_dir, config=config, **kwargs
854 work_dir=work_dir, config=config, **kwargs
854 )
855 )
855 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
856 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
856
857
857 def parse_job_id(self, output):
858 def parse_job_id(self, output):
858 """Take the output of the submit command and return the job id."""
859 """Take the output of the submit command and return the job id."""
859 m = re.search(self.job_id_regexp, output)
860 m = re.search(self.job_id_regexp, output)
860 if m is not None:
861 if m is not None:
861 job_id = m.group()
862 job_id = m.group()
862 else:
863 else:
863 raise LauncherError("Job id couldn't be determined: %s" % output)
864 raise LauncherError("Job id couldn't be determined: %s" % output)
864 self.job_id = job_id
865 self.job_id = job_id
865 self.log.info('Job submitted with job id: %r' % job_id)
866 self.log.info('Job submitted with job id: %r' % job_id)
866 return job_id
867 return job_id
867
868
868 def write_batch_script(self, n):
869 def write_batch_script(self, n):
869 """Instantiate and write the batch script to the work_dir."""
870 """Instantiate and write the batch script to the work_dir."""
870 self.context['n'] = n
871 self.context['n'] = n
871 self.context['queue'] = self.queue
872 self.context['queue'] = self.queue
872 # first priority is batch_template if set
873 # first priority is batch_template if set
873 if self.batch_template_file and not self.batch_template:
874 if self.batch_template_file and not self.batch_template:
874 # second priority is batch_template_file
875 # second priority is batch_template_file
875 with open(self.batch_template_file) as f:
876 with open(self.batch_template_file) as f:
876 self.batch_template = f.read()
877 self.batch_template = f.read()
877 if not self.batch_template:
878 if not self.batch_template:
878 # third (last) priority is default_template
879 # third (last) priority is default_template
879 self.batch_template = self.default_template
880 self.batch_template = self.default_template
880
881
881 # add jobarray or queue lines to user-specified template
882 # add jobarray or queue lines to user-specified template
882 # note that this is *only* when user did not specify a template.
883 # note that this is *only* when user did not specify a template.
883 regex = re.compile(self.job_array_regexp)
884 regex = re.compile(self.job_array_regexp)
884 # print regex.search(self.batch_template)
885 # print regex.search(self.batch_template)
885 if not regex.search(self.batch_template):
886 if not regex.search(self.batch_template):
886 self.log.info("adding job array settings to batch script")
887 self.log.info("adding job array settings to batch script")
887 firstline, rest = self.batch_template.split('\n',1)
888 firstline, rest = self.batch_template.split('\n',1)
888 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
889 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
889
890
890 regex = re.compile(self.queue_regexp)
891 regex = re.compile(self.queue_regexp)
891 # print regex.search(self.batch_template)
892 # print regex.search(self.batch_template)
892 if self.queue and not regex.search(self.batch_template):
893 if self.queue and not regex.search(self.batch_template):
893 self.log.info("adding PBS queue settings to batch script")
894 self.log.info("adding PBS queue settings to batch script")
894 firstline, rest = self.batch_template.split('\n',1)
895 firstline, rest = self.batch_template.split('\n',1)
895 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
896 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
896
897
897 script_as_string = self.formatter.format(self.batch_template, **self.context)
898 script_as_string = self.formatter.format(self.batch_template, **self.context)
898 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
899 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
899
900
900 with open(self.batch_file, 'w') as f:
901 with open(self.batch_file, 'w') as f:
901 f.write(script_as_string)
902 f.write(script_as_string)
902 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
903
904
904 def start(self, n, profile_dir):
905 def start(self, n, profile_dir):
905 """Start n copies of the process using a batch system."""
906 """Start n copies of the process using a batch system."""
906 # Here we save profile_dir in the context so they
907 # Here we save profile_dir in the context so they
907 # can be used in the batch script template as {profile_dir}
908 # can be used in the batch script template as {profile_dir}
908 self.context['profile_dir'] = profile_dir
909 self.context['profile_dir'] = profile_dir
909 self.profile_dir = unicode(profile_dir)
910 self.profile_dir = unicode(profile_dir)
910 self.write_batch_script(n)
911 self.write_batch_script(n)
911 output = check_output(self.args, env=os.environ)
912 output = check_output(self.args, env=os.environ)
912
913
913 job_id = self.parse_job_id(output)
914 job_id = self.parse_job_id(output)
914 self.notify_start(job_id)
915 self.notify_start(job_id)
915 return job_id
916 return job_id
916
917
917 def stop(self):
918 def stop(self):
918 output = check_output(self.delete_command+[self.job_id], env=os.environ)
919 output = check_output(self.delete_command+[self.job_id], env=os.environ)
919 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
920 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
920 return output
921 return output
921
922
922
923
923 class PBSLauncher(BatchSystemLauncher):
924 class PBSLauncher(BatchSystemLauncher):
924 """A BatchSystemLauncher subclass for PBS."""
925 """A BatchSystemLauncher subclass for PBS."""
925
926
926 submit_command = List(['qsub'], config=True,
927 submit_command = List(['qsub'], config=True,
927 help="The PBS submit command ['qsub']")
928 help="The PBS submit command ['qsub']")
928 delete_command = List(['qdel'], config=True,
929 delete_command = List(['qdel'], config=True,
929 help="The PBS delete command ['qsub']")
930 help="The PBS delete command ['qsub']")
930 job_id_regexp = Unicode(r'\d+', config=True,
931 job_id_regexp = Unicode(r'\d+', config=True,
931 help="Regular expresion for identifying the job ID [r'\d+']")
932 help="Regular expresion for identifying the job ID [r'\d+']")
932
933
933 batch_file = Unicode(u'')
934 batch_file = Unicode(u'')
934 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
935 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
935 job_array_template = Unicode('#PBS -t 1-{n}')
936 job_array_template = Unicode('#PBS -t 1-{n}')
936 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
937 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
937 queue_template = Unicode('#PBS -q {queue}')
938 queue_template = Unicode('#PBS -q {queue}')
938
939
939
940
940 class PBSControllerLauncher(PBSLauncher):
941 class PBSControllerLauncher(PBSLauncher):
941 """Launch a controller using PBS."""
942 """Launch a controller using PBS."""
942
943
943 batch_file_name = Unicode(u'pbs_controller', config=True,
944 batch_file_name = Unicode(u'pbs_controller', config=True,
944 help="batch file name for the controller job.")
945 help="batch file name for the controller job.")
945 default_template= Unicode("""#!/bin/sh
946 default_template= Unicode("""#!/bin/sh
946 #PBS -V
947 #PBS -V
947 #PBS -N ipcontroller
948 #PBS -N ipcontroller
948 %s --log-to-file --profile-dir={profile_dir}
949 %s --log-to-file --profile-dir={profile_dir}
949 """%(' '.join(ipcontroller_cmd_argv)))
950 """%(' '.join(ipcontroller_cmd_argv)))
950
951
951 def start(self, profile_dir):
952 def start(self, profile_dir):
952 """Start the controller by profile or profile_dir."""
953 """Start the controller by profile or profile_dir."""
953 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
954 return super(PBSControllerLauncher, self).start(1, profile_dir)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
955
956
956
957
957 class PBSEngineSetLauncher(PBSLauncher):
958 class PBSEngineSetLauncher(PBSLauncher):
958 """Launch Engines using PBS"""
959 """Launch Engines using PBS"""
959 batch_file_name = Unicode(u'pbs_engines', config=True,
960 batch_file_name = Unicode(u'pbs_engines', config=True,
960 help="batch file name for the engine(s) job.")
961 help="batch file name for the engine(s) job.")
961 default_template= Unicode(u"""#!/bin/sh
962 default_template= Unicode(u"""#!/bin/sh
962 #PBS -V
963 #PBS -V
963 #PBS -N ipengine
964 #PBS -N ipengine
964 %s --profile-dir={profile_dir}
965 %s --profile-dir={profile_dir}
965 """%(' '.join(ipengine_cmd_argv)))
966 """%(' '.join(ipengine_cmd_argv)))
966
967
967 def start(self, n, profile_dir):
968 def start(self, n, profile_dir):
968 """Start n engines by profile or profile_dir."""
969 """Start n engines by profile or profile_dir."""
969 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
970 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
971
972
972 #SGE is very similar to PBS
973 #SGE is very similar to PBS
973
974
974 class SGELauncher(PBSLauncher):
975 class SGELauncher(PBSLauncher):
975 """Sun GridEngine is a PBS clone with slightly different syntax"""
976 """Sun GridEngine is a PBS clone with slightly different syntax"""
976 job_array_regexp = Unicode('#\$\W+\-t')
977 job_array_regexp = Unicode('#\$\W+\-t')
977 job_array_template = Unicode('#$ -t 1-{n}')
978 job_array_template = Unicode('#$ -t 1-{n}')
978 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
979 queue_template = Unicode('#$ -q {queue}')
980 queue_template = Unicode('#$ -q {queue}')
980
981
981 class SGEControllerLauncher(SGELauncher):
982 class SGEControllerLauncher(SGELauncher):
982 """Launch a controller using SGE."""
983 """Launch a controller using SGE."""
983
984
984 batch_file_name = Unicode(u'sge_controller', config=True,
985 batch_file_name = Unicode(u'sge_controller', config=True,
985 help="batch file name for the ipontroller job.")
986 help="batch file name for the ipontroller job.")
986 default_template= Unicode(u"""#$ -V
987 default_template= Unicode(u"""#$ -V
987 #$ -S /bin/sh
988 #$ -S /bin/sh
988 #$ -N ipcontroller
989 #$ -N ipcontroller
989 %s --log-to-file --profile-dir={profile_dir}
990 %s --log-to-file --profile-dir={profile_dir}
990 """%(' '.join(ipcontroller_cmd_argv)))
991 """%(' '.join(ipcontroller_cmd_argv)))
991
992
992 def start(self, profile_dir):
993 def start(self, profile_dir):
993 """Start the controller by profile or profile_dir."""
994 """Start the controller by profile or profile_dir."""
994 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
995 return super(SGEControllerLauncher, self).start(1, profile_dir)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
996
997
997 class SGEEngineSetLauncher(SGELauncher):
998 class SGEEngineSetLauncher(SGELauncher):
998 """Launch Engines with SGE"""
999 """Launch Engines with SGE"""
999 batch_file_name = Unicode(u'sge_engines', config=True,
1000 batch_file_name = Unicode(u'sge_engines', config=True,
1000 help="batch file name for the engine(s) job.")
1001 help="batch file name for the engine(s) job.")
1001 default_template = Unicode("""#$ -V
1002 default_template = Unicode("""#$ -V
1002 #$ -S /bin/sh
1003 #$ -S /bin/sh
1003 #$ -N ipengine
1004 #$ -N ipengine
1004 %s --profile-dir={profile_dir}
1005 %s --profile-dir={profile_dir}
1005 """%(' '.join(ipengine_cmd_argv)))
1006 """%(' '.join(ipengine_cmd_argv)))
1006
1007
1007 def start(self, n, profile_dir):
1008 def start(self, n, profile_dir):
1008 """Start n engines by profile or profile_dir."""
1009 """Start n engines by profile or profile_dir."""
1009 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1010 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1011
1012
1012
1013
1013 # LSF launchers
1014 # LSF launchers
1014
1015
1015 class LSFLauncher(BatchSystemLauncher):
1016 class LSFLauncher(BatchSystemLauncher):
1016 """A BatchSystemLauncher subclass for LSF."""
1017 """A BatchSystemLauncher subclass for LSF."""
1017
1018
1018 submit_command = List(['bsub'], config=True,
1019 submit_command = List(['bsub'], config=True,
1019 help="The PBS submit command ['bsub']")
1020 help="The PBS submit command ['bsub']")
1020 delete_command = List(['bkill'], config=True,
1021 delete_command = List(['bkill'], config=True,
1021 help="The PBS delete command ['bkill']")
1022 help="The PBS delete command ['bkill']")
1022 job_id_regexp = Unicode(r'\d+', config=True,
1023 job_id_regexp = Unicode(r'\d+', config=True,
1023 help="Regular expresion for identifying the job ID [r'\d+']")
1024 help="Regular expresion for identifying the job ID [r'\d+']")
1024
1025
1025 batch_file = Unicode(u'')
1026 batch_file = Unicode(u'')
1026 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1027 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1027 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1028 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1028 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1029 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1029 queue_template = Unicode('#BSUB -q {queue}')
1030 queue_template = Unicode('#BSUB -q {queue}')
1030
1031
1031 def start(self, n, profile_dir):
1032 def start(self, n, profile_dir):
1032 """Start n copies of the process using LSF batch system.
1033 """Start n copies of the process using LSF batch system.
1033 This cant inherit from the base class because bsub expects
1034 This cant inherit from the base class because bsub expects
1034 to be piped a shell script in order to honor the #BSUB directives :
1035 to be piped a shell script in order to honor the #BSUB directives :
1035 bsub < script
1036 bsub < script
1036 """
1037 """
1037 # Here we save profile_dir in the context so they
1038 # Here we save profile_dir in the context so they
1038 # can be used in the batch script template as {profile_dir}
1039 # can be used in the batch script template as {profile_dir}
1039 self.context['profile_dir'] = profile_dir
1040 self.context['profile_dir'] = profile_dir
1040 self.profile_dir = unicode(profile_dir)
1041 self.profile_dir = unicode(profile_dir)
1041 self.write_batch_script(n)
1042 self.write_batch_script(n)
1042 #output = check_output(self.args, env=os.environ)
1043 #output = check_output(self.args, env=os.environ)
1043 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1044 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1044 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1045 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1045 output,err = p.communicate()
1046 output,err = p.communicate()
1046 job_id = self.parse_job_id(output)
1047 job_id = self.parse_job_id(output)
1047 self.notify_start(job_id)
1048 self.notify_start(job_id)
1048 return job_id
1049 return job_id
1049
1050
1050
1051
1051 class LSFControllerLauncher(LSFLauncher):
1052 class LSFControllerLauncher(LSFLauncher):
1052 """Launch a controller using LSF."""
1053 """Launch a controller using LSF."""
1053
1054
1054 batch_file_name = Unicode(u'lsf_controller', config=True,
1055 batch_file_name = Unicode(u'lsf_controller', config=True,
1055 help="batch file name for the controller job.")
1056 help="batch file name for the controller job.")
1056 default_template= Unicode("""#!/bin/sh
1057 default_template= Unicode("""#!/bin/sh
1057 #BSUB -J ipcontroller
1058 #BSUB -J ipcontroller
1058 #BSUB -oo ipcontroller.o.%%J
1059 #BSUB -oo ipcontroller.o.%%J
1059 #BSUB -eo ipcontroller.e.%%J
1060 #BSUB -eo ipcontroller.e.%%J
1060 %s --log-to-file --profile-dir={profile_dir}
1061 %s --log-to-file --profile-dir={profile_dir}
1061 """%(' '.join(ipcontroller_cmd_argv)))
1062 """%(' '.join(ipcontroller_cmd_argv)))
1062
1063
1063 def start(self, profile_dir):
1064 def start(self, profile_dir):
1064 """Start the controller by profile or profile_dir."""
1065 """Start the controller by profile or profile_dir."""
1065 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1066 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1066 return super(LSFControllerLauncher, self).start(1, profile_dir)
1067 return super(LSFControllerLauncher, self).start(1, profile_dir)
1067
1068
1068
1069
1069 class LSFEngineSetLauncher(LSFLauncher):
1070 class LSFEngineSetLauncher(LSFLauncher):
1070 """Launch Engines using LSF"""
1071 """Launch Engines using LSF"""
1071 batch_file_name = Unicode(u'lsf_engines', config=True,
1072 batch_file_name = Unicode(u'lsf_engines', config=True,
1072 help="batch file name for the engine(s) job.")
1073 help="batch file name for the engine(s) job.")
1073 default_template= Unicode(u"""#!/bin/sh
1074 default_template= Unicode(u"""#!/bin/sh
1074 #BSUB -oo ipengine.o.%%J
1075 #BSUB -oo ipengine.o.%%J
1075 #BSUB -eo ipengine.e.%%J
1076 #BSUB -eo ipengine.e.%%J
1076 %s --profile-dir={profile_dir}
1077 %s --profile-dir={profile_dir}
1077 """%(' '.join(ipengine_cmd_argv)))
1078 """%(' '.join(ipengine_cmd_argv)))
1078
1079
1079 def start(self, n, profile_dir):
1080 def start(self, n, profile_dir):
1080 """Start n engines by profile or profile_dir."""
1081 """Start n engines by profile or profile_dir."""
1081 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1082 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1082 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1083 return super(LSFEngineSetLauncher, self).start(n, profile_dir)
1083
1084
1084
1085
1085 #-----------------------------------------------------------------------------
1086 #-----------------------------------------------------------------------------
1086 # A launcher for ipcluster itself!
1087 # A launcher for ipcluster itself!
1087 #-----------------------------------------------------------------------------
1088 #-----------------------------------------------------------------------------
1088
1089
1089
1090
1090 class IPClusterLauncher(LocalProcessLauncher):
1091 class IPClusterLauncher(LocalProcessLauncher):
1091 """Launch the ipcluster program in an external process."""
1092 """Launch the ipcluster program in an external process."""
1092
1093
1093 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1094 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1094 help="Popen command for ipcluster")
1095 help="Popen command for ipcluster")
1095 ipcluster_args = List(
1096 ipcluster_args = List(
1096 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1097 ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1097 help="Command line arguments to pass to ipcluster.")
1098 help="Command line arguments to pass to ipcluster.")
1098 ipcluster_subcommand = Unicode('start')
1099 ipcluster_subcommand = Unicode('start')
1099 ipcluster_n = Int(2)
1100 ipcluster_n = Int(2)
1100
1101
1101 def find_args(self):
1102 def find_args(self):
1102 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1103 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1103 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1104 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1104
1105
1105 def start(self):
1106 def start(self):
1106 self.log.info("Starting ipcluster: %r" % self.args)
1107 self.log.info("Starting ipcluster: %r" % self.args)
1107 return super(IPClusterLauncher, self).start()
1108 return super(IPClusterLauncher, self).start()
1108
1109
1109 #-----------------------------------------------------------------------------
1110 #-----------------------------------------------------------------------------
1110 # Collections of launchers
1111 # Collections of launchers
1111 #-----------------------------------------------------------------------------
1112 #-----------------------------------------------------------------------------
1112
1113
1113 local_launchers = [
1114 local_launchers = [
1114 LocalControllerLauncher,
1115 LocalControllerLauncher,
1115 LocalEngineLauncher,
1116 LocalEngineLauncher,
1116 LocalEngineSetLauncher,
1117 LocalEngineSetLauncher,
1117 ]
1118 ]
1118 mpi_launchers = [
1119 mpi_launchers = [
1119 MPIExecLauncher,
1120 MPIExecLauncher,
1120 MPIExecControllerLauncher,
1121 MPIExecControllerLauncher,
1121 MPIExecEngineSetLauncher,
1122 MPIExecEngineSetLauncher,
1122 ]
1123 ]
1123 ssh_launchers = [
1124 ssh_launchers = [
1124 SSHLauncher,
1125 SSHLauncher,
1125 SSHControllerLauncher,
1126 SSHControllerLauncher,
1126 SSHEngineLauncher,
1127 SSHEngineLauncher,
1127 SSHEngineSetLauncher,
1128 SSHEngineSetLauncher,
1128 ]
1129 ]
1129 winhpc_launchers = [
1130 winhpc_launchers = [
1130 WindowsHPCLauncher,
1131 WindowsHPCLauncher,
1131 WindowsHPCControllerLauncher,
1132 WindowsHPCControllerLauncher,
1132 WindowsHPCEngineSetLauncher,
1133 WindowsHPCEngineSetLauncher,
1133 ]
1134 ]
1134 pbs_launchers = [
1135 pbs_launchers = [
1135 PBSLauncher,
1136 PBSLauncher,
1136 PBSControllerLauncher,
1137 PBSControllerLauncher,
1137 PBSEngineSetLauncher,
1138 PBSEngineSetLauncher,
1138 ]
1139 ]
1139 sge_launchers = [
1140 sge_launchers = [
1140 SGELauncher,
1141 SGELauncher,
1141 SGEControllerLauncher,
1142 SGEControllerLauncher,
1142 SGEEngineSetLauncher,
1143 SGEEngineSetLauncher,
1143 ]
1144 ]
1144 lsf_launchers = [
1145 lsf_launchers = [
1145 LSFLauncher,
1146 LSFLauncher,
1146 LSFControllerLauncher,
1147 LSFControllerLauncher,
1147 LSFEngineSetLauncher,
1148 LSFEngineSetLauncher,
1148 ]
1149 ]
1149 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1150 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1150 + pbs_launchers + sge_launchers + lsf_launchers
1151 + pbs_launchers + sge_launchers + lsf_launchers
1151
1152
General Comments 0
You need to be logged in to leave comments. Login now