##// END OF EJS Templates
Made ipcontroller/ipengine/ipcluster command locations configurable....
bgranger -
Show More
@@ -1,700 +1,700 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching processing asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26
26
27 from twisted.internet import reactor, defer
27 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
29 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
30 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
32 from twisted.python import log
33 from twisted.python.failure import Failure
33 from twisted.python.failure import Failure
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Generic launchers
36 # Generic launchers
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 class LauncherError(Exception):
40 class LauncherError(Exception):
41 pass
41 pass
42
42
43
43
44 class ProcessStateError(LauncherError):
44 class ProcessStateError(LauncherError):
45 pass
45 pass
46
46
47
47
48 class UnknownStatus(LauncherError):
48 class UnknownStatus(LauncherError):
49 pass
49 pass
50
50
51
51
52 class BaseLauncher(Component):
52 class BaseLauncher(Component):
53 """An asbtraction for starting, stopping and signaling a process."""
53 """An asbtraction for starting, stopping and signaling a process."""
54
54
55 # A directory for files related to the process. But, we don't cd to
55 # A directory for files related to the process. But, we don't cd to
56 # this directory,
56 # this directory,
57 working_dir = Unicode(u'')
57 working_dir = Unicode(u'')
58
58
59 def __init__(self, working_dir, parent=None, name=None, config=None):
59 def __init__(self, working_dir, parent=None, name=None, config=None):
60 super(BaseLauncher, self).__init__(parent, name, config)
60 super(BaseLauncher, self).__init__(parent, name, config)
61 self.working_dir = working_dir
61 self.working_dir = working_dir
62 self.state = 'before' # can be before, running, after
62 self.state = 'before' # can be before, running, after
63 self.stop_deferreds = []
63 self.stop_deferreds = []
64 self.start_data = None
64 self.start_data = None
65 self.stop_data = None
65 self.stop_data = None
66
66
67 @property
67 @property
68 def args(self):
68 def args(self):
69 """A list of cmd and args that will be used to start the process.
69 """A list of cmd and args that will be used to start the process.
70
70
71 This is what is passed to :func:`spawnProcess` and the first element
71 This is what is passed to :func:`spawnProcess` and the first element
72 will be the process name.
72 will be the process name.
73 """
73 """
74 return self.find_args()
74 return self.find_args()
75
75
76 def find_args(self):
76 def find_args(self):
77 """The ``.args`` property calls this to find the args list.
77 """The ``.args`` property calls this to find the args list.
78
78
79 Subcommand should implement this to construct the cmd and args.
79 Subcommand should implement this to construct the cmd and args.
80 """
80 """
81 raise NotImplementedError('find_args must be implemented in a subclass')
81 raise NotImplementedError('find_args must be implemented in a subclass')
82
82
83 @property
83 @property
84 def arg_str(self):
84 def arg_str(self):
85 """The string form of the program arguments."""
85 """The string form of the program arguments."""
86 return ' '.join(self.args)
86 return ' '.join(self.args)
87
87
88 @property
88 @property
89 def running(self):
89 def running(self):
90 """Am I running."""
90 """Am I running."""
91 if self.state == 'running':
91 if self.state == 'running':
92 return True
92 return True
93 else:
93 else:
94 return False
94 return False
95
95
96 def start(self):
96 def start(self):
97 """Start the process.
97 """Start the process.
98
98
99 This must return a deferred that fires with information about the
99 This must return a deferred that fires with information about the
100 process starting (like a pid, job id, etc.).
100 process starting (like a pid, job id, etc.).
101 """
101 """
102 return defer.fail(
102 return defer.fail(
103 Failure(NotImplementedError(
103 Failure(NotImplementedError(
104 'start must be implemented in a subclass')
104 'start must be implemented in a subclass')
105 )
105 )
106 )
106 )
107
107
108 def stop(self):
108 def stop(self):
109 """Stop the process and notify observers of stopping.
109 """Stop the process and notify observers of stopping.
110
110
111 This must return a deferred that fires with information about the
111 This must return a deferred that fires with information about the
112 processing stopping, like errors that occur while the process is
112 processing stopping, like errors that occur while the process is
113 attempting to be shut down. This deferred won't fire when the process
113 attempting to be shut down. This deferred won't fire when the process
114 actually stops. To observe the actual process stopping, see
114 actually stops. To observe the actual process stopping, see
115 :func:`observe_stop`.
115 :func:`observe_stop`.
116 """
116 """
117 return defer.fail(
117 return defer.fail(
118 Failure(NotImplementedError(
118 Failure(NotImplementedError(
119 'stop must be implemented in a subclass')
119 'stop must be implemented in a subclass')
120 )
120 )
121 )
121 )
122
122
123 def observe_stop(self):
123 def observe_stop(self):
124 """Get a deferred that will fire when the process stops.
124 """Get a deferred that will fire when the process stops.
125
125
126 The deferred will fire with data that contains information about
126 The deferred will fire with data that contains information about
127 the exit status of the process.
127 the exit status of the process.
128 """
128 """
129 if self.state=='after':
129 if self.state=='after':
130 return defer.succeed(self.stop_data)
130 return defer.succeed(self.stop_data)
131 else:
131 else:
132 d = defer.Deferred()
132 d = defer.Deferred()
133 self.stop_deferreds.append(d)
133 self.stop_deferreds.append(d)
134 return d
134 return d
135
135
136 def notify_start(self, data):
136 def notify_start(self, data):
137 """Call this to trigger startup actions.
137 """Call this to trigger startup actions.
138
138
139 This logs the process startup and sets the state to 'running'. It is
139 This logs the process startup and sets the state to 'running'. It is
140 a pass-through so it can be used as a callback.
140 a pass-through so it can be used as a callback.
141 """
141 """
142
142
143 log.msg('Process %r started: %r' % (self.args[0], data))
143 log.msg('Process %r started: %r' % (self.args[0], data))
144 self.start_data = data
144 self.start_data = data
145 self.state = 'running'
145 self.state = 'running'
146 return data
146 return data
147
147
148 def notify_stop(self, data):
148 def notify_stop(self, data):
149 """Call this to trigger process stop actions.
149 """Call this to trigger process stop actions.
150
150
151 This logs the process stopping and sets the state to 'after'. Call
151 This logs the process stopping and sets the state to 'after'. Call
152 this to trigger all the deferreds from :func:`observe_stop`."""
152 this to trigger all the deferreds from :func:`observe_stop`."""
153
153
154 log.msg('Process %r stopped: %r' % (self.args[0], data))
154 log.msg('Process %r stopped: %r' % (self.args[0], data))
155 self.stop_data = data
155 self.stop_data = data
156 self.state = 'after'
156 self.state = 'after'
157 for i in range(len(self.stop_deferreds)):
157 for i in range(len(self.stop_deferreds)):
158 d = self.stop_deferreds.pop()
158 d = self.stop_deferreds.pop()
159 d.callback(data)
159 d.callback(data)
160 return data
160 return data
161
161
162 def signal(self, sig):
162 def signal(self, sig):
163 """Signal the process.
163 """Signal the process.
164
164
165 Return a semi-meaningless deferred after signaling the process.
165 Return a semi-meaningless deferred after signaling the process.
166
166
167 Parameters
167 Parameters
168 ----------
168 ----------
169 sig : str or int
169 sig : str or int
170 'KILL', 'INT', etc., or any signal number
170 'KILL', 'INT', etc., or any signal number
171 """
171 """
172 return defer.fail(
172 return defer.fail(
173 Failure(NotImplementedError(
173 Failure(NotImplementedError(
174 'signal must be implemented in a subclass')
174 'signal must be implemented in a subclass')
175 )
175 )
176 )
176 )
177
177
178
178
179 class LocalProcessLauncherProtocol(ProcessProtocol):
179 class LocalProcessLauncherProtocol(ProcessProtocol):
180 """A ProcessProtocol to go with the LocalProcessLauncher."""
180 """A ProcessProtocol to go with the LocalProcessLauncher."""
181
181
182 def __init__(self, process_launcher):
182 def __init__(self, process_launcher):
183 self.process_launcher = process_launcher
183 self.process_launcher = process_launcher
184 self.pid = None
184 self.pid = None
185
185
186 def connectionMade(self):
186 def connectionMade(self):
187 self.pid = self.transport.pid
187 self.pid = self.transport.pid
188 self.process_launcher.notify_start(self.transport.pid)
188 self.process_launcher.notify_start(self.transport.pid)
189
189
190 def processEnded(self, status):
190 def processEnded(self, status):
191 value = status.value
191 value = status.value
192 if isinstance(value, ProcessDone):
192 if isinstance(value, ProcessDone):
193 self.process_launcher.notify_stop(
193 self.process_launcher.notify_stop(
194 {'exit_code':0,
194 {'exit_code':0,
195 'signal':None,
195 'signal':None,
196 'status':None,
196 'status':None,
197 'pid':self.pid
197 'pid':self.pid
198 }
198 }
199 )
199 )
200 elif isinstance(value, ProcessTerminated):
200 elif isinstance(value, ProcessTerminated):
201 self.process_launcher.notify_stop(
201 self.process_launcher.notify_stop(
202 {'exit_code':value.exitCode,
202 {'exit_code':value.exitCode,
203 'signal':value.signal,
203 'signal':value.signal,
204 'status':value.status,
204 'status':value.status,
205 'pid':self.pid
205 'pid':self.pid
206 }
206 }
207 )
207 )
208 else:
208 else:
209 raise UnknownStatus("Unknown exit status, this is probably a "
209 raise UnknownStatus("Unknown exit status, this is probably a "
210 "bug in Twisted")
210 "bug in Twisted")
211
211
212 def outReceived(self, data):
212 def outReceived(self, data):
213 log.msg(data)
213 log.msg(data)
214
214
215 def errReceived(self, data):
215 def errReceived(self, data):
216 log.err(data)
216 log.err(data)
217
217
218
218
219 class LocalProcessLauncher(BaseLauncher):
219 class LocalProcessLauncher(BaseLauncher):
220 """Start and stop an external process in an asynchronous manner."""
220 """Start and stop an external process in an asynchronous manner."""
221
221
222 # This is used to to construct self.args, which is passed to
222 # This is used to to construct self.args, which is passed to
223 # spawnProcess.
223 # spawnProcess.
224 cmd_and_args = List([])
224 cmd_and_args = List([])
225
225
226 def __init__(self, working_dir, parent=None, name=None, config=None):
226 def __init__(self, working_dir, parent=None, name=None, config=None):
227 super(LocalProcessLauncher, self).__init__(
227 super(LocalProcessLauncher, self).__init__(
228 working_dir, parent, name, config
228 working_dir, parent, name, config
229 )
229 )
230 self.process_protocol = None
230 self.process_protocol = None
231 self.start_deferred = None
231 self.start_deferred = None
232
232
233 def find_args(self):
233 def find_args(self):
234 return self.cmd_and_args
234 return self.cmd_and_args
235
235
236 def start(self):
236 def start(self):
237 if self.state == 'before':
237 if self.state == 'before':
238 self.process_protocol = LocalProcessLauncherProtocol(self)
238 self.process_protocol = LocalProcessLauncherProtocol(self)
239 self.start_deferred = defer.Deferred()
239 self.start_deferred = defer.Deferred()
240 self.process_transport = reactor.spawnProcess(
240 self.process_transport = reactor.spawnProcess(
241 self.process_protocol,
241 self.process_protocol,
242 str(self.args[0]),
242 str(self.args[0]),
243 [str(a) for a in self.args],
243 [str(a) for a in self.args],
244 env=os.environ
244 env=os.environ
245 )
245 )
246 return self.start_deferred
246 return self.start_deferred
247 else:
247 else:
248 s = 'The process was already started and has state: %r' % self.state
248 s = 'The process was already started and has state: %r' % self.state
249 return defer.fail(ProcessStateError(s))
249 return defer.fail(ProcessStateError(s))
250
250
251 def notify_start(self, data):
251 def notify_start(self, data):
252 super(LocalProcessLauncher, self).notify_start(data)
252 super(LocalProcessLauncher, self).notify_start(data)
253 self.start_deferred.callback(data)
253 self.start_deferred.callback(data)
254
254
255 def stop(self):
255 def stop(self):
256 return self.interrupt_then_kill()
256 return self.interrupt_then_kill()
257
257
258 @make_deferred
258 @make_deferred
259 def signal(self, sig):
259 def signal(self, sig):
260 if self.state == 'running':
260 if self.state == 'running':
261 self.process_transport.signalProcess(sig)
261 self.process_transport.signalProcess(sig)
262
262
263 @inlineCallbacks
263 @inlineCallbacks
264 def interrupt_then_kill(self, delay=2.0):
264 def interrupt_then_kill(self, delay=2.0):
265 """Send INT, wait a delay and then send KILL."""
265 """Send INT, wait a delay and then send KILL."""
266 yield self.signal('INT')
266 yield self.signal('INT')
267 yield sleep_deferred(delay)
267 yield sleep_deferred(delay)
268 yield self.signal('KILL')
268 yield self.signal('KILL')
269
269
270
270
271 class MPIExecLauncher(LocalProcessLauncher):
271 class MPIExecLauncher(LocalProcessLauncher):
272 """Launch an external process using mpiexec."""
272 """Launch an external process using mpiexec."""
273
273
274 # The mpiexec command to use in starting the process.
274 # The mpiexec command to use in starting the process.
275 mpi_cmd = List(['mpiexec'], config=True)
275 mpi_cmd = List(['mpiexec'], config=True)
276 # The command line arguments to pass to mpiexec.
276 # The command line arguments to pass to mpiexec.
277 mpi_args = List([], config=True)
277 mpi_args = List([], config=True)
278 # The program to start using mpiexec.
278 # The program to start using mpiexec.
279 program = List(['date'], config=True)
279 program = List(['date'], config=True)
280 # The command line argument to the program.
280 # The command line argument to the program.
281 program_args = List([], config=True)
281 program_args = List([], config=True)
282 # The number of instances of the program to start.
282 # The number of instances of the program to start.
283 n = Int(1, config=True)
283 n = Int(1, config=True)
284
284
285 def find_args(self):
285 def find_args(self):
286 """Build self.args using all the fields."""
286 """Build self.args using all the fields."""
287 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
287 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
288 self.program + self.program_args
288 self.program + self.program_args
289
289
290 def start(self, n):
290 def start(self, n):
291 """Start n instances of the program using mpiexec."""
291 """Start n instances of the program using mpiexec."""
292 self.n = n
292 self.n = n
293 return super(MPIExecLauncher, self).start()
293 return super(MPIExecLauncher, self).start()
294
294
295
295
296 class SSHLauncher(BaseLauncher):
296 class SSHLauncher(BaseLauncher):
297 """A minimal launcher for ssh.
297 """A minimal launcher for ssh.
298
298
299 To be useful this will probably have to be extended to use the ``sshx``
299 To be useful this will probably have to be extended to use the ``sshx``
300 idea for environment variables. There could be other things this needs
300 idea for environment variables. There could be other things this needs
301 as well.
301 as well.
302 """
302 """
303
303
304 ssh_cmd = List(['ssh'], config=True)
304 ssh_cmd = List(['ssh'], config=True)
305 ssh_args = List([], config=True)
305 ssh_args = List([], config=True)
306 program = List(['date'], config=True)
306 program = List(['date'], config=True)
307 program_args = List([], config=True)
307 program_args = List([], config=True)
308 hostname = Str('', config=True)
308 hostname = Str('', config=True)
309 user = Str('', config=True)
309 user = Str('', config=True)
310 location = Str('')
310 location = Str('')
311
311
312 def _hostname_changed(self, name, old, new):
312 def _hostname_changed(self, name, old, new):
313 self.location = '%s@%s' % (self.user, new)
313 self.location = '%s@%s' % (self.user, new)
314
314
315 def _user_changed(self, name, old, new):
315 def _user_changed(self, name, old, new):
316 self.location = '%s@%s' % (new, self.hostname)
316 self.location = '%s@%s' % (new, self.hostname)
317
317
318 def find_args(self):
318 def find_args(self):
319 return self.ssh_cmd + self.ssh_args + [self.location] + \
319 return self.ssh_cmd + self.ssh_args + [self.location] + \
320 self.program + self.program_args
320 self.program + self.program_args
321
321
322 def start(self, n, hostname=None, user=None):
322 def start(self, n, hostname=None, user=None):
323 if hostname is not None:
323 if hostname is not None:
324 self.hostname = hostname
324 self.hostname = hostname
325 if user is not None:
325 if user is not None:
326 self.user = user
326 self.user = user
327 return super(SSHLauncher, self).start()
327 return super(SSHLauncher, self).start()
328
328
329
329
330 class WindowsHPCLauncher(BaseLauncher):
330 class WindowsHPCLauncher(BaseLauncher):
331 pass
331 pass
332
332
333
333
334 class BatchSystemLauncher(BaseLauncher):
334 class BatchSystemLauncher(BaseLauncher):
335 """Launch an external process using a batch system.
335 """Launch an external process using a batch system.
336
336
337 This class is designed to work with UNIX batch systems like PBS, LSF,
337 This class is designed to work with UNIX batch systems like PBS, LSF,
338 GridEngine, etc. The overall model is that there are different commands
338 GridEngine, etc. The overall model is that there are different commands
339 like qsub, qdel, etc. that handle the starting and stopping of the process.
339 like qsub, qdel, etc. that handle the starting and stopping of the process.
340
340
341 This class also has the notion of a batch script. The ``batch_template``
341 This class also has the notion of a batch script. The ``batch_template``
342 attribute can be set to a string that is a template for the batch script.
342 attribute can be set to a string that is a template for the batch script.
343 This template is instantiated using Itpl. Thus the template can use
343 This template is instantiated using Itpl. Thus the template can use
344 ${n} fot the number of instances. Subclasses can add additional variables
344 ${n} fot the number of instances. Subclasses can add additional variables
345 to the template dict.
345 to the template dict.
346 """
346 """
347
347
348 # Subclasses must fill these in. See PBSEngineSet
348 # Subclasses must fill these in. See PBSEngineSet
349 # The name of the command line program used to submit jobs.
349 # The name of the command line program used to submit jobs.
350 submit_command = Str('', config=True)
350 submit_command = Str('', config=True)
351 # The name of the command line program used to delete jobs.
351 # The name of the command line program used to delete jobs.
352 delete_command = Str('', config=True)
352 delete_command = Str('', config=True)
353 # A regular expression used to get the job id from the output of the
353 # A regular expression used to get the job id from the output of the
354 # submit_command.
354 # submit_command.
355 job_id_regexp = Str('', config=True)
355 job_id_regexp = Str('', config=True)
356 # The string that is the batch script template itself.
356 # The string that is the batch script template itself.
357 batch_template = Str('', config=True)
357 batch_template = Str('', config=True)
358 # The filename of the instantiated batch script.
358 # The filename of the instantiated batch script.
359 batch_file_name = Unicode(u'batch_script', config=True)
359 batch_file_name = Unicode(u'batch_script', config=True)
360 # The full path to the instantiated batch script.
360 # The full path to the instantiated batch script.
361 batch_file = Unicode(u'')
361 batch_file = Unicode(u'')
362
362
363 def __init__(self, working_dir, parent=None, name=None, config=None):
363 def __init__(self, working_dir, parent=None, name=None, config=None):
364 super(BatchSystemLauncher, self).__init__(
364 super(BatchSystemLauncher, self).__init__(
365 working_dir, parent, name, config
365 working_dir, parent, name, config
366 )
366 )
367 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
367 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
368 self.context = {}
368 self.context = {}
369
369
370 def parse_job_id(self, output):
370 def parse_job_id(self, output):
371 """Take the output of the submit command and return the job id."""
371 """Take the output of the submit command and return the job id."""
372 m = re.match(self.job_id_regexp, output)
372 m = re.match(self.job_id_regexp, output)
373 if m is not None:
373 if m is not None:
374 job_id = m.group()
374 job_id = m.group()
375 else:
375 else:
376 raise LauncherError("Job id couldn't be determined: %s" % output)
376 raise LauncherError("Job id couldn't be determined: %s" % output)
377 self.job_id = job_id
377 self.job_id = job_id
378 log.msg('Job started with job id: %r' % job_id)
378 log.msg('Job started with job id: %r' % job_id)
379 return job_id
379 return job_id
380
380
381 def write_batch_script(self, n):
381 def write_batch_script(self, n):
382 """Instantiate and write the batch script to the working_dir."""
382 """Instantiate and write the batch script to the working_dir."""
383 self.context['n'] = n
383 self.context['n'] = n
384 script_as_string = Itpl.itplns(self.batch_template, self.context)
384 script_as_string = Itpl.itplns(self.batch_template, self.context)
385 log.msg('Writing instantiated batch script: %s' % self.batch_file)
385 log.msg('Writing instantiated batch script: %s' % self.batch_file)
386 f = open(self.batch_file, 'w')
386 f = open(self.batch_file, 'w')
387 f.write(script_as_string)
387 f.write(script_as_string)
388 f.close()
388 f.close()
389
389
390 @inlineCallbacks
390 @inlineCallbacks
391 def start(self, n):
391 def start(self, n):
392 """Start n copies of the process using a batch system."""
392 """Start n copies of the process using a batch system."""
393 self.write_batch_script(n)
393 self.write_batch_script(n)
394 output = yield getProcessOutput(self.submit_command,
394 output = yield getProcessOutput(self.submit_command,
395 [self.batch_file], env=os.environ)
395 [self.batch_file], env=os.environ)
396 job_id = self.parse_job_id(output)
396 job_id = self.parse_job_id(output)
397 self.notify_start(job_id)
397 self.notify_start(job_id)
398 defer.returnValue(job_id)
398 defer.returnValue(job_id)
399
399
400 @inlineCallbacks
400 @inlineCallbacks
401 def stop(self):
401 def stop(self):
402 output = yield getProcessOutput(self.delete_command,
402 output = yield getProcessOutput(self.delete_command,
403 [self.job_id], env=os.environ
403 [self.job_id], env=os.environ
404 )
404 )
405 self.notify_stop(output) # Pass the output of the kill cmd
405 self.notify_stop(output) # Pass the output of the kill cmd
406 defer.returnValue(output)
406 defer.returnValue(output)
407
407
408
408
409 class PBSLauncher(BatchSystemLauncher):
409 class PBSLauncher(BatchSystemLauncher):
410 """A BatchSystemLauncher subclass for PBS."""
410 """A BatchSystemLauncher subclass for PBS."""
411
411
412 submit_command = Str('qsub', config=True)
412 submit_command = Str('qsub', config=True)
413 delete_command = Str('qdel', config=True)
413 delete_command = Str('qdel', config=True)
414 job_id_regexp = Str('\d+', config=True)
414 job_id_regexp = Str('\d+', config=True)
415 batch_template = Str('', config=True)
415 batch_template = Str('', config=True)
416 batch_file_name = Unicode(u'pbs_batch_script', config=True)
416 batch_file_name = Unicode(u'pbs_batch_script', config=True)
417 batch_file = Unicode(u'')
417 batch_file = Unicode(u'')
418
418
419
419
420 #-----------------------------------------------------------------------------
420 #-----------------------------------------------------------------------------
421 # Controller launchers
421 # Controller launchers
422 #-----------------------------------------------------------------------------
422 #-----------------------------------------------------------------------------
423
423
424 def find_controller_cmd():
424 def find_controller_cmd():
425 """Find the command line ipcontroller program in a cross platform way."""
425 """Find the command line ipcontroller program in a cross platform way."""
426 if sys.platform == 'win32':
426 if sys.platform == 'win32':
427 # This logic is needed because the ipcontroller script doesn't
427 # This logic is needed because the ipcontroller script doesn't
428 # always get installed in the same way or in the same location.
428 # always get installed in the same way or in the same location.
429 from IPython.kernel import ipcontrollerapp
429 from IPython.kernel import ipcontrollerapp
430 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
430 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
431 # The -u option here turns on unbuffered output, which is required
431 # The -u option here turns on unbuffered output, which is required
432 # on Win32 to prevent wierd conflict and problems with Twisted.
432 # on Win32 to prevent wierd conflict and problems with Twisted.
433 # Also, use sys.executable to make sure we are picking up the
433 # Also, use sys.executable to make sure we are picking up the
434 # right python exe.
434 # right python exe.
435 cmd = [sys.executable, '-u', script_location]
435 cmd = [sys.executable, '-u', script_location]
436 else:
436 else:
437 # ipcontroller has to be on the PATH in this case.
437 # ipcontroller has to be on the PATH in this case.
438 cmd = ['ipcontroller']
438 cmd = ['ipcontroller']
439 return cmd
439 return cmd
440
440
441
441
442 class LocalControllerLauncher(LocalProcessLauncher):
442 class LocalControllerLauncher(LocalProcessLauncher):
443 """Launch a controller as a regular external process."""
443 """Launch a controller as a regular external process."""
444
444
445 controller_cmd = List(find_controller_cmd())
445 controller_cmd = List(find_controller_cmd(), config=True)
446 # Command line arguments to ipcontroller.
446 # Command line arguments to ipcontroller.
447 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
447 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
448
448
449 def find_args(self):
449 def find_args(self):
450 return self.controller_cmd + self.controller_args
450 return self.controller_cmd + self.controller_args
451
451
452 def start(self, profile=None, cluster_dir=None):
452 def start(self, profile=None, cluster_dir=None):
453 """Start the controller by profile or cluster_dir."""
453 """Start the controller by profile or cluster_dir."""
454 if cluster_dir is not None:
454 if cluster_dir is not None:
455 self.controller_args.extend(['--cluster-dir', cluster_dir])
455 self.controller_args.extend(['--cluster-dir', cluster_dir])
456 if profile is not None:
456 if profile is not None:
457 self.controller_args.extend(['--profile', profile])
457 self.controller_args.extend(['--profile', profile])
458 log.msg("Starting LocalControllerLauncher: %r" % self.args)
458 log.msg("Starting LocalControllerLauncher: %r" % self.args)
459 return super(LocalControllerLauncher, self).start()
459 return super(LocalControllerLauncher, self).start()
460
460
461
461
462 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
462 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
463 pass
463 pass
464
464
465
465
466 class MPIExecControllerLauncher(MPIExecLauncher):
466 class MPIExecControllerLauncher(MPIExecLauncher):
467 """Launch a controller using mpiexec."""
467 """Launch a controller using mpiexec."""
468
468
469 controller_cmd = List(find_controller_cmd(), config=False)
469 controller_cmd = List(find_controller_cmd(), config=True)
470 # Command line arguments to ipcontroller.
470 # Command line arguments to ipcontroller.
471 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
471 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
472 n = Int(1, config=False)
472 n = Int(1, config=False)
473
473
474 def start(self, profile=None, cluster_dir=None):
474 def start(self, profile=None, cluster_dir=None):
475 """Start the controller by profile or cluster_dir."""
475 """Start the controller by profile or cluster_dir."""
476 if cluster_dir is not None:
476 if cluster_dir is not None:
477 self.controller_args.extend(['--cluster-dir', cluster_dir])
477 self.controller_args.extend(['--cluster-dir', cluster_dir])
478 if profile is not None:
478 if profile is not None:
479 self.controller_args.extend(['--profile', profile])
479 self.controller_args.extend(['--profile', profile])
480 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
480 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
481 return super(MPIExecControllerLauncher, self).start(1)
481 return super(MPIExecControllerLauncher, self).start(1)
482
482
483 def find_args(self):
483 def find_args(self):
484 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
484 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
485 self.controller_cmd + self.controller_args
485 self.controller_cmd + self.controller_args
486
486
487
487
488 class PBSControllerLauncher(PBSLauncher):
488 class PBSControllerLauncher(PBSLauncher):
489 """Launch a controller using PBS."""
489 """Launch a controller using PBS."""
490
490
491 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
491 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
492
492
493 def start(self, profile=None, cluster_dir=None):
493 def start(self, profile=None, cluster_dir=None):
494 """Start the controller by profile or cluster_dir."""
494 """Start the controller by profile or cluster_dir."""
495 # Here we save profile and cluster_dir in the context so they
495 # Here we save profile and cluster_dir in the context so they
496 # can be used in the batch script template as ${profile} and
496 # can be used in the batch script template as ${profile} and
497 # ${cluster_dir}
497 # ${cluster_dir}
498 if cluster_dir is not None:
498 if cluster_dir is not None:
499 self.context['cluster_dir'] = cluster_dir
499 self.context['cluster_dir'] = cluster_dir
500 if profile is not None:
500 if profile is not None:
501 self.context['profile'] = profile
501 self.context['profile'] = profile
502 log.msg("Starting PBSControllerLauncher: %r" % self.args)
502 log.msg("Starting PBSControllerLauncher: %r" % self.args)
503 return super(PBSControllerLauncher, self).start(1)
503 return super(PBSControllerLauncher, self).start(1)
504
504
505
505
506 class SSHControllerLauncher(SSHLauncher):
506 class SSHControllerLauncher(SSHLauncher):
507 pass
507 pass
508
508
509
509
510 #-----------------------------------------------------------------------------
510 #-----------------------------------------------------------------------------
511 # Engine launchers
511 # Engine launchers
512 #-----------------------------------------------------------------------------
512 #-----------------------------------------------------------------------------
513
513
514
514
515 def find_engine_cmd():
515 def find_engine_cmd():
516 """Find the command line ipengine program in a cross platform way."""
516 """Find the command line ipengine program in a cross platform way."""
517 if sys.platform == 'win32':
517 if sys.platform == 'win32':
518 # This logic is needed because the ipengine script doesn't
518 # This logic is needed because the ipengine script doesn't
519 # always get installed in the same way or in the same location.
519 # always get installed in the same way or in the same location.
520 from IPython.kernel import ipengineapp
520 from IPython.kernel import ipengineapp
521 script_location = ipengineapp.__file__.replace('.pyc', '.py')
521 script_location = ipengineapp.__file__.replace('.pyc', '.py')
522 # The -u option here turns on unbuffered output, which is required
522 # The -u option here turns on unbuffered output, which is required
523 # on Win32 to prevent wierd conflict and problems with Twisted.
523 # on Win32 to prevent wierd conflict and problems with Twisted.
524 # Also, use sys.executable to make sure we are picking up the
524 # Also, use sys.executable to make sure we are picking up the
525 # right python exe.
525 # right python exe.
526 cmd = [sys.executable, '-u', script_location]
526 cmd = [sys.executable, '-u', script_location]
527 else:
527 else:
528 # ipcontroller has to be on the PATH in this case.
528 # ipcontroller has to be on the PATH in this case.
529 cmd = ['ipengine']
529 cmd = ['ipengine']
530 return cmd
530 return cmd
531
531
532
532
533 class LocalEngineLauncher(LocalProcessLauncher):
533 class LocalEngineLauncher(LocalProcessLauncher):
534 """Launch a single engine as a regular externall process."""
534 """Launch a single engine as a regular externall process."""
535
535
536 engine_cmd = List(find_engine_cmd())
536 engine_cmd = List(find_engine_cmd(), config=True)
537 # Command line arguments for ipengine.
537 # Command line arguments for ipengine.
538 engine_args = List(
538 engine_args = List(
539 ['--log-to-file','--log-level', '40'], config=True
539 ['--log-to-file','--log-level', '40'], config=True
540 )
540 )
541
541
542 def find_args(self):
542 def find_args(self):
543 return self.engine_cmd + self.engine_args
543 return self.engine_cmd + self.engine_args
544
544
545 def start(self, profile=None, cluster_dir=None):
545 def start(self, profile=None, cluster_dir=None):
546 """Start the engine by profile or cluster_dir."""
546 """Start the engine by profile or cluster_dir."""
547 if cluster_dir is not None:
547 if cluster_dir is not None:
548 self.engine_args.extend(['--cluster-dir', cluster_dir])
548 self.engine_args.extend(['--cluster-dir', cluster_dir])
549 if profile is not None:
549 if profile is not None:
550 self.engine_args.extend(['--profile', profile])
550 self.engine_args.extend(['--profile', profile])
551 return super(LocalEngineLauncher, self).start()
551 return super(LocalEngineLauncher, self).start()
552
552
553
553
554 class LocalEngineSetLauncher(BaseLauncher):
554 class LocalEngineSetLauncher(BaseLauncher):
555 """Launch a set of engines as regular external processes."""
555 """Launch a set of engines as regular external processes."""
556
556
557 # Command line arguments for ipengine.
557 # Command line arguments for ipengine.
558 engine_args = List(
558 engine_args = List(
559 ['--log-to-file','--log-level', '40'], config=True
559 ['--log-to-file','--log-level', '40'], config=True
560 )
560 )
561
561
562 def __init__(self, working_dir, parent=None, name=None, config=None):
562 def __init__(self, working_dir, parent=None, name=None, config=None):
563 super(LocalEngineSetLauncher, self).__init__(
563 super(LocalEngineSetLauncher, self).__init__(
564 working_dir, parent, name, config
564 working_dir, parent, name, config
565 )
565 )
566 self.launchers = []
566 self.launchers = []
567
567
568 def start(self, n, profile=None, cluster_dir=None):
568 def start(self, n, profile=None, cluster_dir=None):
569 """Start n engines by profile or cluster_dir."""
569 """Start n engines by profile or cluster_dir."""
570 dlist = []
570 dlist = []
571 for i in range(n):
571 for i in range(n):
572 el = LocalEngineLauncher(self.working_dir, self)
572 el = LocalEngineLauncher(self.working_dir, self)
573 # Copy the engine args over to each engine launcher.
573 # Copy the engine args over to each engine launcher.
574 import copy
574 import copy
575 el.engine_args = copy.deepcopy(self.engine_args)
575 el.engine_args = copy.deepcopy(self.engine_args)
576 d = el.start(profile, cluster_dir)
576 d = el.start(profile, cluster_dir)
577 if i==0:
577 if i==0:
578 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
578 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
579 self.launchers.append(el)
579 self.launchers.append(el)
580 dlist.append(d)
580 dlist.append(d)
581 # The consumeErrors here could be dangerous
581 # The consumeErrors here could be dangerous
582 dfinal = gatherBoth(dlist, consumeErrors=True)
582 dfinal = gatherBoth(dlist, consumeErrors=True)
583 dfinal.addCallback(self.notify_start)
583 dfinal.addCallback(self.notify_start)
584 return dfinal
584 return dfinal
585
585
586 def find_args(self):
586 def find_args(self):
587 return ['engine set']
587 return ['engine set']
588
588
589 def signal(self, sig):
589 def signal(self, sig):
590 dlist = []
590 dlist = []
591 for el in self.launchers:
591 for el in self.launchers:
592 d = el.signal(sig)
592 d = el.signal(sig)
593 dlist.append(d)
593 dlist.append(d)
594 dfinal = gatherBoth(dlist, consumeErrors=True)
594 dfinal = gatherBoth(dlist, consumeErrors=True)
595 return dfinal
595 return dfinal
596
596
597 def interrupt_then_kill(self, delay=1.0):
597 def interrupt_then_kill(self, delay=1.0):
598 dlist = []
598 dlist = []
599 for el in self.launchers:
599 for el in self.launchers:
600 d = el.interrupt_then_kill(delay)
600 d = el.interrupt_then_kill(delay)
601 dlist.append(d)
601 dlist.append(d)
602 dfinal = gatherBoth(dlist, consumeErrors=True)
602 dfinal = gatherBoth(dlist, consumeErrors=True)
603 return dfinal
603 return dfinal
604
604
605 def stop(self):
605 def stop(self):
606 return self.interrupt_then_kill()
606 return self.interrupt_then_kill()
607
607
608 def observe_stop(self):
608 def observe_stop(self):
609 dlist = [el.observe_stop() for el in self.launchers]
609 dlist = [el.observe_stop() for el in self.launchers]
610 dfinal = gatherBoth(dlist, consumeErrors=False)
610 dfinal = gatherBoth(dlist, consumeErrors=False)
611 dfinal.addCallback(self.notify_stop)
611 dfinal.addCallback(self.notify_stop)
612 return dfinal
612 return dfinal
613
613
614
614
615 class MPIExecEngineSetLauncher(MPIExecLauncher):
615 class MPIExecEngineSetLauncher(MPIExecLauncher):
616
616
617 engine_cmd = List(find_engine_cmd(), config=False)
617 engine_cmd = List(find_engine_cmd(), config=True)
618 # Command line arguments for ipengine.
618 # Command line arguments for ipengine.
619 engine_args = List(
619 engine_args = List(
620 ['--log-to-file','--log-level', '40'], config=True
620 ['--log-to-file','--log-level', '40'], config=True
621 )
621 )
622 n = Int(1, config=True)
622 n = Int(1, config=True)
623
623
624 def start(self, n, profile=None, cluster_dir=None):
624 def start(self, n, profile=None, cluster_dir=None):
625 """Start n engines by profile or cluster_dir."""
625 """Start n engines by profile or cluster_dir."""
626 if cluster_dir is not None:
626 if cluster_dir is not None:
627 self.engine_args.extend(['--cluster-dir', cluster_dir])
627 self.engine_args.extend(['--cluster-dir', cluster_dir])
628 if profile is not None:
628 if profile is not None:
629 self.engine_args.extend(['--profile', profile])
629 self.engine_args.extend(['--profile', profile])
630 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
630 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
631 return super(MPIExecEngineSetLauncher, self).start(n)
631 return super(MPIExecEngineSetLauncher, self).start(n)
632
632
633 def find_args(self):
633 def find_args(self):
634 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
634 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
635 self.engine_cmd + self.engine_args
635 self.engine_cmd + self.engine_args
636
636
637
637
638 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
638 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
639 pass
639 pass
640
640
641
641
642 class PBSEngineSetLauncher(PBSLauncher):
642 class PBSEngineSetLauncher(PBSLauncher):
643
643
644 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
644 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
645
645
646 def start(self, n, profile=None, cluster_dir=None):
646 def start(self, n, profile=None, cluster_dir=None):
647 """Start n engines by profile or cluster_dir."""
647 """Start n engines by profile or cluster_dir."""
648 if cluster_dir is not None:
648 if cluster_dir is not None:
649 self.program_args.extend(['--cluster-dir', cluster_dir])
649 self.program_args.extend(['--cluster-dir', cluster_dir])
650 if profile is not None:
650 if profile is not None:
651 self.program_args.extend(['-p', profile])
651 self.program_args.extend(['-p', profile])
652 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
652 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
653 return super(PBSEngineSetLauncher, self).start(n)
653 return super(PBSEngineSetLauncher, self).start(n)
654
654
655
655
656 class SSHEngineSetLauncher(BaseLauncher):
656 class SSHEngineSetLauncher(BaseLauncher):
657 pass
657 pass
658
658
659
659
660 #-----------------------------------------------------------------------------
660 #-----------------------------------------------------------------------------
661 # A launcher for ipcluster itself!
661 # A launcher for ipcluster itself!
662 #-----------------------------------------------------------------------------
662 #-----------------------------------------------------------------------------
663
663
664
664
665 def find_ipcluster_cmd():
665 def find_ipcluster_cmd():
666 """Find the command line ipcluster program in a cross platform way."""
666 """Find the command line ipcluster program in a cross platform way."""
667 if sys.platform == 'win32':
667 if sys.platform == 'win32':
668 # This logic is needed because the ipcluster script doesn't
668 # This logic is needed because the ipcluster script doesn't
669 # always get installed in the same way or in the same location.
669 # always get installed in the same way or in the same location.
670 from IPython.kernel import ipclusterapp
670 from IPython.kernel import ipclusterapp
671 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
671 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
672 # The -u option here turns on unbuffered output, which is required
672 # The -u option here turns on unbuffered output, which is required
673 # on Win32 to prevent wierd conflict and problems with Twisted.
673 # on Win32 to prevent wierd conflict and problems with Twisted.
674 # Also, use sys.executable to make sure we are picking up the
674 # Also, use sys.executable to make sure we are picking up the
675 # right python exe.
675 # right python exe.
676 cmd = [sys.executable, '-u', script_location]
676 cmd = [sys.executable, '-u', script_location]
677 else:
677 else:
678 # ipcontroller has to be on the PATH in this case.
678 # ipcontroller has to be on the PATH in this case.
679 cmd = ['ipcluster']
679 cmd = ['ipcluster']
680 return cmd
680 return cmd
681
681
682
682
683 class IPClusterLauncher(LocalProcessLauncher):
683 class IPClusterLauncher(LocalProcessLauncher):
684 """Launch the ipcluster program in an external process."""
684 """Launch the ipcluster program in an external process."""
685
685
686 ipcluster_cmd = List(find_ipcluster_cmd())
686 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
687 # Command line arguments to pass to ipcluster.
687 # Command line arguments to pass to ipcluster.
688 ipcluster_args = List(
688 ipcluster_args = List(
689 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
689 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
690 ipcluster_subcommand = Str('start')
690 ipcluster_subcommand = Str('start')
691 ipcluster_n = Int(2)
691 ipcluster_n = Int(2)
692
692
693 def find_args(self):
693 def find_args(self):
694 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
694 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
695 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
695 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
696
696
697 def start(self):
697 def start(self):
698 log.msg("Starting ipcluster: %r" % self.args)
698 log.msg("Starting ipcluster: %r" % self.args)
699 return super(IPClusterLauncher, self).start()
699 return super(IPClusterLauncher, self).start()
700
700
General Comments 0
You need to be logged in to leave comments. Login now