##// END OF EJS Templates
don't automatically add jobarray or queue lines to user template...
MinRK -
Show More
@@ -1,1063 +1,1065 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 import copy
23 import copy
24 import logging
24 import logging
25 import os
25 import os
26 import re
26 import re
27 import stat
27 import stat
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python older than 2.7 has no subprocess.check_output; emulate it.
    def check_output(*args, **kwargs):
        """Run a command through ``Popen`` and return its captured stdout.

        All arguments are forwarded to ``Popen`` unchanged, except that
        ``stdout`` is always forced to ``PIPE``.  Note that this fallback
        does not inspect the exit status of the command.
        """
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        return proc.communicate()[0]
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
61 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
63
64 from .win32support import forward_read_events
64 from .win32support import forward_read_events
65
65
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67
67
# os.name is 'nt' on Windows; used to pick platform-specific code paths.
WINDOWS = (os.name == 'nt')

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Each *_cmd_argv is the result of pycmd2argv() applied to the path that
# get_ipython_module_path() reports for the corresponding application module.
ipcluster_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipclusterapp')
)

ipengine_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipengineapp')
)

ipcontroller_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.parallel.apps.ipcontrollerapp')
)
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Base launchers and errors
88 # Base launchers and errors
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91
91
class LauncherError(Exception):
    """Base class for the launcher-specific errors defined in this module."""
