##// END OF EJS Templates
interrupt windows subprocesses with CTRL-C instead of SIGINT...
MinRK -
Show More
@@ -1,985 +1,994 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 from signal import SIGINT, SIGTERM
24 from signal import SIGINT, SIGTERM
25 try:
25 try:
26 from signal import SIGKILL
26 from signal import SIGKILL
27 except ImportError:
27 except ImportError:
28 # windows
28 SIGKILL=SIGTERM
29 SIGKILL=SIGTERM
29
30
30 from subprocess import Popen, PIPE, STDOUT
31 from subprocess import Popen, PIPE, STDOUT
31 try:
32 try:
32 from subprocess import check_output
33 from subprocess import check_output
33 except ImportError:
34 except ImportError:
34 # pre-2.7, define check_output with Popen
35 # pre-2.7, define check_output with Popen
35 def check_output(*args, **kwargs):
36 def check_output(*args, **kwargs):
36 kwargs.update(dict(stdout=PIPE))
37 kwargs.update(dict(stdout=PIPE))
37 p = Popen(*args, **kwargs)
38 p = Popen(*args, **kwargs)
38 out,err = p.communicate()
39 out,err = p.communicate()
39 return out
40 return out
40
41
41 from zmq.eventloop import ioloop
42 from zmq.eventloop import ioloop
42
43
43 from IPython.external import Itpl
44 from IPython.external import Itpl
44 # from IPython.config.configurable import Configurable
45 # from IPython.config.configurable import Configurable
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 from IPython.utils.path import get_ipython_module_path
47 from IPython.utils.path import get_ipython_module_path
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48
49
49 from IPython.parallel.factory import LoggingFactory
50 from IPython.parallel.factory import LoggingFactory
50
51
51 from .win32support import forward_read_events
52 from .win32support import forward_read_events
52
53
53 # load winhpcjob only on Windows
54 # load winhpcjob only on Windows
54 try:
55 try:
55 from .winhpcjob import (
56 from .winhpcjob import (
56 IPControllerTask, IPEngineTask,
57 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
58 IPControllerJob, IPEngineSetJob
58 )
59 )
59 except ImportError:
60 except ImportError:
60 pass
61 pass
61
62
62 WINDOWS = os.name == 'nt'
63 WINDOWS = os.name == 'nt'
64
65 if WINDOWS:
66 try:
67 # >= 2.7, 3.2
68 from signal import CTRL_C_EVENT as SIGINT
69 except ImportError:
70 pass
71
63 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
64 # Paths to the kernel apps
73 # Paths to the kernel apps
65 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
66
75
67
76
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
69 'IPython.parallel.apps.ipclusterapp'
78 'IPython.parallel.apps.ipclusterapp'
70 ))
79 ))
71
80
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
73 'IPython.parallel.apps.ipengineapp'
82 'IPython.parallel.apps.ipengineapp'
74 ))
83 ))
75
84
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
77 'IPython.parallel.apps.ipcontrollerapp'
86 'IPython.parallel.apps.ipcontrollerapp'
78 ))
87 ))
79
88
80 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
81 # Base launchers and errors
90 # Base launchers and errors
82 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
83
92
84
93
85 class LauncherError(Exception):
94 class LauncherError(Exception):
86 pass
95 pass
87
96
88
97
89 class ProcessStateError(LauncherError):
98 class ProcessStateError(LauncherError):
90 pass
99 pass
91
100
92
101
93 class UnknownStatus(LauncherError):
102 class UnknownStatus(LauncherError):
94 pass
103 pass
95
104
96
105
97 class BaseLauncher(LoggingFactory):
106 class BaseLauncher(LoggingFactory):
98 """An asbtraction for starting, stopping and signaling a process."""
107 """An asbtraction for starting, stopping and signaling a process."""
99
108
100 # In all of the launchers, the work_dir is where child processes will be
109 # In all of the launchers, the work_dir is where child processes will be
101 # run. This will usually be the cluster_dir, but may not be. any work_dir
110 # run. This will usually be the cluster_dir, but may not be. any work_dir
102 # passed into the __init__ method will override the config value.
111 # passed into the __init__ method will override the config value.
103 # This should not be used to set the work_dir for the actual engine
112 # This should not be used to set the work_dir for the actual engine
104 # and controller. Instead, use their own config files or the
113 # and controller. Instead, use their own config files or the
105 # controller_args, engine_args attributes of the launchers to add
114 # controller_args, engine_args attributes of the launchers to add
106 # the --work-dir option.
115 # the --work-dir option.
107 work_dir = Unicode(u'.')
116 work_dir = Unicode(u'.')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109
118
110 start_data = Any()
119 start_data = Any()
111 stop_data = Any()
120 stop_data = Any()
112
121
113 def _loop_default(self):
122 def _loop_default(self):
114 return ioloop.IOLoop.instance()
123 return ioloop.IOLoop.instance()
115
124
116 def __init__(self, work_dir=u'.', config=None, **kwargs):
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
117 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
118 self.state = 'before' # can be before, running, after
127 self.state = 'before' # can be before, running, after
119 self.stop_callbacks = []
128 self.stop_callbacks = []
120 self.start_data = None
129 self.start_data = None
121 self.stop_data = None
130 self.stop_data = None
122
131
123 @property
132 @property
124 def args(self):
133 def args(self):
125 """A list of cmd and args that will be used to start the process.
134 """A list of cmd and args that will be used to start the process.
126
135
127 This is what is passed to :func:`spawnProcess` and the first element
136 This is what is passed to :func:`spawnProcess` and the first element
128 will be the process name.
137 will be the process name.
129 """
138 """
130 return self.find_args()
139 return self.find_args()
131
140
132 def find_args(self):
141 def find_args(self):
133 """The ``.args`` property calls this to find the args list.
142 """The ``.args`` property calls this to find the args list.
134
143
135 Subcommand should implement this to construct the cmd and args.
144 Subcommand should implement this to construct the cmd and args.
136 """
145 """
137 raise NotImplementedError('find_args must be implemented in a subclass')
146 raise NotImplementedError('find_args must be implemented in a subclass')
138
147
139 @property
148 @property
140 def arg_str(self):
149 def arg_str(self):
141 """The string form of the program arguments."""
150 """The string form of the program arguments."""
142 return ' '.join(self.args)
151 return ' '.join(self.args)
143
152
144 @property
153 @property
145 def running(self):
154 def running(self):
146 """Am I running."""
155 """Am I running."""
147 if self.state == 'running':
156 if self.state == 'running':
148 return True
157 return True
149 else:
158 else:
150 return False
159 return False
151
160
152 def start(self):
161 def start(self):
153 """Start the process.
162 """Start the process.
154
163
155 This must return a deferred that fires with information about the
164 This must return a deferred that fires with information about the
156 process starting (like a pid, job id, etc.).
165 process starting (like a pid, job id, etc.).
157 """
166 """
158 raise NotImplementedError('start must be implemented in a subclass')
167 raise NotImplementedError('start must be implemented in a subclass')
159
168
160 def stop(self):
169 def stop(self):
161 """Stop the process and notify observers of stopping.
170 """Stop the process and notify observers of stopping.
162
171
163 This must return a deferred that fires with information about the
172 This must return a deferred that fires with information about the
164 processing stopping, like errors that occur while the process is
173 processing stopping, like errors that occur while the process is
165 attempting to be shut down. This deferred won't fire when the process
174 attempting to be shut down. This deferred won't fire when the process
166 actually stops. To observe the actual process stopping, see
175 actually stops. To observe the actual process stopping, see
167 :func:`observe_stop`.
176 :func:`observe_stop`.
168 """
177 """
169 raise NotImplementedError('stop must be implemented in a subclass')
178 raise NotImplementedError('stop must be implemented in a subclass')
170
179
171 def on_stop(self, f):
180 def on_stop(self, f):
172 """Get a deferred that will fire when the process stops.
181 """Get a deferred that will fire when the process stops.
173
182
174 The deferred will fire with data that contains information about
183 The deferred will fire with data that contains information about
175 the exit status of the process.
184 the exit status of the process.
176 """
185 """
177 if self.state=='after':
186 if self.state=='after':
178 return f(self.stop_data)
187 return f(self.stop_data)
179 else:
188 else:
180 self.stop_callbacks.append(f)
189 self.stop_callbacks.append(f)
181
190
182 def notify_start(self, data):
191 def notify_start(self, data):
183 """Call this to trigger startup actions.
192 """Call this to trigger startup actions.
184
193
185 This logs the process startup and sets the state to 'running'. It is
194 This logs the process startup and sets the state to 'running'. It is
186 a pass-through so it can be used as a callback.
195 a pass-through so it can be used as a callback.
187 """
196 """
188
197
189 self.log.info('Process %r started: %r' % (self.args[0], data))
198 self.log.info('Process %r started: %r' % (self.args[0], data))
190 self.start_data = data
199 self.start_data = data
191 self.state = 'running'
200 self.state = 'running'
192 return data
201 return data
193
202
194 def notify_stop(self, data):
203 def notify_stop(self, data):
195 """Call this to trigger process stop actions.
204 """Call this to trigger process stop actions.
196
205
197 This logs the process stopping and sets the state to 'after'. Call
206 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger all the deferreds from :func:`observe_stop`."""
207 this to trigger all the deferreds from :func:`observe_stop`."""
199
208
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
209 self.log.info('Process %r stopped: %r' % (self.args[0], data))
201 self.stop_data = data
210 self.stop_data = data
202 self.state = 'after'
211 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
212 for i in range(len(self.stop_callbacks)):
204 d = self.stop_callbacks.pop()
213 d = self.stop_callbacks.pop()
205 d(data)
214 d(data)
206 return data
215 return data
207
216
208 def signal(self, sig):
217 def signal(self, sig):
209 """Signal the process.
218 """Signal the process.
210
219
211 Return a semi-meaningless deferred after signaling the process.
220 Return a semi-meaningless deferred after signaling the process.
212
221
213 Parameters
222 Parameters
214 ----------
223 ----------
215 sig : str or int
224 sig : str or int
216 'KILL', 'INT', etc., or any signal number
225 'KILL', 'INT', etc., or any signal number
217 """
226 """
218 raise NotImplementedError('signal must be implemented in a subclass')
227 raise NotImplementedError('signal must be implemented in a subclass')
219
228
220
229
221 #-----------------------------------------------------------------------------
230 #-----------------------------------------------------------------------------
222 # Local process launchers
231 # Local process launchers
223 #-----------------------------------------------------------------------------
232 #-----------------------------------------------------------------------------
224
233
225
234
226 class LocalProcessLauncher(BaseLauncher):
235 class LocalProcessLauncher(BaseLauncher):
227 """Start and stop an external process in an asynchronous manner.
236 """Start and stop an external process in an asynchronous manner.
228
237
229 This will launch the external process with a working directory of
238 This will launch the external process with a working directory of
230 ``self.work_dir``.
239 ``self.work_dir``.
231 """
240 """
232
241
233 # This is used to to construct self.args, which is passed to
242 # This is used to to construct self.args, which is passed to
234 # spawnProcess.
243 # spawnProcess.
235 cmd_and_args = List([])
244 cmd_and_args = List([])
236 poll_frequency = Int(100) # in ms
245 poll_frequency = Int(100) # in ms
237
246
238 def __init__(self, work_dir=u'.', config=None, **kwargs):
247 def __init__(self, work_dir=u'.', config=None, **kwargs):
239 super(LocalProcessLauncher, self).__init__(
248 super(LocalProcessLauncher, self).__init__(
240 work_dir=work_dir, config=config, **kwargs
249 work_dir=work_dir, config=config, **kwargs
241 )
250 )
242 self.process = None
251 self.process = None
243 self.start_deferred = None
252 self.start_deferred = None
244 self.poller = None
253 self.poller = None
245
254
246 def find_args(self):
255 def find_args(self):
247 return self.cmd_and_args
256 return self.cmd_and_args
248
257
249 def start(self):
258 def start(self):
250 if self.state == 'before':
259 if self.state == 'before':
251 self.process = Popen(self.args,
260 self.process = Popen(self.args,
252 stdout=PIPE,stderr=PIPE,stdin=PIPE,
261 stdout=PIPE,stderr=PIPE,stdin=PIPE,
253 env=os.environ,
262 env=os.environ,
254 cwd=self.work_dir
263 cwd=self.work_dir
255 )
264 )
256 if WINDOWS:
265 if WINDOWS:
257 self.stdout = forward_read_events(self.process.stdout)
266 self.stdout = forward_read_events(self.process.stdout)
258 self.stderr = forward_read_events(self.process.stderr)
267 self.stderr = forward_read_events(self.process.stderr)
259 else:
268 else:
260 self.stdout = self.process.stdout.fileno()
269 self.stdout = self.process.stdout.fileno()
261 self.stderr = self.process.stderr.fileno()
270 self.stderr = self.process.stderr.fileno()
262 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
271 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
263 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
272 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
264 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
273 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
265 self.poller.start()
274 self.poller.start()
266 self.notify_start(self.process.pid)
275 self.notify_start(self.process.pid)
267 else:
276 else:
268 s = 'The process was already started and has state: %r' % self.state
277 s = 'The process was already started and has state: %r' % self.state
269 raise ProcessStateError(s)
278 raise ProcessStateError(s)
270
279
271 def stop(self):
280 def stop(self):
272 return self.interrupt_then_kill()
281 return self.interrupt_then_kill()
273
282
274 def signal(self, sig):
283 def signal(self, sig):
275 if self.state == 'running':
284 if self.state == 'running':
276 self.process.send_signal(sig)
285 self.process.send_signal(sig)
277
286
278 def interrupt_then_kill(self, delay=2.0):
287 def interrupt_then_kill(self, delay=2.0):
279 """Send INT, wait a delay and then send KILL."""
288 """Send INT, wait a delay and then send KILL."""
280 self.signal(SIGINT)
289 self.signal(SIGINT)
281 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
290 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
282 self.killer.start()
291 self.killer.start()
283
292
284 # callbacks, etc:
293 # callbacks, etc:
285
294
286 def handle_stdout(self, fd, events):
295 def handle_stdout(self, fd, events):
287 if WINDOWS:
296 if WINDOWS:
288 line = self.stdout.recv()
297 line = self.stdout.recv()
289 else:
298 else:
290 line = self.process.stdout.readline()
299 line = self.process.stdout.readline()
291 # a stopped process will be readable but return empty strings
300 # a stopped process will be readable but return empty strings
292 if line:
301 if line:
293 self.log.info(line[:-1])
302 self.log.info(line[:-1])
294 else:
303 else:
295 self.poll()
304 self.poll()
296
305
297 def handle_stderr(self, fd, events):
306 def handle_stderr(self, fd, events):
298 if WINDOWS:
307 if WINDOWS:
299 line = self.stderr.recv()
308 line = self.stderr.recv()
300 else:
309 else:
301 line = self.process.stderr.readline()
310 line = self.process.stderr.readline()
302 # a stopped process will be readable but return empty strings
311 # a stopped process will be readable but return empty strings
303 if line:
312 if line:
304 self.log.error(line[:-1])
313 self.log.error(line[:-1])
305 else:
314 else:
306 self.poll()
315 self.poll()
307
316
308 def poll(self):
317 def poll(self):
309 status = self.process.poll()
318 status = self.process.poll()
310 if status is not None:
319 if status is not None:
311 self.poller.stop()
320 self.poller.stop()
312 self.loop.remove_handler(self.stdout)
321 self.loop.remove_handler(self.stdout)
313 self.loop.remove_handler(self.stderr)
322 self.loop.remove_handler(self.stderr)
314 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
323 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
315 return status
324 return status
316
325
317 class LocalControllerLauncher(LocalProcessLauncher):
326 class LocalControllerLauncher(LocalProcessLauncher):
318 """Launch a controller as a regular external process."""
327 """Launch a controller as a regular external process."""
319
328
320 controller_cmd = List(ipcontroller_cmd_argv, config=True)
329 controller_cmd = List(ipcontroller_cmd_argv, config=True)
321 # Command line arguments to ipcontroller.
330 # Command line arguments to ipcontroller.
322 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
331 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
323
332
324 def find_args(self):
333 def find_args(self):
325 return self.controller_cmd + self.controller_args
334 return self.controller_cmd + self.controller_args
326
335
327 def start(self, cluster_dir):
336 def start(self, cluster_dir):
328 """Start the controller by cluster_dir."""
337 """Start the controller by cluster_dir."""
329 self.controller_args.extend(['--cluster-dir', cluster_dir])
338 self.controller_args.extend(['--cluster-dir', cluster_dir])
330 self.cluster_dir = unicode(cluster_dir)
339 self.cluster_dir = unicode(cluster_dir)
331 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
340 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
332 return super(LocalControllerLauncher, self).start()
341 return super(LocalControllerLauncher, self).start()
333
342
334
343
335 class LocalEngineLauncher(LocalProcessLauncher):
344 class LocalEngineLauncher(LocalProcessLauncher):
336 """Launch a single engine as a regular externall process."""
345 """Launch a single engine as a regular externall process."""
337
346
338 engine_cmd = List(ipengine_cmd_argv, config=True)
347 engine_cmd = List(ipengine_cmd_argv, config=True)
339 # Command line arguments for ipengine.
348 # Command line arguments for ipengine.
340 engine_args = List(
349 engine_args = List(
341 ['--log-to-file','--log-level', str(logging.INFO)], config=True
350 ['--log-to-file','--log-level', str(logging.INFO)], config=True
342 )
351 )
343
352
344 def find_args(self):
353 def find_args(self):
345 return self.engine_cmd + self.engine_args
354 return self.engine_cmd + self.engine_args
346
355
347 def start(self, cluster_dir):
356 def start(self, cluster_dir):
348 """Start the engine by cluster_dir."""
357 """Start the engine by cluster_dir."""
349 self.engine_args.extend(['--cluster-dir', cluster_dir])
358 self.engine_args.extend(['--cluster-dir', cluster_dir])
350 self.cluster_dir = unicode(cluster_dir)
359 self.cluster_dir = unicode(cluster_dir)
351 return super(LocalEngineLauncher, self).start()
360 return super(LocalEngineLauncher, self).start()
352
361
353
362
354 class LocalEngineSetLauncher(BaseLauncher):
363 class LocalEngineSetLauncher(BaseLauncher):
355 """Launch a set of engines as regular external processes."""
364 """Launch a set of engines as regular external processes."""
356
365
357 # Command line arguments for ipengine.
366 # Command line arguments for ipengine.
358 engine_args = List(
367 engine_args = List(
359 ['--log-to-file','--log-level', str(logging.INFO)], config=True
368 ['--log-to-file','--log-level', str(logging.INFO)], config=True
360 )
369 )
361 # launcher class
370 # launcher class
362 launcher_class = LocalEngineLauncher
371 launcher_class = LocalEngineLauncher
363
372
364 launchers = Dict()
373 launchers = Dict()
365 stop_data = Dict()
374 stop_data = Dict()
366
375
367 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 def __init__(self, work_dir=u'.', config=None, **kwargs):
368 super(LocalEngineSetLauncher, self).__init__(
377 super(LocalEngineSetLauncher, self).__init__(
369 work_dir=work_dir, config=config, **kwargs
378 work_dir=work_dir, config=config, **kwargs
370 )
379 )
371 self.stop_data = {}
380 self.stop_data = {}
372
381
373 def start(self, n, cluster_dir):
382 def start(self, n, cluster_dir):
374 """Start n engines by profile or cluster_dir."""
383 """Start n engines by profile or cluster_dir."""
375 self.cluster_dir = unicode(cluster_dir)
384 self.cluster_dir = unicode(cluster_dir)
376 dlist = []
385 dlist = []
377 for i in range(n):
386 for i in range(n):
378 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
387 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
379 # Copy the engine args over to each engine launcher.
388 # Copy the engine args over to each engine launcher.
380 el.engine_args = copy.deepcopy(self.engine_args)
389 el.engine_args = copy.deepcopy(self.engine_args)
381 el.on_stop(self._notice_engine_stopped)
390 el.on_stop(self._notice_engine_stopped)
382 d = el.start(cluster_dir)
391 d = el.start(cluster_dir)
383 if i==0:
392 if i==0:
384 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
393 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
385 self.launchers[i] = el
394 self.launchers[i] = el
386 dlist.append(d)
395 dlist.append(d)
387 self.notify_start(dlist)
396 self.notify_start(dlist)
388 # The consumeErrors here could be dangerous
397 # The consumeErrors here could be dangerous
389 # dfinal = gatherBoth(dlist, consumeErrors=True)
398 # dfinal = gatherBoth(dlist, consumeErrors=True)
390 # dfinal.addCallback(self.notify_start)
399 # dfinal.addCallback(self.notify_start)
391 return dlist
400 return dlist
392
401
393 def find_args(self):
402 def find_args(self):
394 return ['engine set']
403 return ['engine set']
395
404
396 def signal(self, sig):
405 def signal(self, sig):
397 dlist = []
406 dlist = []
398 for el in self.launchers.itervalues():
407 for el in self.launchers.itervalues():
399 d = el.signal(sig)
408 d = el.signal(sig)
400 dlist.append(d)
409 dlist.append(d)
401 # dfinal = gatherBoth(dlist, consumeErrors=True)
410 # dfinal = gatherBoth(dlist, consumeErrors=True)
402 return dlist
411 return dlist
403
412
404 def interrupt_then_kill(self, delay=1.0):
413 def interrupt_then_kill(self, delay=1.0):
405 dlist = []
414 dlist = []
406 for el in self.launchers.itervalues():
415 for el in self.launchers.itervalues():
407 d = el.interrupt_then_kill(delay)
416 d = el.interrupt_then_kill(delay)
408 dlist.append(d)
417 dlist.append(d)
409 # dfinal = gatherBoth(dlist, consumeErrors=True)
418 # dfinal = gatherBoth(dlist, consumeErrors=True)
410 return dlist
419 return dlist
411
420
412 def stop(self):
421 def stop(self):
413 return self.interrupt_then_kill()
422 return self.interrupt_then_kill()
414
423
415 def _notice_engine_stopped(self, data):
424 def _notice_engine_stopped(self, data):
416 pid = data['pid']
425 pid = data['pid']
417 for idx,el in self.launchers.iteritems():
426 for idx,el in self.launchers.iteritems():
418 if el.process.pid == pid:
427 if el.process.pid == pid:
419 break
428 break
420 self.launchers.pop(idx)
429 self.launchers.pop(idx)
421 self.stop_data[idx] = data
430 self.stop_data[idx] = data
422 if not self.launchers:
431 if not self.launchers:
423 self.notify_stop(self.stop_data)
432 self.notify_stop(self.stop_data)
424
433
425
434
426 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
427 # MPIExec launchers
436 # MPIExec launchers
428 #-----------------------------------------------------------------------------
437 #-----------------------------------------------------------------------------
429
438
430
439
431 class MPIExecLauncher(LocalProcessLauncher):
440 class MPIExecLauncher(LocalProcessLauncher):
432 """Launch an external process using mpiexec."""
441 """Launch an external process using mpiexec."""
433
442
434 # The mpiexec command to use in starting the process.
443 # The mpiexec command to use in starting the process.
435 mpi_cmd = List(['mpiexec'], config=True)
444 mpi_cmd = List(['mpiexec'], config=True)
436 # The command line arguments to pass to mpiexec.
445 # The command line arguments to pass to mpiexec.
437 mpi_args = List([], config=True)
446 mpi_args = List([], config=True)
438 # The program to start using mpiexec.
447 # The program to start using mpiexec.
439 program = List(['date'], config=True)
448 program = List(['date'], config=True)
440 # The command line argument to the program.
449 # The command line argument to the program.
441 program_args = List([], config=True)
450 program_args = List([], config=True)
442 # The number of instances of the program to start.
451 # The number of instances of the program to start.
443 n = Int(1, config=True)
452 n = Int(1, config=True)
444
453
445 def find_args(self):
454 def find_args(self):
446 """Build self.args using all the fields."""
455 """Build self.args using all the fields."""
447 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
456 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
448 self.program + self.program_args
457 self.program + self.program_args
449
458
450 def start(self, n):
459 def start(self, n):
451 """Start n instances of the program using mpiexec."""
460 """Start n instances of the program using mpiexec."""
452 self.n = n
461 self.n = n
453 return super(MPIExecLauncher, self).start()
462 return super(MPIExecLauncher, self).start()
454
463
455
464
456 class MPIExecControllerLauncher(MPIExecLauncher):
465 class MPIExecControllerLauncher(MPIExecLauncher):
457 """Launch a controller using mpiexec."""
466 """Launch a controller using mpiexec."""
458
467
459 controller_cmd = List(ipcontroller_cmd_argv, config=True)
468 controller_cmd = List(ipcontroller_cmd_argv, config=True)
460 # Command line arguments to ipcontroller.
469 # Command line arguments to ipcontroller.
461 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
470 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
462 n = Int(1, config=False)
471 n = Int(1, config=False)
463
472
464 def start(self, cluster_dir):
473 def start(self, cluster_dir):
465 """Start the controller by cluster_dir."""
474 """Start the controller by cluster_dir."""
466 self.controller_args.extend(['--cluster-dir', cluster_dir])
475 self.controller_args.extend(['--cluster-dir', cluster_dir])
467 self.cluster_dir = unicode(cluster_dir)
476 self.cluster_dir = unicode(cluster_dir)
468 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
477 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
469 return super(MPIExecControllerLauncher, self).start(1)
478 return super(MPIExecControllerLauncher, self).start(1)
470
479
471 def find_args(self):
480 def find_args(self):
472 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
481 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
473 self.controller_cmd + self.controller_args
482 self.controller_cmd + self.controller_args
474
483
475
484
476 class MPIExecEngineSetLauncher(MPIExecLauncher):
485 class MPIExecEngineSetLauncher(MPIExecLauncher):
477
486
478 program = List(ipengine_cmd_argv, config=True)
487 program = List(ipengine_cmd_argv, config=True)
479 # Command line arguments for ipengine.
488 # Command line arguments for ipengine.
480 program_args = List(
489 program_args = List(
481 ['--log-to-file','--log-level', str(logging.INFO)], config=True
490 ['--log-to-file','--log-level', str(logging.INFO)], config=True
482 )
491 )
483 n = Int(1, config=True)
492 n = Int(1, config=True)
484
493
485 def start(self, n, cluster_dir):
494 def start(self, n, cluster_dir):
486 """Start n engines by profile or cluster_dir."""
495 """Start n engines by profile or cluster_dir."""
487 self.program_args.extend(['--cluster-dir', cluster_dir])
496 self.program_args.extend(['--cluster-dir', cluster_dir])
488 self.cluster_dir = unicode(cluster_dir)
497 self.cluster_dir = unicode(cluster_dir)
489 self.n = n
498 self.n = n
490 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
499 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
491 return super(MPIExecEngineSetLauncher, self).start(n)
500 return super(MPIExecEngineSetLauncher, self).start(n)
492
501
493 #-----------------------------------------------------------------------------
502 #-----------------------------------------------------------------------------
494 # SSH launchers
503 # SSH launchers
495 #-----------------------------------------------------------------------------
504 #-----------------------------------------------------------------------------
496
505
497 # TODO: Get SSH Launcher working again.
506 # TODO: Get SSH Launcher working again.
498
507
499 class SSHLauncher(LocalProcessLauncher):
508 class SSHLauncher(LocalProcessLauncher):
500 """A minimal launcher for ssh.
509 """A minimal launcher for ssh.
501
510
502 To be useful this will probably have to be extended to use the ``sshx``
511 To be useful this will probably have to be extended to use the ``sshx``
503 idea for environment variables. There could be other things this needs
512 idea for environment variables. There could be other things this needs
504 as well.
513 as well.
505 """
514 """
506
515
507 ssh_cmd = List(['ssh'], config=True)
516 ssh_cmd = List(['ssh'], config=True)
508 ssh_args = List(['-tt'], config=True)
517 ssh_args = List(['-tt'], config=True)
509 program = List(['date'], config=True)
518 program = List(['date'], config=True)
510 program_args = List([], config=True)
519 program_args = List([], config=True)
511 hostname = CUnicode('', config=True)
520 hostname = CUnicode('', config=True)
512 user = CUnicode('', config=True)
521 user = CUnicode('', config=True)
513 location = CUnicode('')
522 location = CUnicode('')
514
523
515 def _hostname_changed(self, name, old, new):
524 def _hostname_changed(self, name, old, new):
516 if self.user:
525 if self.user:
517 self.location = u'%s@%s' % (self.user, new)
526 self.location = u'%s@%s' % (self.user, new)
518 else:
527 else:
519 self.location = new
528 self.location = new
520
529
521 def _user_changed(self, name, old, new):
530 def _user_changed(self, name, old, new):
522 self.location = u'%s@%s' % (new, self.hostname)
531 self.location = u'%s@%s' % (new, self.hostname)
523
532
524 def find_args(self):
533 def find_args(self):
525 return self.ssh_cmd + self.ssh_args + [self.location] + \
534 return self.ssh_cmd + self.ssh_args + [self.location] + \
526 self.program + self.program_args
535 self.program + self.program_args
527
536
528 def start(self, cluster_dir, hostname=None, user=None):
537 def start(self, cluster_dir, hostname=None, user=None):
529 self.cluster_dir = unicode(cluster_dir)
538 self.cluster_dir = unicode(cluster_dir)
530 if hostname is not None:
539 if hostname is not None:
531 self.hostname = hostname
540 self.hostname = hostname
532 if user is not None:
541 if user is not None:
533 self.user = user
542 self.user = user
534
543
535 return super(SSHLauncher, self).start()
544 return super(SSHLauncher, self).start()
536
545
537 def signal(self, sig):
546 def signal(self, sig):
538 if self.state == 'running':
547 if self.state == 'running':
539 # send escaped ssh connection-closer
548 # send escaped ssh connection-closer
540 self.process.stdin.write('~.')
549 self.process.stdin.write('~.')
541 self.process.stdin.flush()
550 self.process.stdin.flush()
542
551
543
552
544
553
545 class SSHControllerLauncher(SSHLauncher):
554 class SSHControllerLauncher(SSHLauncher):
546
555
547 program = List(ipcontroller_cmd_argv, config=True)
556 program = List(ipcontroller_cmd_argv, config=True)
548 # Command line arguments to ipcontroller.
557 # Command line arguments to ipcontroller.
549 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
558 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
550
559
551
560
552 class SSHEngineLauncher(SSHLauncher):
561 class SSHEngineLauncher(SSHLauncher):
553 program = List(ipengine_cmd_argv, config=True)
562 program = List(ipengine_cmd_argv, config=True)
554 # Command line arguments for ipengine.
563 # Command line arguments for ipengine.
555 program_args = List(
564 program_args = List(
556 ['--log-to-file','--log-level', str(logging.INFO)], config=True
565 ['--log-to-file','--log-level', str(logging.INFO)], config=True
557 )
566 )
558
567
559 class SSHEngineSetLauncher(LocalEngineSetLauncher):
568 class SSHEngineSetLauncher(LocalEngineSetLauncher):
560 launcher_class = SSHEngineLauncher
569 launcher_class = SSHEngineLauncher
561 engines = Dict(config=True)
570 engines = Dict(config=True)
562
571
563 def start(self, n, cluster_dir):
572 def start(self, n, cluster_dir):
564 """Start engines by profile or cluster_dir.
573 """Start engines by profile or cluster_dir.
565 `n` is ignored, and the `engines` config property is used instead.
574 `n` is ignored, and the `engines` config property is used instead.
566 """
575 """
567
576
568 self.cluster_dir = unicode(cluster_dir)
577 self.cluster_dir = unicode(cluster_dir)
569 dlist = []
578 dlist = []
570 for host, n in self.engines.iteritems():
579 for host, n in self.engines.iteritems():
571 if isinstance(n, (tuple, list)):
580 if isinstance(n, (tuple, list)):
572 n, args = n
581 n, args = n
573 else:
582 else:
574 args = copy.deepcopy(self.engine_args)
583 args = copy.deepcopy(self.engine_args)
575
584
576 if '@' in host:
585 if '@' in host:
577 user,host = host.split('@',1)
586 user,host = host.split('@',1)
578 else:
587 else:
579 user=None
588 user=None
580 for i in range(n):
589 for i in range(n):
581 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
590 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
582
591
583 # Copy the engine args over to each engine launcher.
592 # Copy the engine args over to each engine launcher.
584 i
593 i
585 el.program_args = args
594 el.program_args = args
586 el.on_stop(self._notice_engine_stopped)
595 el.on_stop(self._notice_engine_stopped)
587 d = el.start(cluster_dir, user=user, hostname=host)
596 d = el.start(cluster_dir, user=user, hostname=host)
588 if i==0:
597 if i==0:
589 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
598 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
590 self.launchers[host+str(i)] = el
599 self.launchers[host+str(i)] = el
591 dlist.append(d)
600 dlist.append(d)
592 self.notify_start(dlist)
601 self.notify_start(dlist)
593 return dlist
602 return dlist
594
603
595
604
596
605
597 #-----------------------------------------------------------------------------
606 #-----------------------------------------------------------------------------
598 # Windows HPC Server 2008 scheduler launchers
607 # Windows HPC Server 2008 scheduler launchers
599 #-----------------------------------------------------------------------------
608 #-----------------------------------------------------------------------------
600
609
601
610
602 # This is only used on Windows.
611 # This is only used on Windows.
603 def find_job_cmd():
612 def find_job_cmd():
604 if WINDOWS:
613 if WINDOWS:
605 try:
614 try:
606 return find_cmd('job')
615 return find_cmd('job')
607 except (FindCmdError, ImportError):
616 except (FindCmdError, ImportError):
608 # ImportError will be raised if win32api is not installed
617 # ImportError will be raised if win32api is not installed
609 return 'job'
618 return 'job'
610 else:
619 else:
611 return 'job'
620 return 'job'
612
621
613
622
614 class WindowsHPCLauncher(BaseLauncher):
623 class WindowsHPCLauncher(BaseLauncher):
615
624
616 # A regular expression used to get the job id from the output of the
625 # A regular expression used to get the job id from the output of the
617 # submit_command.
626 # submit_command.
618 job_id_regexp = Str(r'\d+', config=True)
627 job_id_regexp = Str(r'\d+', config=True)
619 # The filename of the instantiated job script.
628 # The filename of the instantiated job script.
620 job_file_name = CUnicode(u'ipython_job.xml', config=True)
629 job_file_name = CUnicode(u'ipython_job.xml', config=True)
621 # The full path to the instantiated job script. This gets made dynamically
630 # The full path to the instantiated job script. This gets made dynamically
622 # by combining the work_dir with the job_file_name.
631 # by combining the work_dir with the job_file_name.
623 job_file = CUnicode(u'')
632 job_file = CUnicode(u'')
624 # The hostname of the scheduler to submit the job to
633 # The hostname of the scheduler to submit the job to
625 scheduler = CUnicode('', config=True)
634 scheduler = CUnicode('', config=True)
626 job_cmd = CUnicode(find_job_cmd(), config=True)
635 job_cmd = CUnicode(find_job_cmd(), config=True)
627
636
628 def __init__(self, work_dir=u'.', config=None, **kwargs):
637 def __init__(self, work_dir=u'.', config=None, **kwargs):
629 super(WindowsHPCLauncher, self).__init__(
638 super(WindowsHPCLauncher, self).__init__(
630 work_dir=work_dir, config=config, **kwargs
639 work_dir=work_dir, config=config, **kwargs
631 )
640 )
632
641
633 @property
642 @property
634 def job_file(self):
643 def job_file(self):
635 return os.path.join(self.work_dir, self.job_file_name)
644 return os.path.join(self.work_dir, self.job_file_name)
636
645
637 def write_job_file(self, n):
646 def write_job_file(self, n):
638 raise NotImplementedError("Implement write_job_file in a subclass.")
647 raise NotImplementedError("Implement write_job_file in a subclass.")
639
648
640 def find_args(self):
649 def find_args(self):
641 return [u'job.exe']
650 return [u'job.exe']
642
651
643 def parse_job_id(self, output):
652 def parse_job_id(self, output):
644 """Take the output of the submit command and return the job id."""
653 """Take the output of the submit command and return the job id."""
645 m = re.search(self.job_id_regexp, output)
654 m = re.search(self.job_id_regexp, output)
646 if m is not None:
655 if m is not None:
647 job_id = m.group()
656 job_id = m.group()
648 else:
657 else:
649 raise LauncherError("Job id couldn't be determined: %s" % output)
658 raise LauncherError("Job id couldn't be determined: %s" % output)
650 self.job_id = job_id
659 self.job_id = job_id
651 self.log.info('Job started with job id: %r' % job_id)
660 self.log.info('Job started with job id: %r' % job_id)
652 return job_id
661 return job_id
653
662
654 def start(self, n):
663 def start(self, n):
655 """Start n copies of the process using the Win HPC job scheduler."""
664 """Start n copies of the process using the Win HPC job scheduler."""
656 self.write_job_file(n)
665 self.write_job_file(n)
657 args = [
666 args = [
658 'submit',
667 'submit',
659 '/jobfile:%s' % self.job_file,
668 '/jobfile:%s' % self.job_file,
660 '/scheduler:%s' % self.scheduler
669 '/scheduler:%s' % self.scheduler
661 ]
670 ]
662 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
671 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
663 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
672 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
664 output = check_output([self.job_cmd]+args,
673 output = check_output([self.job_cmd]+args,
665 env=os.environ,
674 env=os.environ,
666 cwd=self.work_dir,
675 cwd=self.work_dir,
667 stderr=STDOUT
676 stderr=STDOUT
668 )
677 )
669 job_id = self.parse_job_id(output)
678 job_id = self.parse_job_id(output)
670 self.notify_start(job_id)
679 self.notify_start(job_id)
671 return job_id
680 return job_id
672
681
673 def stop(self):
682 def stop(self):
674 args = [
683 args = [
675 'cancel',
684 'cancel',
676 self.job_id,
685 self.job_id,
677 '/scheduler:%s' % self.scheduler
686 '/scheduler:%s' % self.scheduler
678 ]
687 ]
679 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
688 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
680 try:
689 try:
681 output = check_output([self.job_cmd]+args,
690 output = check_output([self.job_cmd]+args,
682 env=os.environ,
691 env=os.environ,
683 cwd=self.work_dir,
692 cwd=self.work_dir,
684 stderr=STDOUT
693 stderr=STDOUT
685 )
694 )
686 except:
695 except:
687 output = 'The job already appears to be stoppped: %r' % self.job_id
696 output = 'The job already appears to be stoppped: %r' % self.job_id
688 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
697 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
689 return output
698 return output
690
699
691
700
692 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
701 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
693
702
694 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
703 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
695 extra_args = List([], config=False)
704 extra_args = List([], config=False)
696
705
697 def write_job_file(self, n):
706 def write_job_file(self, n):
698 job = IPControllerJob(config=self.config)
707 job = IPControllerJob(config=self.config)
699
708
700 t = IPControllerTask(config=self.config)
709 t = IPControllerTask(config=self.config)
701 # The tasks work directory is *not* the actual work directory of
710 # The tasks work directory is *not* the actual work directory of
702 # the controller. It is used as the base path for the stdout/stderr
711 # the controller. It is used as the base path for the stdout/stderr
703 # files that the scheduler redirects to.
712 # files that the scheduler redirects to.
704 t.work_directory = self.cluster_dir
713 t.work_directory = self.cluster_dir
705 # Add the --cluster-dir and from self.start().
714 # Add the --cluster-dir and from self.start().
706 t.controller_args.extend(self.extra_args)
715 t.controller_args.extend(self.extra_args)
707 job.add_task(t)
716 job.add_task(t)
708
717
709 self.log.info("Writing job description file: %s" % self.job_file)
718 self.log.info("Writing job description file: %s" % self.job_file)
710 job.write(self.job_file)
719 job.write(self.job_file)
711
720
712 @property
721 @property
713 def job_file(self):
722 def job_file(self):
714 return os.path.join(self.cluster_dir, self.job_file_name)
723 return os.path.join(self.cluster_dir, self.job_file_name)
715
724
716 def start(self, cluster_dir):
725 def start(self, cluster_dir):
717 """Start the controller by cluster_dir."""
726 """Start the controller by cluster_dir."""
718 self.extra_args = ['--cluster-dir', cluster_dir]
727 self.extra_args = ['--cluster-dir', cluster_dir]
719 self.cluster_dir = unicode(cluster_dir)
728 self.cluster_dir = unicode(cluster_dir)
720 return super(WindowsHPCControllerLauncher, self).start(1)
729 return super(WindowsHPCControllerLauncher, self).start(1)
721
730
722
731
723 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
732 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
724
733
725 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
734 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
726 extra_args = List([], config=False)
735 extra_args = List([], config=False)
727
736
728 def write_job_file(self, n):
737 def write_job_file(self, n):
729 job = IPEngineSetJob(config=self.config)
738 job = IPEngineSetJob(config=self.config)
730
739
731 for i in range(n):
740 for i in range(n):
732 t = IPEngineTask(config=self.config)
741 t = IPEngineTask(config=self.config)
733 # The tasks work directory is *not* the actual work directory of
742 # The tasks work directory is *not* the actual work directory of
734 # the engine. It is used as the base path for the stdout/stderr
743 # the engine. It is used as the base path for the stdout/stderr
735 # files that the scheduler redirects to.
744 # files that the scheduler redirects to.
736 t.work_directory = self.cluster_dir
745 t.work_directory = self.cluster_dir
737 # Add the --cluster-dir and from self.start().
746 # Add the --cluster-dir and from self.start().
738 t.engine_args.extend(self.extra_args)
747 t.engine_args.extend(self.extra_args)
739 job.add_task(t)
748 job.add_task(t)
740
749
741 self.log.info("Writing job description file: %s" % self.job_file)
750 self.log.info("Writing job description file: %s" % self.job_file)
742 job.write(self.job_file)
751 job.write(self.job_file)
743
752
744 @property
753 @property
745 def job_file(self):
754 def job_file(self):
746 return os.path.join(self.cluster_dir, self.job_file_name)
755 return os.path.join(self.cluster_dir, self.job_file_name)
747
756
748 def start(self, n, cluster_dir):
757 def start(self, n, cluster_dir):
749 """Start the controller by cluster_dir."""
758 """Start the controller by cluster_dir."""
750 self.extra_args = ['--cluster-dir', cluster_dir]
759 self.extra_args = ['--cluster-dir', cluster_dir]
751 self.cluster_dir = unicode(cluster_dir)
760 self.cluster_dir = unicode(cluster_dir)
752 return super(WindowsHPCEngineSetLauncher, self).start(n)
761 return super(WindowsHPCEngineSetLauncher, self).start(n)
753
762
754
763
755 #-----------------------------------------------------------------------------
764 #-----------------------------------------------------------------------------
756 # Batch (PBS) system launchers
765 # Batch (PBS) system launchers
757 #-----------------------------------------------------------------------------
766 #-----------------------------------------------------------------------------
758
767
759 class BatchSystemLauncher(BaseLauncher):
768 class BatchSystemLauncher(BaseLauncher):
760 """Launch an external process using a batch system.
769 """Launch an external process using a batch system.
761
770
762 This class is designed to work with UNIX batch systems like PBS, LSF,
771 This class is designed to work with UNIX batch systems like PBS, LSF,
763 GridEngine, etc. The overall model is that there are different commands
772 GridEngine, etc. The overall model is that there are different commands
764 like qsub, qdel, etc. that handle the starting and stopping of the process.
773 like qsub, qdel, etc. that handle the starting and stopping of the process.
765
774
766 This class also has the notion of a batch script. The ``batch_template``
775 This class also has the notion of a batch script. The ``batch_template``
767 attribute can be set to a string that is a template for the batch script.
776 attribute can be set to a string that is a template for the batch script.
768 This template is instantiated using Itpl. Thus the template can use
777 This template is instantiated using Itpl. Thus the template can use
769 ${n} fot the number of instances. Subclasses can add additional variables
778 ${n} fot the number of instances. Subclasses can add additional variables
770 to the template dict.
779 to the template dict.
771 """
780 """
772
781
773 # Subclasses must fill these in. See PBSEngineSet
782 # Subclasses must fill these in. See PBSEngineSet
774 # The name of the command line program used to submit jobs.
783 # The name of the command line program used to submit jobs.
775 submit_command = List([''], config=True)
784 submit_command = List([''], config=True)
776 # The name of the command line program used to delete jobs.
785 # The name of the command line program used to delete jobs.
777 delete_command = List([''], config=True)
786 delete_command = List([''], config=True)
778 # A regular expression used to get the job id from the output of the
787 # A regular expression used to get the job id from the output of the
779 # submit_command.
788 # submit_command.
780 job_id_regexp = CUnicode('', config=True)
789 job_id_regexp = CUnicode('', config=True)
781 # The string that is the batch script template itself.
790 # The string that is the batch script template itself.
782 batch_template = CUnicode('', config=True)
791 batch_template = CUnicode('', config=True)
783 # The file that contains the batch template
792 # The file that contains the batch template
784 batch_template_file = CUnicode(u'', config=True)
793 batch_template_file = CUnicode(u'', config=True)
785 # The filename of the instantiated batch script.
794 # The filename of the instantiated batch script.
786 batch_file_name = CUnicode(u'batch_script', config=True)
795 batch_file_name = CUnicode(u'batch_script', config=True)
787 # The PBS Queue
796 # The PBS Queue
788 queue = CUnicode(u'', config=True)
797 queue = CUnicode(u'', config=True)
789
798
790 # not configurable, override in subclasses
799 # not configurable, override in subclasses
791 # PBS Job Array regex
800 # PBS Job Array regex
792 job_array_regexp = CUnicode('')
801 job_array_regexp = CUnicode('')
793 job_array_template = CUnicode('')
802 job_array_template = CUnicode('')
794 # PBS Queue regex
803 # PBS Queue regex
795 queue_regexp = CUnicode('')
804 queue_regexp = CUnicode('')
796 queue_template = CUnicode('')
805 queue_template = CUnicode('')
797 # The default batch template, override in subclasses
806 # The default batch template, override in subclasses
798 default_template = CUnicode('')
807 default_template = CUnicode('')
799 # The full path to the instantiated batch script.
808 # The full path to the instantiated batch script.
800 batch_file = CUnicode(u'')
809 batch_file = CUnicode(u'')
801 # the format dict used with batch_template:
810 # the format dict used with batch_template:
802 context = Dict()
811 context = Dict()
803
812
804
813
805 def find_args(self):
814 def find_args(self):
806 return self.submit_command + [self.batch_file]
815 return self.submit_command + [self.batch_file]
807
816
808 def __init__(self, work_dir=u'.', config=None, **kwargs):
817 def __init__(self, work_dir=u'.', config=None, **kwargs):
809 super(BatchSystemLauncher, self).__init__(
818 super(BatchSystemLauncher, self).__init__(
810 work_dir=work_dir, config=config, **kwargs
819 work_dir=work_dir, config=config, **kwargs
811 )
820 )
812 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
821 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
813
822
814 def parse_job_id(self, output):
823 def parse_job_id(self, output):
815 """Take the output of the submit command and return the job id."""
824 """Take the output of the submit command and return the job id."""
816 m = re.search(self.job_id_regexp, output)
825 m = re.search(self.job_id_regexp, output)
817 if m is not None:
826 if m is not None:
818 job_id = m.group()
827 job_id = m.group()
819 else:
828 else:
820 raise LauncherError("Job id couldn't be determined: %s" % output)
829 raise LauncherError("Job id couldn't be determined: %s" % output)
821 self.job_id = job_id
830 self.job_id = job_id
822 self.log.info('Job submitted with job id: %r' % job_id)
831 self.log.info('Job submitted with job id: %r' % job_id)
823 return job_id
832 return job_id
824
833
825 def write_batch_script(self, n):
834 def write_batch_script(self, n):
826 """Instantiate and write the batch script to the work_dir."""
835 """Instantiate and write the batch script to the work_dir."""
827 self.context['n'] = n
836 self.context['n'] = n
828 self.context['queue'] = self.queue
837 self.context['queue'] = self.queue
829 print self.context
838 print self.context
830 # first priority is batch_template if set
839 # first priority is batch_template if set
831 if self.batch_template_file and not self.batch_template:
840 if self.batch_template_file and not self.batch_template:
832 # second priority is batch_template_file
841 # second priority is batch_template_file
833 with open(self.batch_template_file) as f:
842 with open(self.batch_template_file) as f:
834 self.batch_template = f.read()
843 self.batch_template = f.read()
835 if not self.batch_template:
844 if not self.batch_template:
836 # third (last) priority is default_template
845 # third (last) priority is default_template
837 self.batch_template = self.default_template
846 self.batch_template = self.default_template
838
847
839 regex = re.compile(self.job_array_regexp)
848 regex = re.compile(self.job_array_regexp)
840 # print regex.search(self.batch_template)
849 # print regex.search(self.batch_template)
841 if not regex.search(self.batch_template):
850 if not regex.search(self.batch_template):
842 self.log.info("adding job array settings to batch script")
851 self.log.info("adding job array settings to batch script")
843 firstline, rest = self.batch_template.split('\n',1)
852 firstline, rest = self.batch_template.split('\n',1)
844 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
853 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
845
854
846 regex = re.compile(self.queue_regexp)
855 regex = re.compile(self.queue_regexp)
847 # print regex.search(self.batch_template)
856 # print regex.search(self.batch_template)
848 if self.queue and not regex.search(self.batch_template):
857 if self.queue and not regex.search(self.batch_template):
849 self.log.info("adding PBS queue settings to batch script")
858 self.log.info("adding PBS queue settings to batch script")
850 firstline, rest = self.batch_template.split('\n',1)
859 firstline, rest = self.batch_template.split('\n',1)
851 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
860 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
852
861
853 script_as_string = Itpl.itplns(self.batch_template, self.context)
862 script_as_string = Itpl.itplns(self.batch_template, self.context)
854 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
863 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
855
864
856 with open(self.batch_file, 'w') as f:
865 with open(self.batch_file, 'w') as f:
857 f.write(script_as_string)
866 f.write(script_as_string)
858 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
867 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
859
868
860 def start(self, n, cluster_dir):
869 def start(self, n, cluster_dir):
861 """Start n copies of the process using a batch system."""
870 """Start n copies of the process using a batch system."""
862 # Here we save profile and cluster_dir in the context so they
871 # Here we save profile and cluster_dir in the context so they
863 # can be used in the batch script template as ${profile} and
872 # can be used in the batch script template as ${profile} and
864 # ${cluster_dir}
873 # ${cluster_dir}
865 self.context['cluster_dir'] = cluster_dir
874 self.context['cluster_dir'] = cluster_dir
866 self.cluster_dir = unicode(cluster_dir)
875 self.cluster_dir = unicode(cluster_dir)
867 self.write_batch_script(n)
876 self.write_batch_script(n)
868 output = check_output(self.args, env=os.environ)
877 output = check_output(self.args, env=os.environ)
869
878
870 job_id = self.parse_job_id(output)
879 job_id = self.parse_job_id(output)
871 self.notify_start(job_id)
880 self.notify_start(job_id)
872 return job_id
881 return job_id
873
882
874 def stop(self):
883 def stop(self):
875 output = check_output(self.delete_command+[self.job_id], env=os.environ)
884 output = check_output(self.delete_command+[self.job_id], env=os.environ)
876 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
885 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
877 return output
886 return output
878
887
879
888
880 class PBSLauncher(BatchSystemLauncher):
889 class PBSLauncher(BatchSystemLauncher):
881 """A BatchSystemLauncher subclass for PBS."""
890 """A BatchSystemLauncher subclass for PBS."""
882
891
883 submit_command = List(['qsub'], config=True)
892 submit_command = List(['qsub'], config=True)
884 delete_command = List(['qdel'], config=True)
893 delete_command = List(['qdel'], config=True)
885 job_id_regexp = CUnicode(r'\d+', config=True)
894 job_id_regexp = CUnicode(r'\d+', config=True)
886
895
887 batch_file = CUnicode(u'')
896 batch_file = CUnicode(u'')
888 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
897 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
889 job_array_template = CUnicode('#PBS -t 1-$n')
898 job_array_template = CUnicode('#PBS -t 1-$n')
890 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
899 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
891 queue_template = CUnicode('#PBS -q $queue')
900 queue_template = CUnicode('#PBS -q $queue')
892
901
893
902
894 class PBSControllerLauncher(PBSLauncher):
903 class PBSControllerLauncher(PBSLauncher):
895 """Launch a controller using PBS."""
904 """Launch a controller using PBS."""
896
905
897 batch_file_name = CUnicode(u'pbs_controller', config=True)
906 batch_file_name = CUnicode(u'pbs_controller', config=True)
898 default_template= CUnicode("""#!/bin/sh
907 default_template= CUnicode("""#!/bin/sh
899 #PBS -V
908 #PBS -V
900 #PBS -N ipcontroller
909 #PBS -N ipcontroller
901 %s --log-to-file --cluster-dir $cluster_dir
910 %s --log-to-file --cluster-dir $cluster_dir
902 """%(' '.join(ipcontroller_cmd_argv)))
911 """%(' '.join(ipcontroller_cmd_argv)))
903
912
904 def start(self, cluster_dir):
913 def start(self, cluster_dir):
905 """Start the controller by profile or cluster_dir."""
914 """Start the controller by profile or cluster_dir."""
906 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
915 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
907 return super(PBSControllerLauncher, self).start(1, cluster_dir)
916 return super(PBSControllerLauncher, self).start(1, cluster_dir)
908
917
909
918
910 class PBSEngineSetLauncher(PBSLauncher):
919 class PBSEngineSetLauncher(PBSLauncher):
911 """Launch Engines using PBS"""
920 """Launch Engines using PBS"""
912 batch_file_name = CUnicode(u'pbs_engines', config=True)
921 batch_file_name = CUnicode(u'pbs_engines', config=True)
913 default_template= CUnicode(u"""#!/bin/sh
922 default_template= CUnicode(u"""#!/bin/sh
914 #PBS -V
923 #PBS -V
915 #PBS -N ipengine
924 #PBS -N ipengine
916 %s --cluster-dir $cluster_dir
925 %s --cluster-dir $cluster_dir
917 """%(' '.join(ipengine_cmd_argv)))
926 """%(' '.join(ipengine_cmd_argv)))
918
927
919 def start(self, n, cluster_dir):
928 def start(self, n, cluster_dir):
920 """Start n engines by profile or cluster_dir."""
929 """Start n engines by profile or cluster_dir."""
921 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
930 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
922 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
931 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
923
932
924 #SGE is very similar to PBS
933 #SGE is very similar to PBS
925
934
926 class SGELauncher(PBSLauncher):
935 class SGELauncher(PBSLauncher):
927 """Sun GridEngine is a PBS clone with slightly different syntax"""
936 """Sun GridEngine is a PBS clone with slightly different syntax"""
928 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
937 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
929 job_array_template = CUnicode('#$$ -t 1-$n')
938 job_array_template = CUnicode('#$$ -t 1-$n')
930 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
939 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
931 queue_template = CUnicode('#$$ -q $queue')
940 queue_template = CUnicode('#$$ -q $queue')
932
941
933 class SGEControllerLauncher(SGELauncher):
942 class SGEControllerLauncher(SGELauncher):
934 """Launch a controller using SGE."""
943 """Launch a controller using SGE."""
935
944
936 batch_file_name = CUnicode(u'sge_controller', config=True)
945 batch_file_name = CUnicode(u'sge_controller', config=True)
937 default_template= CUnicode(u"""#$$ -V
946 default_template= CUnicode(u"""#$$ -V
938 #$$ -S /bin/sh
947 #$$ -S /bin/sh
939 #$$ -N ipcontroller
948 #$$ -N ipcontroller
940 %s --log-to-file --cluster-dir $cluster_dir
949 %s --log-to-file --cluster-dir $cluster_dir
941 """%(' '.join(ipcontroller_cmd_argv)))
950 """%(' '.join(ipcontroller_cmd_argv)))
942
951
943 def start(self, cluster_dir):
952 def start(self, cluster_dir):
944 """Start the controller by profile or cluster_dir."""
953 """Start the controller by profile or cluster_dir."""
945 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
946 return super(PBSControllerLauncher, self).start(1, cluster_dir)
955 return super(PBSControllerLauncher, self).start(1, cluster_dir)
947
956
948 class SGEEngineSetLauncher(SGELauncher):
957 class SGEEngineSetLauncher(SGELauncher):
949 """Launch Engines with SGE"""
958 """Launch Engines with SGE"""
950 batch_file_name = CUnicode(u'sge_engines', config=True)
959 batch_file_name = CUnicode(u'sge_engines', config=True)
951 default_template = CUnicode("""#$$ -V
960 default_template = CUnicode("""#$$ -V
952 #$$ -S /bin/sh
961 #$$ -S /bin/sh
953 #$$ -N ipengine
962 #$$ -N ipengine
954 %s --cluster-dir $cluster_dir
963 %s --cluster-dir $cluster_dir
955 """%(' '.join(ipengine_cmd_argv)))
964 """%(' '.join(ipengine_cmd_argv)))
956
965
957 def start(self, n, cluster_dir):
966 def start(self, n, cluster_dir):
958 """Start n engines by profile or cluster_dir."""
967 """Start n engines by profile or cluster_dir."""
959 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
968 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
960 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
969 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
961
970
962
971
963 #-----------------------------------------------------------------------------
972 #-----------------------------------------------------------------------------
964 # A launcher for ipcluster itself!
973 # A launcher for ipcluster itself!
965 #-----------------------------------------------------------------------------
974 #-----------------------------------------------------------------------------
966
975
967
976
968 class IPClusterLauncher(LocalProcessLauncher):
977 class IPClusterLauncher(LocalProcessLauncher):
969 """Launch the ipcluster program in an external process."""
978 """Launch the ipcluster program in an external process."""
970
979
971 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
980 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
972 # Command line arguments to pass to ipcluster.
981 # Command line arguments to pass to ipcluster.
973 ipcluster_args = List(
982 ipcluster_args = List(
974 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
983 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
975 ipcluster_subcommand = Str('start')
984 ipcluster_subcommand = Str('start')
976 ipcluster_n = Int(2)
985 ipcluster_n = Int(2)
977
986
978 def find_args(self):
987 def find_args(self):
979 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
988 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
980 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
989 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
981
990
982 def start(self):
991 def start(self):
983 self.log.info("Starting ipcluster: %r" % self.args)
992 self.log.info("Starting ipcluster: %r" % self.args)
984 return super(IPClusterLauncher, self).start()
993 return super(IPClusterLauncher, self).start()
985
994
General Comments 0
You need to be logged in to leave comments. Login now