##// END OF EJS Templates
handle potentially absent win32api in launcher.py
MinRK -
Show More
@@ -1,971 +1,972 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 from signal import SIGINT, SIGTERM
24 from signal import SIGINT, SIGTERM
25 try:
25 try:
26 from signal import SIGKILL
26 from signal import SIGKILL
27 except ImportError:
27 except ImportError:
28 SIGKILL=SIGTERM
28 SIGKILL=SIGTERM
29
29
30 from subprocess import Popen, PIPE, STDOUT
30 from subprocess import Popen, PIPE, STDOUT
31 try:
31 try:
32 from subprocess import check_output
32 from subprocess import check_output
33 except ImportError:
33 except ImportError:
34 # pre-2.7, define check_output with Popen
34 # pre-2.7, define check_output with Popen
35 def check_output(*args, **kwargs):
35 def check_output(*args, **kwargs):
36 kwargs.update(dict(stdout=PIPE))
36 kwargs.update(dict(stdout=PIPE))
37 p = Popen(*args, **kwargs)
37 p = Popen(*args, **kwargs)
38 out,err = p.communicate()
38 out,err = p.communicate()
39 return out
39 return out
40
40
41 from zmq.eventloop import ioloop
41 from zmq.eventloop import ioloop
42
42
43 from IPython.external import Itpl
43 from IPython.external import Itpl
44 # from IPython.config.configurable import Configurable
44 # from IPython.config.configurable import Configurable
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 from IPython.utils.path import get_ipython_module_path
46 from IPython.utils.path import get_ipython_module_path
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48
48
49 from IPython.parallel.factory import LoggingFactory
49 from IPython.parallel.factory import LoggingFactory
50
50
51 # load winhpcjob only on Windows
51 # load winhpcjob only on Windows
52 try:
52 try:
53 from .winhpcjob import (
53 from .winhpcjob import (
54 IPControllerTask, IPEngineTask,
54 IPControllerTask, IPEngineTask,
55 IPControllerJob, IPEngineSetJob
55 IPControllerJob, IPEngineSetJob
56 )
56 )
57 except ImportError:
57 except ImportError:
58 pass
58 pass
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # Paths to the kernel apps
62 # Paths to the kernel apps
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65
65
66 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
66 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
67 'IPython.parallel.apps.ipclusterapp'
67 'IPython.parallel.apps.ipclusterapp'
68 ))
68 ))
69
69
70 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
71 'IPython.parallel.apps.ipengineapp'
71 'IPython.parallel.apps.ipengineapp'
72 ))
72 ))
73
73
74 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipcontrollerapp'
75 'IPython.parallel.apps.ipcontrollerapp'
76 ))
76 ))
77
77
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79 # Base launchers and errors
79 # Base launchers and errors
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81
81
82
82
83 class LauncherError(Exception):
83 class LauncherError(Exception):
84 pass
84 pass
85
85
86
86
87 class ProcessStateError(LauncherError):
87 class ProcessStateError(LauncherError):
88 pass
88 pass
89
89
90
90
91 class UnknownStatus(LauncherError):
91 class UnknownStatus(LauncherError):
92 pass
92 pass
93
93
94
94
95 class BaseLauncher(LoggingFactory):
95 class BaseLauncher(LoggingFactory):
96 """An asbtraction for starting, stopping and signaling a process."""
96 """An asbtraction for starting, stopping and signaling a process."""
97
97
98 # In all of the launchers, the work_dir is where child processes will be
98 # In all of the launchers, the work_dir is where child processes will be
99 # run. This will usually be the cluster_dir, but may not be. any work_dir
99 # run. This will usually be the cluster_dir, but may not be. any work_dir
100 # passed into the __init__ method will override the config value.
100 # passed into the __init__ method will override the config value.
101 # This should not be used to set the work_dir for the actual engine
101 # This should not be used to set the work_dir for the actual engine
102 # and controller. Instead, use their own config files or the
102 # and controller. Instead, use their own config files or the
103 # controller_args, engine_args attributes of the launchers to add
103 # controller_args, engine_args attributes of the launchers to add
104 # the --work-dir option.
104 # the --work-dir option.
105 work_dir = Unicode(u'.')
105 work_dir = Unicode(u'.')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
107
107
108 start_data = Any()
108 start_data = Any()
109 stop_data = Any()
109 stop_data = Any()
110
110
111 def _loop_default(self):
111 def _loop_default(self):
112 return ioloop.IOLoop.instance()
112 return ioloop.IOLoop.instance()
113
113
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
116 self.state = 'before' # can be before, running, after
116 self.state = 'before' # can be before, running, after
117 self.stop_callbacks = []
117 self.stop_callbacks = []
118 self.start_data = None
118 self.start_data = None
119 self.stop_data = None
119 self.stop_data = None
120
120
121 @property
121 @property
122 def args(self):
122 def args(self):
123 """A list of cmd and args that will be used to start the process.
123 """A list of cmd and args that will be used to start the process.
124
124
125 This is what is passed to :func:`spawnProcess` and the first element
125 This is what is passed to :func:`spawnProcess` and the first element
126 will be the process name.
126 will be the process name.
127 """
127 """
128 return self.find_args()
128 return self.find_args()
129
129
130 def find_args(self):
130 def find_args(self):
131 """The ``.args`` property calls this to find the args list.
131 """The ``.args`` property calls this to find the args list.
132
132
133 Subcommand should implement this to construct the cmd and args.
133 Subcommand should implement this to construct the cmd and args.
134 """
134 """
135 raise NotImplementedError('find_args must be implemented in a subclass')
135 raise NotImplementedError('find_args must be implemented in a subclass')
136
136
137 @property
137 @property
138 def arg_str(self):
138 def arg_str(self):
139 """The string form of the program arguments."""
139 """The string form of the program arguments."""
140 return ' '.join(self.args)
140 return ' '.join(self.args)
141
141
142 @property
142 @property
143 def running(self):
143 def running(self):
144 """Am I running."""
144 """Am I running."""
145 if self.state == 'running':
145 if self.state == 'running':
146 return True
146 return True
147 else:
147 else:
148 return False
148 return False
149
149
150 def start(self):
150 def start(self):
151 """Start the process.
151 """Start the process.
152
152
153 This must return a deferred that fires with information about the
153 This must return a deferred that fires with information about the
154 process starting (like a pid, job id, etc.).
154 process starting (like a pid, job id, etc.).
155 """
155 """
156 raise NotImplementedError('start must be implemented in a subclass')
156 raise NotImplementedError('start must be implemented in a subclass')
157
157
158 def stop(self):
158 def stop(self):
159 """Stop the process and notify observers of stopping.
159 """Stop the process and notify observers of stopping.
160
160
161 This must return a deferred that fires with information about the
161 This must return a deferred that fires with information about the
162 processing stopping, like errors that occur while the process is
162 processing stopping, like errors that occur while the process is
163 attempting to be shut down. This deferred won't fire when the process
163 attempting to be shut down. This deferred won't fire when the process
164 actually stops. To observe the actual process stopping, see
164 actually stops. To observe the actual process stopping, see
165 :func:`observe_stop`.
165 :func:`observe_stop`.
166 """
166 """
167 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
168
168
169 def on_stop(self, f):
169 def on_stop(self, f):
170 """Get a deferred that will fire when the process stops.
170 """Get a deferred that will fire when the process stops.
171
171
172 The deferred will fire with data that contains information about
172 The deferred will fire with data that contains information about
173 the exit status of the process.
173 the exit status of the process.
174 """
174 """
175 if self.state=='after':
175 if self.state=='after':
176 return f(self.stop_data)
176 return f(self.stop_data)
177 else:
177 else:
178 self.stop_callbacks.append(f)
178 self.stop_callbacks.append(f)
179
179
180 def notify_start(self, data):
180 def notify_start(self, data):
181 """Call this to trigger startup actions.
181 """Call this to trigger startup actions.
182
182
183 This logs the process startup and sets the state to 'running'. It is
183 This logs the process startup and sets the state to 'running'. It is
184 a pass-through so it can be used as a callback.
184 a pass-through so it can be used as a callback.
185 """
185 """
186
186
187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.log.info('Process %r started: %r' % (self.args[0], data))
188 self.start_data = data
188 self.start_data = data
189 self.state = 'running'
189 self.state = 'running'
190 return data
190 return data
191
191
192 def notify_stop(self, data):
192 def notify_stop(self, data):
193 """Call this to trigger process stop actions.
193 """Call this to trigger process stop actions.
194
194
195 This logs the process stopping and sets the state to 'after'. Call
195 This logs the process stopping and sets the state to 'after'. Call
196 this to trigger all the deferreds from :func:`observe_stop`."""
196 this to trigger all the deferreds from :func:`observe_stop`."""
197
197
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 self.stop_data = data
199 self.stop_data = data
200 self.state = 'after'
200 self.state = 'after'
201 for i in range(len(self.stop_callbacks)):
201 for i in range(len(self.stop_callbacks)):
202 d = self.stop_callbacks.pop()
202 d = self.stop_callbacks.pop()
203 d(data)
203 d(data)
204 return data
204 return data
205
205
206 def signal(self, sig):
206 def signal(self, sig):
207 """Signal the process.
207 """Signal the process.
208
208
209 Return a semi-meaningless deferred after signaling the process.
209 Return a semi-meaningless deferred after signaling the process.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 sig : str or int
213 sig : str or int
214 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
215 """
215 """
216 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
217
217
218
218
219 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220 # Local process launchers
220 # Local process launchers
221 #-----------------------------------------------------------------------------
221 #-----------------------------------------------------------------------------
222
222
223
223
224 class LocalProcessLauncher(BaseLauncher):
224 class LocalProcessLauncher(BaseLauncher):
225 """Start and stop an external process in an asynchronous manner.
225 """Start and stop an external process in an asynchronous manner.
226
226
227 This will launch the external process with a working directory of
227 This will launch the external process with a working directory of
228 ``self.work_dir``.
228 ``self.work_dir``.
229 """
229 """
230
230
231 # This is used to to construct self.args, which is passed to
231 # This is used to to construct self.args, which is passed to
232 # spawnProcess.
232 # spawnProcess.
233 cmd_and_args = List([])
233 cmd_and_args = List([])
234 poll_frequency = Int(100) # in ms
234 poll_frequency = Int(100) # in ms
235
235
236 def __init__(self, work_dir=u'.', config=None, **kwargs):
236 def __init__(self, work_dir=u'.', config=None, **kwargs):
237 super(LocalProcessLauncher, self).__init__(
237 super(LocalProcessLauncher, self).__init__(
238 work_dir=work_dir, config=config, **kwargs
238 work_dir=work_dir, config=config, **kwargs
239 )
239 )
240 self.process = None
240 self.process = None
241 self.start_deferred = None
241 self.start_deferred = None
242 self.poller = None
242 self.poller = None
243
243
244 def find_args(self):
244 def find_args(self):
245 return self.cmd_and_args
245 return self.cmd_and_args
246
246
247 def start(self):
247 def start(self):
248 if self.state == 'before':
248 if self.state == 'before':
249 self.process = Popen(self.args,
249 self.process = Popen(self.args,
250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
251 env=os.environ,
251 env=os.environ,
252 cwd=self.work_dir
252 cwd=self.work_dir
253 )
253 )
254
254
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 self.poller.start()
258 self.poller.start()
259 self.notify_start(self.process.pid)
259 self.notify_start(self.process.pid)
260 else:
260 else:
261 s = 'The process was already started and has state: %r' % self.state
261 s = 'The process was already started and has state: %r' % self.state
262 raise ProcessStateError(s)
262 raise ProcessStateError(s)
263
263
264 def stop(self):
264 def stop(self):
265 return self.interrupt_then_kill()
265 return self.interrupt_then_kill()
266
266
267 def signal(self, sig):
267 def signal(self, sig):
268 if self.state == 'running':
268 if self.state == 'running':
269 self.process.send_signal(sig)
269 self.process.send_signal(sig)
270
270
271 def interrupt_then_kill(self, delay=2.0):
271 def interrupt_then_kill(self, delay=2.0):
272 """Send INT, wait a delay and then send KILL."""
272 """Send INT, wait a delay and then send KILL."""
273 self.signal(SIGINT)
273 self.signal(SIGINT)
274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
275 self.killer.start()
275 self.killer.start()
276
276
277 # callbacks, etc:
277 # callbacks, etc:
278
278
279 def handle_stdout(self, fd, events):
279 def handle_stdout(self, fd, events):
280 line = self.process.stdout.readline()
280 line = self.process.stdout.readline()
281 # a stopped process will be readable but return empty strings
281 # a stopped process will be readable but return empty strings
282 if line:
282 if line:
283 self.log.info(line[:-1])
283 self.log.info(line[:-1])
284 else:
284 else:
285 self.poll()
285 self.poll()
286
286
287 def handle_stderr(self, fd, events):
287 def handle_stderr(self, fd, events):
288 line = self.process.stderr.readline()
288 line = self.process.stderr.readline()
289 # a stopped process will be readable but return empty strings
289 # a stopped process will be readable but return empty strings
290 if line:
290 if line:
291 self.log.error(line[:-1])
291 self.log.error(line[:-1])
292 else:
292 else:
293 self.poll()
293 self.poll()
294
294
295 def poll(self):
295 def poll(self):
296 status = self.process.poll()
296 status = self.process.poll()
297 if status is not None:
297 if status is not None:
298 self.poller.stop()
298 self.poller.stop()
299 self.loop.remove_handler(self.process.stdout.fileno())
299 self.loop.remove_handler(self.process.stdout.fileno())
300 self.loop.remove_handler(self.process.stderr.fileno())
300 self.loop.remove_handler(self.process.stderr.fileno())
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
302 return status
302 return status
303
303
304 class LocalControllerLauncher(LocalProcessLauncher):
304 class LocalControllerLauncher(LocalProcessLauncher):
305 """Launch a controller as a regular external process."""
305 """Launch a controller as a regular external process."""
306
306
307 controller_cmd = List(ipcontroller_cmd_argv, config=True)
307 controller_cmd = List(ipcontroller_cmd_argv, config=True)
308 # Command line arguments to ipcontroller.
308 # Command line arguments to ipcontroller.
309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
310
310
311 def find_args(self):
311 def find_args(self):
312 return self.controller_cmd + self.controller_args
312 return self.controller_cmd + self.controller_args
313
313
314 def start(self, cluster_dir):
314 def start(self, cluster_dir):
315 """Start the controller by cluster_dir."""
315 """Start the controller by cluster_dir."""
316 self.controller_args.extend(['--cluster-dir', cluster_dir])
316 self.controller_args.extend(['--cluster-dir', cluster_dir])
317 self.cluster_dir = unicode(cluster_dir)
317 self.cluster_dir = unicode(cluster_dir)
318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
319 return super(LocalControllerLauncher, self).start()
319 return super(LocalControllerLauncher, self).start()
320
320
321
321
322 class LocalEngineLauncher(LocalProcessLauncher):
322 class LocalEngineLauncher(LocalProcessLauncher):
323 """Launch a single engine as a regular externall process."""
323 """Launch a single engine as a regular externall process."""
324
324
325 engine_cmd = List(ipengine_cmd_argv, config=True)
325 engine_cmd = List(ipengine_cmd_argv, config=True)
326 # Command line arguments for ipengine.
326 # Command line arguments for ipengine.
327 engine_args = List(
327 engine_args = List(
328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
329 )
329 )
330
330
331 def find_args(self):
331 def find_args(self):
332 return self.engine_cmd + self.engine_args
332 return self.engine_cmd + self.engine_args
333
333
334 def start(self, cluster_dir):
334 def start(self, cluster_dir):
335 """Start the engine by cluster_dir."""
335 """Start the engine by cluster_dir."""
336 self.engine_args.extend(['--cluster-dir', cluster_dir])
336 self.engine_args.extend(['--cluster-dir', cluster_dir])
337 self.cluster_dir = unicode(cluster_dir)
337 self.cluster_dir = unicode(cluster_dir)
338 return super(LocalEngineLauncher, self).start()
338 return super(LocalEngineLauncher, self).start()
339
339
340
340
341 class LocalEngineSetLauncher(BaseLauncher):
341 class LocalEngineSetLauncher(BaseLauncher):
342 """Launch a set of engines as regular external processes."""
342 """Launch a set of engines as regular external processes."""
343
343
344 # Command line arguments for ipengine.
344 # Command line arguments for ipengine.
345 engine_args = List(
345 engine_args = List(
346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
347 )
347 )
348 # launcher class
348 # launcher class
349 launcher_class = LocalEngineLauncher
349 launcher_class = LocalEngineLauncher
350
350
351 launchers = Dict()
351 launchers = Dict()
352 stop_data = Dict()
352 stop_data = Dict()
353
353
354 def __init__(self, work_dir=u'.', config=None, **kwargs):
354 def __init__(self, work_dir=u'.', config=None, **kwargs):
355 super(LocalEngineSetLauncher, self).__init__(
355 super(LocalEngineSetLauncher, self).__init__(
356 work_dir=work_dir, config=config, **kwargs
356 work_dir=work_dir, config=config, **kwargs
357 )
357 )
358 self.stop_data = {}
358 self.stop_data = {}
359
359
360 def start(self, n, cluster_dir):
360 def start(self, n, cluster_dir):
361 """Start n engines by profile or cluster_dir."""
361 """Start n engines by profile or cluster_dir."""
362 self.cluster_dir = unicode(cluster_dir)
362 self.cluster_dir = unicode(cluster_dir)
363 dlist = []
363 dlist = []
364 for i in range(n):
364 for i in range(n):
365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
366 # Copy the engine args over to each engine launcher.
366 # Copy the engine args over to each engine launcher.
367 el.engine_args = copy.deepcopy(self.engine_args)
367 el.engine_args = copy.deepcopy(self.engine_args)
368 el.on_stop(self._notice_engine_stopped)
368 el.on_stop(self._notice_engine_stopped)
369 d = el.start(cluster_dir)
369 d = el.start(cluster_dir)
370 if i==0:
370 if i==0:
371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
372 self.launchers[i] = el
372 self.launchers[i] = el
373 dlist.append(d)
373 dlist.append(d)
374 self.notify_start(dlist)
374 self.notify_start(dlist)
375 # The consumeErrors here could be dangerous
375 # The consumeErrors here could be dangerous
376 # dfinal = gatherBoth(dlist, consumeErrors=True)
376 # dfinal = gatherBoth(dlist, consumeErrors=True)
377 # dfinal.addCallback(self.notify_start)
377 # dfinal.addCallback(self.notify_start)
378 return dlist
378 return dlist
379
379
380 def find_args(self):
380 def find_args(self):
381 return ['engine set']
381 return ['engine set']
382
382
383 def signal(self, sig):
383 def signal(self, sig):
384 dlist = []
384 dlist = []
385 for el in self.launchers.itervalues():
385 for el in self.launchers.itervalues():
386 d = el.signal(sig)
386 d = el.signal(sig)
387 dlist.append(d)
387 dlist.append(d)
388 # dfinal = gatherBoth(dlist, consumeErrors=True)
388 # dfinal = gatherBoth(dlist, consumeErrors=True)
389 return dlist
389 return dlist
390
390
391 def interrupt_then_kill(self, delay=1.0):
391 def interrupt_then_kill(self, delay=1.0):
392 dlist = []
392 dlist = []
393 for el in self.launchers.itervalues():
393 for el in self.launchers.itervalues():
394 d = el.interrupt_then_kill(delay)
394 d = el.interrupt_then_kill(delay)
395 dlist.append(d)
395 dlist.append(d)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
397 return dlist
397 return dlist
398
398
399 def stop(self):
399 def stop(self):
400 return self.interrupt_then_kill()
400 return self.interrupt_then_kill()
401
401
402 def _notice_engine_stopped(self, data):
402 def _notice_engine_stopped(self, data):
403 pid = data['pid']
403 pid = data['pid']
404 for idx,el in self.launchers.iteritems():
404 for idx,el in self.launchers.iteritems():
405 if el.process.pid == pid:
405 if el.process.pid == pid:
406 break
406 break
407 self.launchers.pop(idx)
407 self.launchers.pop(idx)
408 self.stop_data[idx] = data
408 self.stop_data[idx] = data
409 if not self.launchers:
409 if not self.launchers:
410 self.notify_stop(self.stop_data)
410 self.notify_stop(self.stop_data)
411
411
412
412
413 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
414 # MPIExec launchers
414 # MPIExec launchers
415 #-----------------------------------------------------------------------------
415 #-----------------------------------------------------------------------------
416
416
417
417
418 class MPIExecLauncher(LocalProcessLauncher):
418 class MPIExecLauncher(LocalProcessLauncher):
419 """Launch an external process using mpiexec."""
419 """Launch an external process using mpiexec."""
420
420
421 # The mpiexec command to use in starting the process.
421 # The mpiexec command to use in starting the process.
422 mpi_cmd = List(['mpiexec'], config=True)
422 mpi_cmd = List(['mpiexec'], config=True)
423 # The command line arguments to pass to mpiexec.
423 # The command line arguments to pass to mpiexec.
424 mpi_args = List([], config=True)
424 mpi_args = List([], config=True)
425 # The program to start using mpiexec.
425 # The program to start using mpiexec.
426 program = List(['date'], config=True)
426 program = List(['date'], config=True)
427 # The command line argument to the program.
427 # The command line argument to the program.
428 program_args = List([], config=True)
428 program_args = List([], config=True)
429 # The number of instances of the program to start.
429 # The number of instances of the program to start.
430 n = Int(1, config=True)
430 n = Int(1, config=True)
431
431
432 def find_args(self):
432 def find_args(self):
433 """Build self.args using all the fields."""
433 """Build self.args using all the fields."""
434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
435 self.program + self.program_args
435 self.program + self.program_args
436
436
437 def start(self, n):
437 def start(self, n):
438 """Start n instances of the program using mpiexec."""
438 """Start n instances of the program using mpiexec."""
439 self.n = n
439 self.n = n
440 return super(MPIExecLauncher, self).start()
440 return super(MPIExecLauncher, self).start()
441
441
442
442
443 class MPIExecControllerLauncher(MPIExecLauncher):
443 class MPIExecControllerLauncher(MPIExecLauncher):
444 """Launch a controller using mpiexec."""
444 """Launch a controller using mpiexec."""
445
445
446 controller_cmd = List(ipcontroller_cmd_argv, config=True)
446 controller_cmd = List(ipcontroller_cmd_argv, config=True)
447 # Command line arguments to ipcontroller.
447 # Command line arguments to ipcontroller.
448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
449 n = Int(1, config=False)
449 n = Int(1, config=False)
450
450
451 def start(self, cluster_dir):
451 def start(self, cluster_dir):
452 """Start the controller by cluster_dir."""
452 """Start the controller by cluster_dir."""
453 self.controller_args.extend(['--cluster-dir', cluster_dir])
453 self.controller_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
454 self.cluster_dir = unicode(cluster_dir)
455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
456 return super(MPIExecControllerLauncher, self).start(1)
456 return super(MPIExecControllerLauncher, self).start(1)
457
457
458 def find_args(self):
458 def find_args(self):
459 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
459 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
460 self.controller_cmd + self.controller_args
460 self.controller_cmd + self.controller_args
461
461
462
462
463 class MPIExecEngineSetLauncher(MPIExecLauncher):
463 class MPIExecEngineSetLauncher(MPIExecLauncher):
464
464
465 program = List(ipengine_cmd_argv, config=True)
465 program = List(ipengine_cmd_argv, config=True)
466 # Command line arguments for ipengine.
466 # Command line arguments for ipengine.
467 program_args = List(
467 program_args = List(
468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
469 )
469 )
470 n = Int(1, config=True)
470 n = Int(1, config=True)
471
471
472 def start(self, n, cluster_dir):
472 def start(self, n, cluster_dir):
473 """Start n engines by profile or cluster_dir."""
473 """Start n engines by profile or cluster_dir."""
474 self.program_args.extend(['--cluster-dir', cluster_dir])
474 self.program_args.extend(['--cluster-dir', cluster_dir])
475 self.cluster_dir = unicode(cluster_dir)
475 self.cluster_dir = unicode(cluster_dir)
476 self.n = n
476 self.n = n
477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
478 return super(MPIExecEngineSetLauncher, self).start(n)
478 return super(MPIExecEngineSetLauncher, self).start(n)
479
479
480 #-----------------------------------------------------------------------------
480 #-----------------------------------------------------------------------------
481 # SSH launchers
481 # SSH launchers
482 #-----------------------------------------------------------------------------
482 #-----------------------------------------------------------------------------
483
483
484 # TODO: Get SSH Launcher working again.
484 # TODO: Get SSH Launcher working again.
485
485
486 class SSHLauncher(LocalProcessLauncher):
486 class SSHLauncher(LocalProcessLauncher):
487 """A minimal launcher for ssh.
487 """A minimal launcher for ssh.
488
488
489 To be useful this will probably have to be extended to use the ``sshx``
489 To be useful this will probably have to be extended to use the ``sshx``
490 idea for environment variables. There could be other things this needs
490 idea for environment variables. There could be other things this needs
491 as well.
491 as well.
492 """
492 """
493
493
494 ssh_cmd = List(['ssh'], config=True)
494 ssh_cmd = List(['ssh'], config=True)
495 ssh_args = List(['-tt'], config=True)
495 ssh_args = List(['-tt'], config=True)
496 program = List(['date'], config=True)
496 program = List(['date'], config=True)
497 program_args = List([], config=True)
497 program_args = List([], config=True)
498 hostname = CUnicode('', config=True)
498 hostname = CUnicode('', config=True)
499 user = CUnicode('', config=True)
499 user = CUnicode('', config=True)
500 location = CUnicode('')
500 location = CUnicode('')
501
501
502 def _hostname_changed(self, name, old, new):
502 def _hostname_changed(self, name, old, new):
503 if self.user:
503 if self.user:
504 self.location = u'%s@%s' % (self.user, new)
504 self.location = u'%s@%s' % (self.user, new)
505 else:
505 else:
506 self.location = new
506 self.location = new
507
507
508 def _user_changed(self, name, old, new):
508 def _user_changed(self, name, old, new):
509 self.location = u'%s@%s' % (new, self.hostname)
509 self.location = u'%s@%s' % (new, self.hostname)
510
510
511 def find_args(self):
511 def find_args(self):
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 self.program + self.program_args
513 self.program + self.program_args
514
514
515 def start(self, cluster_dir, hostname=None, user=None):
515 def start(self, cluster_dir, hostname=None, user=None):
516 self.cluster_dir = unicode(cluster_dir)
516 self.cluster_dir = unicode(cluster_dir)
517 if hostname is not None:
517 if hostname is not None:
518 self.hostname = hostname
518 self.hostname = hostname
519 if user is not None:
519 if user is not None:
520 self.user = user
520 self.user = user
521
521
522 return super(SSHLauncher, self).start()
522 return super(SSHLauncher, self).start()
523
523
524 def signal(self, sig):
524 def signal(self, sig):
525 if self.state == 'running':
525 if self.state == 'running':
526 # send escaped ssh connection-closer
526 # send escaped ssh connection-closer
527 self.process.stdin.write('~.')
527 self.process.stdin.write('~.')
528 self.process.stdin.flush()
528 self.process.stdin.flush()
529
529
530
530
531
531
532 class SSHControllerLauncher(SSHLauncher):
532 class SSHControllerLauncher(SSHLauncher):
533
533
534 program = List(ipcontroller_cmd_argv, config=True)
534 program = List(ipcontroller_cmd_argv, config=True)
535 # Command line arguments to ipcontroller.
535 # Command line arguments to ipcontroller.
536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
537
537
538
538
539 class SSHEngineLauncher(SSHLauncher):
539 class SSHEngineLauncher(SSHLauncher):
540 program = List(ipengine_cmd_argv, config=True)
540 program = List(ipengine_cmd_argv, config=True)
541 # Command line arguments for ipengine.
541 # Command line arguments for ipengine.
542 program_args = List(
542 program_args = List(
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
544 )
544 )
545
545
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
547 launcher_class = SSHEngineLauncher
547 launcher_class = SSHEngineLauncher
548 engines = Dict(config=True)
548 engines = Dict(config=True)
549
549
550 def start(self, n, cluster_dir):
550 def start(self, n, cluster_dir):
551 """Start engines by profile or cluster_dir.
551 """Start engines by profile or cluster_dir.
552 `n` is ignored, and the `engines` config property is used instead.
552 `n` is ignored, and the `engines` config property is used instead.
553 """
553 """
554
554
555 self.cluster_dir = unicode(cluster_dir)
555 self.cluster_dir = unicode(cluster_dir)
556 dlist = []
556 dlist = []
557 for host, n in self.engines.iteritems():
557 for host, n in self.engines.iteritems():
558 if isinstance(n, (tuple, list)):
558 if isinstance(n, (tuple, list)):
559 n, args = n
559 n, args = n
560 else:
560 else:
561 args = copy.deepcopy(self.engine_args)
561 args = copy.deepcopy(self.engine_args)
562
562
563 if '@' in host:
563 if '@' in host:
564 user,host = host.split('@',1)
564 user,host = host.split('@',1)
565 else:
565 else:
566 user=None
566 user=None
567 for i in range(n):
567 for i in range(n):
568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
569
569
570 # Copy the engine args over to each engine launcher.
570 # Copy the engine args over to each engine launcher.
571 i
571 i
572 el.program_args = args
572 el.program_args = args
573 el.on_stop(self._notice_engine_stopped)
573 el.on_stop(self._notice_engine_stopped)
574 d = el.start(cluster_dir, user=user, hostname=host)
574 d = el.start(cluster_dir, user=user, hostname=host)
575 if i==0:
575 if i==0:
576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
577 self.launchers[host+str(i)] = el
577 self.launchers[host+str(i)] = el
578 dlist.append(d)
578 dlist.append(d)
579 self.notify_start(dlist)
579 self.notify_start(dlist)
580 return dlist
580 return dlist
581
581
582
582
583
583
584 #-----------------------------------------------------------------------------
584 #-----------------------------------------------------------------------------
585 # Windows HPC Server 2008 scheduler launchers
585 # Windows HPC Server 2008 scheduler launchers
586 #-----------------------------------------------------------------------------
586 #-----------------------------------------------------------------------------
587
587
588
588
589 # This is only used on Windows.
589 # This is only used on Windows.
590 def find_job_cmd():
590 def find_job_cmd():
591 if os.name=='nt':
591 if os.name=='nt':
592 try:
592 try:
593 return find_cmd('job')
593 return find_cmd('job')
594 except FindCmdError:
594 except (FindCmdError, ImportError):
595 # ImportError will be raised if win32api is not installed
595 return 'job'
596 return 'job'
596 else:
597 else:
597 return 'job'
598 return 'job'
598
599
599
600
600 class WindowsHPCLauncher(BaseLauncher):
601 class WindowsHPCLauncher(BaseLauncher):
601
602
602 # A regular expression used to get the job id from the output of the
603 # A regular expression used to get the job id from the output of the
603 # submit_command.
604 # submit_command.
604 job_id_regexp = Str(r'\d+', config=True)
605 job_id_regexp = Str(r'\d+', config=True)
605 # The filename of the instantiated job script.
606 # The filename of the instantiated job script.
606 job_file_name = CUnicode(u'ipython_job.xml', config=True)
607 job_file_name = CUnicode(u'ipython_job.xml', config=True)
607 # The full path to the instantiated job script. This gets made dynamically
608 # The full path to the instantiated job script. This gets made dynamically
608 # by combining the work_dir with the job_file_name.
609 # by combining the work_dir with the job_file_name.
609 job_file = CUnicode(u'')
610 job_file = CUnicode(u'')
610 # The hostname of the scheduler to submit the job to
611 # The hostname of the scheduler to submit the job to
611 scheduler = CUnicode('', config=True)
612 scheduler = CUnicode('', config=True)
612 job_cmd = CUnicode(find_job_cmd(), config=True)
613 job_cmd = CUnicode(find_job_cmd(), config=True)
613
614
614 def __init__(self, work_dir=u'.', config=None, **kwargs):
615 def __init__(self, work_dir=u'.', config=None, **kwargs):
615 super(WindowsHPCLauncher, self).__init__(
616 super(WindowsHPCLauncher, self).__init__(
616 work_dir=work_dir, config=config, **kwargs
617 work_dir=work_dir, config=config, **kwargs
617 )
618 )
618
619
619 @property
620 @property
620 def job_file(self):
621 def job_file(self):
621 return os.path.join(self.work_dir, self.job_file_name)
622 return os.path.join(self.work_dir, self.job_file_name)
622
623
623 def write_job_file(self, n):
624 def write_job_file(self, n):
624 raise NotImplementedError("Implement write_job_file in a subclass.")
625 raise NotImplementedError("Implement write_job_file in a subclass.")
625
626
626 def find_args(self):
627 def find_args(self):
627 return [u'job.exe']
628 return [u'job.exe']
628
629
629 def parse_job_id(self, output):
630 def parse_job_id(self, output):
630 """Take the output of the submit command and return the job id."""
631 """Take the output of the submit command and return the job id."""
631 m = re.search(self.job_id_regexp, output)
632 m = re.search(self.job_id_regexp, output)
632 if m is not None:
633 if m is not None:
633 job_id = m.group()
634 job_id = m.group()
634 else:
635 else:
635 raise LauncherError("Job id couldn't be determined: %s" % output)
636 raise LauncherError("Job id couldn't be determined: %s" % output)
636 self.job_id = job_id
637 self.job_id = job_id
637 self.log.info('Job started with job id: %r' % job_id)
638 self.log.info('Job started with job id: %r' % job_id)
638 return job_id
639 return job_id
639
640
640 def start(self, n):
641 def start(self, n):
641 """Start n copies of the process using the Win HPC job scheduler."""
642 """Start n copies of the process using the Win HPC job scheduler."""
642 self.write_job_file(n)
643 self.write_job_file(n)
643 args = [
644 args = [
644 'submit',
645 'submit',
645 '/jobfile:%s' % self.job_file,
646 '/jobfile:%s' % self.job_file,
646 '/scheduler:%s' % self.scheduler
647 '/scheduler:%s' % self.scheduler
647 ]
648 ]
648 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
649 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
649 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
650 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
650 output = check_output([self.job_cmd]+args,
651 output = check_output([self.job_cmd]+args,
651 env=os.environ,
652 env=os.environ,
652 cwd=self.work_dir,
653 cwd=self.work_dir,
653 stderr=STDOUT
654 stderr=STDOUT
654 )
655 )
655 job_id = self.parse_job_id(output)
656 job_id = self.parse_job_id(output)
656 self.notify_start(job_id)
657 self.notify_start(job_id)
657 return job_id
658 return job_id
658
659
659 def stop(self):
660 def stop(self):
660 args = [
661 args = [
661 'cancel',
662 'cancel',
662 self.job_id,
663 self.job_id,
663 '/scheduler:%s' % self.scheduler
664 '/scheduler:%s' % self.scheduler
664 ]
665 ]
665 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
666 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
666 try:
667 try:
667 output = check_output([self.job_cmd]+args,
668 output = check_output([self.job_cmd]+args,
668 env=os.environ,
669 env=os.environ,
669 cwd=self.work_dir,
670 cwd=self.work_dir,
670 stderr=STDOUT
671 stderr=STDOUT
671 )
672 )
672 except:
673 except:
673 output = 'The job already appears to be stoppped: %r' % self.job_id
674 output = 'The job already appears to be stoppped: %r' % self.job_id
674 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
675 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
675 return output
676 return output
676
677
677
678
678 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
679 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
679
680
680 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
681 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
681 extra_args = List([], config=False)
682 extra_args = List([], config=False)
682
683
683 def write_job_file(self, n):
684 def write_job_file(self, n):
684 job = IPControllerJob(config=self.config)
685 job = IPControllerJob(config=self.config)
685
686
686 t = IPControllerTask(config=self.config)
687 t = IPControllerTask(config=self.config)
687 # The tasks work directory is *not* the actual work directory of
688 # The tasks work directory is *not* the actual work directory of
688 # the controller. It is used as the base path for the stdout/stderr
689 # the controller. It is used as the base path for the stdout/stderr
689 # files that the scheduler redirects to.
690 # files that the scheduler redirects to.
690 t.work_directory = self.cluster_dir
691 t.work_directory = self.cluster_dir
691 # Add the --cluster-dir and from self.start().
692 # Add the --cluster-dir and from self.start().
692 t.controller_args.extend(self.extra_args)
693 t.controller_args.extend(self.extra_args)
693 job.add_task(t)
694 job.add_task(t)
694
695
695 self.log.info("Writing job description file: %s" % self.job_file)
696 self.log.info("Writing job description file: %s" % self.job_file)
696 job.write(self.job_file)
697 job.write(self.job_file)
697
698
698 @property
699 @property
699 def job_file(self):
700 def job_file(self):
700 return os.path.join(self.cluster_dir, self.job_file_name)
701 return os.path.join(self.cluster_dir, self.job_file_name)
701
702
702 def start(self, cluster_dir):
703 def start(self, cluster_dir):
703 """Start the controller by cluster_dir."""
704 """Start the controller by cluster_dir."""
704 self.extra_args = ['--cluster-dir', cluster_dir]
705 self.extra_args = ['--cluster-dir', cluster_dir]
705 self.cluster_dir = unicode(cluster_dir)
706 self.cluster_dir = unicode(cluster_dir)
706 return super(WindowsHPCControllerLauncher, self).start(1)
707 return super(WindowsHPCControllerLauncher, self).start(1)
707
708
708
709
709 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
710 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
710
711
711 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
712 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
712 extra_args = List([], config=False)
713 extra_args = List([], config=False)
713
714
714 def write_job_file(self, n):
715 def write_job_file(self, n):
715 job = IPEngineSetJob(config=self.config)
716 job = IPEngineSetJob(config=self.config)
716
717
717 for i in range(n):
718 for i in range(n):
718 t = IPEngineTask(config=self.config)
719 t = IPEngineTask(config=self.config)
719 # The tasks work directory is *not* the actual work directory of
720 # The tasks work directory is *not* the actual work directory of
720 # the engine. It is used as the base path for the stdout/stderr
721 # the engine. It is used as the base path for the stdout/stderr
721 # files that the scheduler redirects to.
722 # files that the scheduler redirects to.
722 t.work_directory = self.cluster_dir
723 t.work_directory = self.cluster_dir
723 # Add the --cluster-dir and from self.start().
724 # Add the --cluster-dir and from self.start().
724 t.engine_args.extend(self.extra_args)
725 t.engine_args.extend(self.extra_args)
725 job.add_task(t)
726 job.add_task(t)
726
727
727 self.log.info("Writing job description file: %s" % self.job_file)
728 self.log.info("Writing job description file: %s" % self.job_file)
728 job.write(self.job_file)
729 job.write(self.job_file)
729
730
730 @property
731 @property
731 def job_file(self):
732 def job_file(self):
732 return os.path.join(self.cluster_dir, self.job_file_name)
733 return os.path.join(self.cluster_dir, self.job_file_name)
733
734
734 def start(self, n, cluster_dir):
735 def start(self, n, cluster_dir):
735 """Start the controller by cluster_dir."""
736 """Start the controller by cluster_dir."""
736 self.extra_args = ['--cluster-dir', cluster_dir]
737 self.extra_args = ['--cluster-dir', cluster_dir]
737 self.cluster_dir = unicode(cluster_dir)
738 self.cluster_dir = unicode(cluster_dir)
738 return super(WindowsHPCEngineSetLauncher, self).start(n)
739 return super(WindowsHPCEngineSetLauncher, self).start(n)
739
740
740
741
741 #-----------------------------------------------------------------------------
742 #-----------------------------------------------------------------------------
742 # Batch (PBS) system launchers
743 # Batch (PBS) system launchers
743 #-----------------------------------------------------------------------------
744 #-----------------------------------------------------------------------------
744
745
745 class BatchSystemLauncher(BaseLauncher):
746 class BatchSystemLauncher(BaseLauncher):
746 """Launch an external process using a batch system.
747 """Launch an external process using a batch system.
747
748
748 This class is designed to work with UNIX batch systems like PBS, LSF,
749 This class is designed to work with UNIX batch systems like PBS, LSF,
749 GridEngine, etc. The overall model is that there are different commands
750 GridEngine, etc. The overall model is that there are different commands
750 like qsub, qdel, etc. that handle the starting and stopping of the process.
751 like qsub, qdel, etc. that handle the starting and stopping of the process.
751
752
752 This class also has the notion of a batch script. The ``batch_template``
753 This class also has the notion of a batch script. The ``batch_template``
753 attribute can be set to a string that is a template for the batch script.
754 attribute can be set to a string that is a template for the batch script.
754 This template is instantiated using Itpl. Thus the template can use
755 This template is instantiated using Itpl. Thus the template can use
755 ${n} fot the number of instances. Subclasses can add additional variables
756 ${n} fot the number of instances. Subclasses can add additional variables
756 to the template dict.
757 to the template dict.
757 """
758 """
758
759
759 # Subclasses must fill these in. See PBSEngineSet
760 # Subclasses must fill these in. See PBSEngineSet
760 # The name of the command line program used to submit jobs.
761 # The name of the command line program used to submit jobs.
761 submit_command = List([''], config=True)
762 submit_command = List([''], config=True)
762 # The name of the command line program used to delete jobs.
763 # The name of the command line program used to delete jobs.
763 delete_command = List([''], config=True)
764 delete_command = List([''], config=True)
764 # A regular expression used to get the job id from the output of the
765 # A regular expression used to get the job id from the output of the
765 # submit_command.
766 # submit_command.
766 job_id_regexp = CUnicode('', config=True)
767 job_id_regexp = CUnicode('', config=True)
767 # The string that is the batch script template itself.
768 # The string that is the batch script template itself.
768 batch_template = CUnicode('', config=True)
769 batch_template = CUnicode('', config=True)
769 # The file that contains the batch template
770 # The file that contains the batch template
770 batch_template_file = CUnicode(u'', config=True)
771 batch_template_file = CUnicode(u'', config=True)
771 # The filename of the instantiated batch script.
772 # The filename of the instantiated batch script.
772 batch_file_name = CUnicode(u'batch_script', config=True)
773 batch_file_name = CUnicode(u'batch_script', config=True)
773 # The PBS Queue
774 # The PBS Queue
774 queue = CUnicode(u'', config=True)
775 queue = CUnicode(u'', config=True)
775
776
776 # not configurable, override in subclasses
777 # not configurable, override in subclasses
777 # PBS Job Array regex
778 # PBS Job Array regex
778 job_array_regexp = CUnicode('')
779 job_array_regexp = CUnicode('')
779 job_array_template = CUnicode('')
780 job_array_template = CUnicode('')
780 # PBS Queue regex
781 # PBS Queue regex
781 queue_regexp = CUnicode('')
782 queue_regexp = CUnicode('')
782 queue_template = CUnicode('')
783 queue_template = CUnicode('')
783 # The default batch template, override in subclasses
784 # The default batch template, override in subclasses
784 default_template = CUnicode('')
785 default_template = CUnicode('')
785 # The full path to the instantiated batch script.
786 # The full path to the instantiated batch script.
786 batch_file = CUnicode(u'')
787 batch_file = CUnicode(u'')
787 # the format dict used with batch_template:
788 # the format dict used with batch_template:
788 context = Dict()
789 context = Dict()
789
790
790
791
791 def find_args(self):
792 def find_args(self):
792 return self.submit_command + [self.batch_file]
793 return self.submit_command + [self.batch_file]
793
794
794 def __init__(self, work_dir=u'.', config=None, **kwargs):
795 def __init__(self, work_dir=u'.', config=None, **kwargs):
795 super(BatchSystemLauncher, self).__init__(
796 super(BatchSystemLauncher, self).__init__(
796 work_dir=work_dir, config=config, **kwargs
797 work_dir=work_dir, config=config, **kwargs
797 )
798 )
798 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
799 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
799
800
800 def parse_job_id(self, output):
801 def parse_job_id(self, output):
801 """Take the output of the submit command and return the job id."""
802 """Take the output of the submit command and return the job id."""
802 m = re.search(self.job_id_regexp, output)
803 m = re.search(self.job_id_regexp, output)
803 if m is not None:
804 if m is not None:
804 job_id = m.group()
805 job_id = m.group()
805 else:
806 else:
806 raise LauncherError("Job id couldn't be determined: %s" % output)
807 raise LauncherError("Job id couldn't be determined: %s" % output)
807 self.job_id = job_id
808 self.job_id = job_id
808 self.log.info('Job submitted with job id: %r' % job_id)
809 self.log.info('Job submitted with job id: %r' % job_id)
809 return job_id
810 return job_id
810
811
811 def write_batch_script(self, n):
812 def write_batch_script(self, n):
812 """Instantiate and write the batch script to the work_dir."""
813 """Instantiate and write the batch script to the work_dir."""
813 self.context['n'] = n
814 self.context['n'] = n
814 self.context['queue'] = self.queue
815 self.context['queue'] = self.queue
815 print self.context
816 print self.context
816 # first priority is batch_template if set
817 # first priority is batch_template if set
817 if self.batch_template_file and not self.batch_template:
818 if self.batch_template_file and not self.batch_template:
818 # second priority is batch_template_file
819 # second priority is batch_template_file
819 with open(self.batch_template_file) as f:
820 with open(self.batch_template_file) as f:
820 self.batch_template = f.read()
821 self.batch_template = f.read()
821 if not self.batch_template:
822 if not self.batch_template:
822 # third (last) priority is default_template
823 # third (last) priority is default_template
823 self.batch_template = self.default_template
824 self.batch_template = self.default_template
824
825
825 regex = re.compile(self.job_array_regexp)
826 regex = re.compile(self.job_array_regexp)
826 # print regex.search(self.batch_template)
827 # print regex.search(self.batch_template)
827 if not regex.search(self.batch_template):
828 if not regex.search(self.batch_template):
828 self.log.info("adding job array settings to batch script")
829 self.log.info("adding job array settings to batch script")
829 firstline, rest = self.batch_template.split('\n',1)
830 firstline, rest = self.batch_template.split('\n',1)
830 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
831 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
831
832
832 regex = re.compile(self.queue_regexp)
833 regex = re.compile(self.queue_regexp)
833 # print regex.search(self.batch_template)
834 # print regex.search(self.batch_template)
834 if self.queue and not regex.search(self.batch_template):
835 if self.queue and not regex.search(self.batch_template):
835 self.log.info("adding PBS queue settings to batch script")
836 self.log.info("adding PBS queue settings to batch script")
836 firstline, rest = self.batch_template.split('\n',1)
837 firstline, rest = self.batch_template.split('\n',1)
837 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
838 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
838
839
839 script_as_string = Itpl.itplns(self.batch_template, self.context)
840 script_as_string = Itpl.itplns(self.batch_template, self.context)
840 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
841 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
841
842
842 with open(self.batch_file, 'w') as f:
843 with open(self.batch_file, 'w') as f:
843 f.write(script_as_string)
844 f.write(script_as_string)
844 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
845 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
845
846
846 def start(self, n, cluster_dir):
847 def start(self, n, cluster_dir):
847 """Start n copies of the process using a batch system."""
848 """Start n copies of the process using a batch system."""
848 # Here we save profile and cluster_dir in the context so they
849 # Here we save profile and cluster_dir in the context so they
849 # can be used in the batch script template as ${profile} and
850 # can be used in the batch script template as ${profile} and
850 # ${cluster_dir}
851 # ${cluster_dir}
851 self.context['cluster_dir'] = cluster_dir
852 self.context['cluster_dir'] = cluster_dir
852 self.cluster_dir = unicode(cluster_dir)
853 self.cluster_dir = unicode(cluster_dir)
853 self.write_batch_script(n)
854 self.write_batch_script(n)
854 output = check_output(self.args, env=os.environ)
855 output = check_output(self.args, env=os.environ)
855
856
856 job_id = self.parse_job_id(output)
857 job_id = self.parse_job_id(output)
857 self.notify_start(job_id)
858 self.notify_start(job_id)
858 return job_id
859 return job_id
859
860
860 def stop(self):
861 def stop(self):
861 output = check_output(self.delete_command+[self.job_id], env=os.environ)
862 output = check_output(self.delete_command+[self.job_id], env=os.environ)
862 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
863 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
863 return output
864 return output
864
865
865
866
866 class PBSLauncher(BatchSystemLauncher):
867 class PBSLauncher(BatchSystemLauncher):
867 """A BatchSystemLauncher subclass for PBS."""
868 """A BatchSystemLauncher subclass for PBS."""
868
869
869 submit_command = List(['qsub'], config=True)
870 submit_command = List(['qsub'], config=True)
870 delete_command = List(['qdel'], config=True)
871 delete_command = List(['qdel'], config=True)
871 job_id_regexp = CUnicode(r'\d+', config=True)
872 job_id_regexp = CUnicode(r'\d+', config=True)
872
873
873 batch_file = CUnicode(u'')
874 batch_file = CUnicode(u'')
874 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
875 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
875 job_array_template = CUnicode('#PBS -t 1-$n')
876 job_array_template = CUnicode('#PBS -t 1-$n')
876 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
877 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
877 queue_template = CUnicode('#PBS -q $queue')
878 queue_template = CUnicode('#PBS -q $queue')
878
879
879
880
880 class PBSControllerLauncher(PBSLauncher):
881 class PBSControllerLauncher(PBSLauncher):
881 """Launch a controller using PBS."""
882 """Launch a controller using PBS."""
882
883
883 batch_file_name = CUnicode(u'pbs_controller', config=True)
884 batch_file_name = CUnicode(u'pbs_controller', config=True)
884 default_template= CUnicode("""#!/bin/sh
885 default_template= CUnicode("""#!/bin/sh
885 #PBS -V
886 #PBS -V
886 #PBS -N ipcontroller
887 #PBS -N ipcontroller
887 %s --log-to-file --cluster-dir $cluster_dir
888 %s --log-to-file --cluster-dir $cluster_dir
888 """%(' '.join(ipcontroller_cmd_argv)))
889 """%(' '.join(ipcontroller_cmd_argv)))
889
890
890 def start(self, cluster_dir):
891 def start(self, cluster_dir):
891 """Start the controller by profile or cluster_dir."""
892 """Start the controller by profile or cluster_dir."""
892 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
893 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
893 return super(PBSControllerLauncher, self).start(1, cluster_dir)
894 return super(PBSControllerLauncher, self).start(1, cluster_dir)
894
895
895
896
896 class PBSEngineSetLauncher(PBSLauncher):
897 class PBSEngineSetLauncher(PBSLauncher):
897 """Launch Engines using PBS"""
898 """Launch Engines using PBS"""
898 batch_file_name = CUnicode(u'pbs_engines', config=True)
899 batch_file_name = CUnicode(u'pbs_engines', config=True)
899 default_template= CUnicode(u"""#!/bin/sh
900 default_template= CUnicode(u"""#!/bin/sh
900 #PBS -V
901 #PBS -V
901 #PBS -N ipengine
902 #PBS -N ipengine
902 %s --cluster-dir $cluster_dir
903 %s --cluster-dir $cluster_dir
903 """%(' '.join(ipengine_cmd_argv)))
904 """%(' '.join(ipengine_cmd_argv)))
904
905
905 def start(self, n, cluster_dir):
906 def start(self, n, cluster_dir):
906 """Start n engines by profile or cluster_dir."""
907 """Start n engines by profile or cluster_dir."""
907 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
908 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
908 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
909 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
909
910
910 #SGE is very similar to PBS
911 #SGE is very similar to PBS
911
912
912 class SGELauncher(PBSLauncher):
913 class SGELauncher(PBSLauncher):
913 """Sun GridEngine is a PBS clone with slightly different syntax"""
914 """Sun GridEngine is a PBS clone with slightly different syntax"""
914 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
915 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
915 job_array_template = CUnicode('#$$ -t 1-$n')
916 job_array_template = CUnicode('#$$ -t 1-$n')
916 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
917 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
917 queue_template = CUnicode('#$$ -q $queue')
918 queue_template = CUnicode('#$$ -q $queue')
918
919
919 class SGEControllerLauncher(SGELauncher):
920 class SGEControllerLauncher(SGELauncher):
920 """Launch a controller using SGE."""
921 """Launch a controller using SGE."""
921
922
922 batch_file_name = CUnicode(u'sge_controller', config=True)
923 batch_file_name = CUnicode(u'sge_controller', config=True)
923 default_template= CUnicode(u"""#$$ -V
924 default_template= CUnicode(u"""#$$ -V
924 #$$ -S /bin/sh
925 #$$ -S /bin/sh
925 #$$ -N ipcontroller
926 #$$ -N ipcontroller
926 %s --log-to-file --cluster-dir $cluster_dir
927 %s --log-to-file --cluster-dir $cluster_dir
927 """%(' '.join(ipcontroller_cmd_argv)))
928 """%(' '.join(ipcontroller_cmd_argv)))
928
929
929 def start(self, cluster_dir):
930 def start(self, cluster_dir):
930 """Start the controller by profile or cluster_dir."""
931 """Start the controller by profile or cluster_dir."""
931 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
932 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
932 return super(PBSControllerLauncher, self).start(1, cluster_dir)
933 return super(PBSControllerLauncher, self).start(1, cluster_dir)
933
934
934 class SGEEngineSetLauncher(SGELauncher):
935 class SGEEngineSetLauncher(SGELauncher):
935 """Launch Engines with SGE"""
936 """Launch Engines with SGE"""
936 batch_file_name = CUnicode(u'sge_engines', config=True)
937 batch_file_name = CUnicode(u'sge_engines', config=True)
937 default_template = CUnicode("""#$$ -V
938 default_template = CUnicode("""#$$ -V
938 #$$ -S /bin/sh
939 #$$ -S /bin/sh
939 #$$ -N ipengine
940 #$$ -N ipengine
940 %s --cluster-dir $cluster_dir
941 %s --cluster-dir $cluster_dir
941 """%(' '.join(ipengine_cmd_argv)))
942 """%(' '.join(ipengine_cmd_argv)))
942
943
943 def start(self, n, cluster_dir):
944 def start(self, n, cluster_dir):
944 """Start n engines by profile or cluster_dir."""
945 """Start n engines by profile or cluster_dir."""
945 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947
948
948
949
949 #-----------------------------------------------------------------------------
950 #-----------------------------------------------------------------------------
950 # A launcher for ipcluster itself!
951 # A launcher for ipcluster itself!
951 #-----------------------------------------------------------------------------
952 #-----------------------------------------------------------------------------
952
953
953
954
954 class IPClusterLauncher(LocalProcessLauncher):
955 class IPClusterLauncher(LocalProcessLauncher):
955 """Launch the ipcluster program in an external process."""
956 """Launch the ipcluster program in an external process."""
956
957
957 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
958 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
958 # Command line arguments to pass to ipcluster.
959 # Command line arguments to pass to ipcluster.
959 ipcluster_args = List(
960 ipcluster_args = List(
960 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
961 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
961 ipcluster_subcommand = Str('start')
962 ipcluster_subcommand = Str('start')
962 ipcluster_n = Int(2)
963 ipcluster_n = Int(2)
963
964
964 def find_args(self):
965 def find_args(self):
965 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
966 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
966 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
967 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
967
968
968 def start(self):
969 def start(self):
969 self.log.info("Starting ipcluster: %r" % self.args)
970 self.log.info("Starting ipcluster: %r" % self.args)
970 return super(IPClusterLauncher, self).start()
971 return super(IPClusterLauncher, self).start()
971
972
General Comments 0
You need to be logged in to leave comments. Login now