94
94
95
95
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state.

    For example, starting a process that has already been started.
    """
98
98
99
99
class UnknownStatus(LauncherError):
    """A LauncherError subclass named for an unrecognised process status."""
102
102
103
103
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    Subclasses supply :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`.  This base class keeps track of the lifecycle ``state``
    ('before', then 'running', then 'after') and of the callbacks to run
    once the process has stopped.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Return the shared ``IOLoop`` instance (named as the ``loop`` default hook)."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # Lifecycle marker: one of 'before', 'running', 'after'.
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The list of cmd and args that will be used to start the process.

        The list comes from :meth:`find_args`; its first element is the
        process name reported in the start/stop log messages.
        """
        return self.find_args()

    def find_args(self):
        """Build the args list returned by the ``.args`` property.

        Subclasses must override this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into one space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher is in the 'running' state."""
        return self.state == 'running'

    def start(self):
        """Start the process (must be overridden by subclasses)."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with this Launcher's stop_data
        when the process actually finishes.

        If the launcher has already reached the 'after' state, ``f`` is
        called right away and its result is returned.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        Logs the process startup, records ``data`` as ``start_data`` and
        sets the state to 'running'.  ``data`` is returned unchanged so the
        method can be used as a pass-through callback.
        """
        process_name = self.args[0]
        self.log.info('Process %r started: %r' % (process_name, data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        Logs the process stopping, records ``data`` as ``stop_data``, sets
        the state to 'after' and then runs the callbacks registered via
        :meth:`on_stop`, most recently registered first.  Returns ``data``.
        """
        process_name = self.args[0]
        self.log.info('Process %r stopped: %r' % (process_name, data))
        self.stop_data = data
        self.state = 'after'
        # Drain exactly the callbacks that were pending when we got here.
        for _ in range(len(self.stop_callbacks)):
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
215
215
216
216
217 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
218 # Local process launchers
218 # Local process launchers
219 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220
220
221
221
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.  The child's stdout and stderr are read through the
    launcher's IOLoop and forwarded to ``self.log``.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return the configured ``cmd_and_args`` list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output and exit status into the loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the 'before' state.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE,stderr=PIPE,stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # On Windows the pipes are wrapped by forward_read_events()
            # and the wrapper objects are what gets registered with the loop.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        # Periodically check whether the child has exited.
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process: interrupt it, then kill it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running.

        On Windows every signal other than SIGINT is delivered by running
        ``taskkill`` with the tree (``-t``) and force (``-f``) flags.
        """
        if self.state != 'running':
            return
        if WINDOWS and sig != SIGINT:
            # use Windows tree-kill for better child cleanup
            check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
        else:
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        Parameters
        ----------
        delay : float
            Seconds to wait between the interrupt and the kill.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the delayed KILL below is still scheduled.
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    @staticmethod
    def _strip_eol(line):
        """Return ``line`` without trailing newline / carriage-return chars.

        Unlike ``line[:-1]`` this keeps the last character of a final line
        that was not newline-terminated, and also drops a trailing '\\r'.
        """
        return line.rstrip('\r\n')

    def handle_stdout(self, fd, events):
        """IOLoop handler: log one line of the child's stdout at INFO level."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(self._strip_eol(line))
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """IOLoop handler: log one line of the child's stderr at ERROR level."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(self._strip_eol(line))
        else:
            self.poll()

    def poll(self):
        """Check whether the process has exited, cleaning up if it has.

        Returns the result of ``Popen.poll()``: None while the process is
        still running, otherwise its exit status.  On exit the poller and
        both output handlers are removed and :meth:`notify_stop` is called
        with a dict holding ``exit_code`` and ``pid``.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
319
319
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', 'log_level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile_dir):
        """Start the controller by profile_dir.

        The ``profile_dir=...`` argument is appended to ``controller_args``
        before the process is launched.
        """
        profile_arg = 'profile_dir=%s'%profile_dir
        self.controller_args.append(profile_arg)
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
338
338
339
339
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', 'log_level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile_dir):
        """Start the engine by profile_dir.

        The ``profile_dir=...`` argument is appended to ``engine_args``
        before the process is launched.
        """
        profile_arg = 'profile_dir=%s'%profile_dir
        self.engine_args.append(profile_arg)
        self.profile_dir = unicode(profile_dir)
        return super(LocalEngineLauncher, self).start()
358
358
359
359
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine and kept in
    ``launchers`` (keyed by engine index) until that engine stops.  Once
    every engine has stopped, :meth:`notify_stop` is called with the
    collected per-engine stop data.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir.

        Returns a list with the return value of each engine launcher's
        ``start`` call, in launch order.
        """
        self.profile_dir = unicode(profile_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(profile_dir)
            if i==0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Return a placeholder args list; the set itself is not one process."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine still tracked; return the results."""
        return [el.signal(sig) for el in self.launchers.itervalues()]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then after ``delay`` seconds kill, every tracked engine."""
        return [el.interrupt_then_kill(delay) for el in self.launchers.itervalues()]

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for a single engine launcher.

        ``data`` is the engine's stop data and must contain its ``pid``.
        The matching launcher is dropped from ``launchers`` and its data is
        stored in ``stop_data``; when no launchers remain the whole set is
        reported as stopped.
        """
        pid = data['pid']
        for idx,el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # No tracked launcher owns this pid.  Without this guard the
            # code below would discard an unrelated launcher (or hit a
            # NameError when nothing is tracked at all).
            self.log.error("Stop notification for unknown engine pid: %r" % pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
431
431
432
432
433 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
434 # MPIExec launchers
434 # MPIExec launchers
435 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
436
436
437
437
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(
        ['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process.",
    )
    mpi_args = List(
        [], config=True,
        help="The command line arguments to pass to mpiexec.",
    )
    program = List(
        ['date'], config=True,
        help="The program to start via mpiexec.",
    )
    program_args = List(
        [], config=True,
        help="The command line argument to the program.",
    )
    n = Int(1)

    def find_args(self):
        """Build self.args using all the fields.

        The result is ``mpi_cmd -n <n> mpi_args program program_args``.
        """
        mpiexec_part = self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args
        program_part = self.program + self.program_args
        return mpiexec_part + program_part

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
463
463
464
464
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="Popen command to launch the Contropper"
    )
    controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcontroller."
    )
    n = Int(1)

    def start(self, profile_dir):
        """Start the controller by profile_dir.

        The ``profile_dir=...`` argument is appended to ``controller_args``
        and a single instance is started through mpiexec.
        """
        self.controller_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the mpiexec command line that launches the controller.

        Every element must be a string: the list is joined by ``arg_str``
        and handed to ``Popen``, so ``n`` is converted with ``str``.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
486
486
487
487
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    program = List(
        ipengine_cmd_argv,
        config=True,
        help="Popen command for ipengine",
    )
    program_args = List(
        ['--log-to-file', 'log_level=%i' % logging.INFO],
        config=True,
        help="Command line arguments for ipengine.",
    )
    n = Int(1)

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir.

        The ``profile_dir=...`` argument is appended to ``program_args``
        before mpiexec is launched with ``n`` instances.
        """
        profile_arg = 'profile_dir=%s'%profile_dir
        self.program_args.append(profile_arg)
        self.profile_dir = unicode(profile_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
506
506
507 #-----------------------------------------------------------------------------
507 #-----------------------------------------------------------------------------
508 # SSH launchers
508 # SSH launchers
509 #-----------------------------------------------------------------------------
509 #-----------------------------------------------------------------------------
510
510
511 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
511 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
512
512
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'], config=True,
        help="Program to launch via ssh")
    program_args = List([], config=True,
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        """Keep ``location`` in sync when ``hostname`` changes."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Keep ``location`` in sync when ``user`` changes.

        An empty user yields the bare hostname (same rule as
        ``_hostname_changed``) rather than a malformed ``'@hostname'``.
        """
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            self.location = self.hostname

    def find_args(self):
        """Return the full ssh command line as a list of arguments."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, profile_dir, hostname=None, user=None):
        """Launch ``program`` over ssh.

        ``hostname`` and ``user`` override the configured values only when
        they are not None.  Returns whatever the parent ``start()`` returns.
        """
        self.profile_dir = unicode(profile_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """Close the ssh connection; ``sig`` itself is not forwarded.

        Does nothing unless the launcher state is 'running'.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
563
563
564
564
565
565
class SSHControllerLauncher(SSHLauncher):
    """SSHLauncher preconfigured with the ipcontroller command line."""

    program = List(ipcontroller_cmd_argv, config=True,
        help="remote ipcontroller command.")
    program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipcontroller.")
572
572
573
573
class SSHEngineLauncher(SSHLauncher):
    """SSHLauncher preconfigured with the ipengine command line."""

    program = List(ipengine_cmd_argv, config=True,
        help="remote ipengine command.")
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipengine."
    )
582
582
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Start one ``launcher_class`` instance per engine listed in ``engines``."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch.  This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n, profile_dir):
        """Start engines by profile_dir.

        `n` is ignored, and the `engines` config property is used instead.

        Each value in `engines` is either an engine count, or a
        ``(count, args)`` tuple/list giving the program args for that host.
        A key of the form ``user@host`` is split into user and hostname.

        Returns the list of values returned by each launcher's ``start()``.
        """

        self.profile_dir = unicode(profile_dir)
        dlist = []
        for host, spec in self.engines.iteritems():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user, host = host.split('@', 1)
            else:
                user = None
            for i in range(count):
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)

                # Copy the engine args over to each engine launcher.
                el.program_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(profile_dir, user=user, hostname=host)
                if i == 0:
                    # log the command line once per host, not once per engine
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
620
620
621
621
622
622
623 #-----------------------------------------------------------------------------
623 #-----------------------------------------------------------------------------
624 # Windows HPC Server 2008 scheduler launchers
624 # Windows HPC Server 2008 scheduler launchers
625 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
626
626
627
627
# This is only used on Windows.
def find_job_cmd():
    """Return the command used to submit Windows HPC jobs.

    On Windows the result of ``find_cmd('job')`` is returned when the lookup
    succeeds; in every other case the bare name ``'job'`` is returned.
    """
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
638
638
639
639
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through ``job_cmd``.

    Subclasses must implement ``write_job_file``.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    # The full path to the instantiated job script.  This gets made
    # dynamically by combining the work_dir with the job_file_name.
    # (A `job_file` trait used to be declared above as well; it was rebound
    # by this property within the same class body and so was never used.)
    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for n tasks; subclasses must override."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.  Raises LauncherError when
        ``job_id_regexp`` does not match ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with ``job_cmd`` and returns the job
        id parsed from the command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job recorded in ``self.job_id`` and return the output.

        Cancelling is best-effort: if the cancel command fails, the job is
        reported as already stopped instead of raising.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
718
718
719
719
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Submit a single-task controller job to the Win HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    extra_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file containing one controller task; `n` is not used."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir argument that was set in self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # unlike the base class, the job file lives in the profile directory
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
751
751
752
752
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Submit a job with one engine task per engine to the Win HPC scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    extra_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job file containing n engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir argument that was set in self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # unlike the base class, the job file lives in the profile directory
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n, profile_dir):
        """Start n engines by profile_dir."""
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
785
785
786
786
787 #-----------------------------------------------------------------------------
787 #-----------------------------------------------------------------------------
788 # Batch (PBS) system launchers
788 # Batch (PBS) system launchers
789 #-----------------------------------------------------------------------------
789 #-----------------------------------------------------------------------------
790
790
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = Unicode('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = Unicode('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = Unicode('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})


    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.  Raises LauncherError when
        ``job_id_regexp`` does not match ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r' % job_id)
        return job_id

    def _insert_after_first_line(self, line):
        """Insert `line` as the second line of ``self.batch_template``."""
        # partition (unlike a two-way split) also copes with a template that
        # contains no newline at all.
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        self.context['queue'] = self.queue
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

            # add jobarray or queue lines to the default template.
            # note that this is *only* when user did not specify a template.
            if not re.search(self.job_array_regexp, self.batch_template):
                self.log.info("adding job array settings to batch script")
                self._insert_after_first_line(self.job_array_template)

            if self.queue and not re.search(self.queue_regexp, self.batch_template):
                self.log.info("adding PBS queue settings to batch script")
                self._insert_after_first_line(self.queue_template)

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the script readable, writable and executable by the owner only
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n, profile_dir):
        """Start n copies of the process using a batch system.

        Returns the job id parsed from the submit command's output.
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.context['profile_dir'] = profile_dir
        self.profile_dir = unicode(profile_dir)
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on ``self.job_id`` and return its output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
910
912
911
913
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # patterns that detect an existing job-array / queue directive in a
    # template, and the directive lines inserted when none is found
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
927
929
928
930
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile_dir (always a single instance)."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1, profile_dir)
944
946
945
947
class PBSEngineSetLauncher(PBSLauncher):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s profile_dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, profile_dir):
        """Start n engines by profile_dir."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, profile_dir)
960
962
961 #SGE is very similar to PBS
963 #SGE is very similar to PBS
962
964
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""

    # SGE directives start with ``#$`` instead of ``#PBS``; these override
    # the PBS patterns/templates inherited from PBSLauncher.  Raw strings
    # keep the regex escapes literal without changing the pattern values.

    # Pattern for a job-array (``-t``) directive line.
    job_array_regexp = Unicode(r'#\$\W+\-t')
    # Job-array directive text; ``{n}`` is a format placeholder.
    job_array_template = Unicode('#$ -t 1-{n}')

    # Pattern for a queue (``-q``) directive line.
    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    # Queue directive text; ``{queue}`` is a format placeholder.
    queue_template = Unicode('#$ -q {queue}')
969
971
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    # Configurable name of the batch script used for the controller job.
    # (help text previously misspelled "ipcontroller" as "ipontroller")
    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")

    # Built-in batch script.  There is no shebang line; the shell is given
    # through the ``#$ -S`` directive.  The ipcontroller command line replaces
    # ``%s`` when the class body is executed, while ``{profile_dir}`` is left
    # as a placeholder (presumably expanded by the parent launcher -- confirm).
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir.

        Logs the launcher arguments, then delegates to the parent launcher's
        ``start`` with a job count of 1 and returns its result.
        """
        # The message used to name PBSControllerLauncher (copy/paste slip),
        # which made SGE launches indistinguishable from PBS ones in the log.
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        return super(SGEControllerLauncher, self).start(1, profile_dir)
985
987
class SGEEngineSetLauncher(SGELauncher):
    """Launch Engines with SGE"""

    # Configurable name of the batch script used for the engine job(s).
    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    # Built-in batch script.  The ipengine command line replaces ``%s`` when
    # the class body is executed; ``{profile_dir}`` is left as a literal
    # placeholder (presumably filled in by the parent launcher -- confirm).
    default_template = Unicode(
        "#$ -V\n"
        "#$ -S /bin/sh\n"
        "#$ -N ipengine\n"
        "%s profile_dir={profile_dir}\n"
        % ' '.join(ipengine_cmd_argv)
    )

    def start(self, n, profile_dir):
        """Start ``n`` engines for the given profile_dir.

        Logs the engine count and launcher arguments, then returns whatever
        the parent launcher's ``start(n, profile_dir)`` returns.
        """
        launch_msg = 'Starting %i engines with SGEEngineSetLauncher: %r' % (
            n, self.args)
        self.log.info(launch_msg)
        return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1000
1002
1001
1003
1002 #-----------------------------------------------------------------------------
1004 #-----------------------------------------------------------------------------
1003 # A launcher for ipcluster itself!
1005 # A launcher for ipcluster itself!
1004 #-----------------------------------------------------------------------------
1006 #-----------------------------------------------------------------------------
1005
1007
1006
1008
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # Base command used to invoke ipcluster (configurable).
    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster",
    )
    # Extra arguments appended after the subcommand and engine count.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', 'log_level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.",
    )
    # Subcommand name; find_args() passes it as ``--<subcommand>``.
    ipcluster_subcommand = Unicode('start')
    # Value passed to ipcluster as ``n=<value>``.
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble the argument list for the ipcluster process.

        The result is the base command, followed by ``--<subcommand>``,
        ``n=<ipcluster_n>`` and finally the configured extra arguments.
        """
        return (
            self.ipcluster_cmd
            + ['--' + self.ipcluster_subcommand, 'n=%i' % self.ipcluster_n]
            + self.ipcluster_args
        )

    def start(self):
        """Log the launcher arguments and start via the parent launcher."""
        launch_msg = "Starting ipcluster: %r" % self.args
        self.log.info(launch_msg)
        return super(IPClusterLauncher, self).start()
1025
1027
1026 #-----------------------------------------------------------------------------
1028 #-----------------------------------------------------------------------------
1027 # Collections of launchers
1029 # Collections of launchers
1028 #-----------------------------------------------------------------------------
1030 #-----------------------------------------------------------------------------
1029
1031
# Launcher classes grouped by the backend they drive.
local_launchers = [
    LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher,
]
mpi_launchers = [
    MPIExecLauncher, MPIExecControllerLauncher, MPIExecEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher, SSHControllerLauncher, SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher, WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher,
]

# Flat list of every launcher class above, in group order.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
)
1063
1065
General Comments 0
You need to be logged in to leave comments. Login now