##// END OF EJS Templates
Changed the INT then KILL delay to 2 s to be a little more friendly.
Brian Granger -
Show More
@@ -1,87 +1,88 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
5
5
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
7 blocking. This means that methods on the clients return the actual results
7 blocking. This means that methods on the clients return the actual results
8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 rather than a deferred to the result. Also, we manage the Twisted reactor
9 for you. This is done by running the reactor in a thread.
9 for you. This is done by running the reactor in a thread.
10
10
11 The main classes in this module are:
11 The main classes in this module are:
12
12
13 * MultiEngineClient
13 * MultiEngineClient
14 * TaskClient
14 * TaskClient
15 * Task
15 * Task
16 * CompositeError
16 * CompositeError
17 """
17 """
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2009 The IPython Development Team
20 # Copyright (C) 2008-2009 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Imports
27 # Imports
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 from cStringIO import StringIO
30 from cStringIO import StringIO
31 import sys
31 import sys
32 import warnings
32 import warnings
33
33
34 # from IPython.utils import growl
34 # from IPython.utils import growl
35 # growl.start("IPython1 Client")
35 # growl.start("IPython1 Client")
36
36
37
37
38 from twisted.internet import reactor
38 from twisted.internet import reactor
39 from twisted.internet.error import PotentialZombieWarning
39 from twisted.internet.error import PotentialZombieWarning
40 from twisted.python import log
40 from twisted.python import log
41
41
42 from IPython.kernel.clientconnector import ClientConnector, Cluster
42 from IPython.kernel.clientconnector import ClientConnector, Cluster
43 from IPython.kernel.twistedutil import ReactorInThread
43 from IPython.kernel.twistedutil import ReactorInThread
44 from IPython.kernel.twistedutil import blockingCallFromThread
44 from IPython.kernel.twistedutil import blockingCallFromThread
45
45
46 # These enable various things
46 # These enable various things
47 from IPython.kernel import codeutil
47 from IPython.kernel import codeutil
48 # import IPython.kernel.magic
48 # import IPython.kernel.magic
49
49
50 # Other things that the user will need
50 # Other things that the user will need
51 from IPython.kernel.task import MapTask, StringTask
51 from IPython.kernel.task import MapTask, StringTask
52 from IPython.kernel.error import CompositeError
52 from IPython.kernel.error import CompositeError
53
53
54 #-------------------------------------------------------------------------------
54 #-------------------------------------------------------------------------------
55 # Code
55 # Code
56 #-------------------------------------------------------------------------------
56 #-------------------------------------------------------------------------------
57
57
58 warnings.simplefilter('ignore', PotentialZombieWarning)
58 warnings.simplefilter('ignore', PotentialZombieWarning)
59
59
60 _client_tub = ClientConnector()
60 _client_tub = ClientConnector()
61
61
62 get_multiengine_client = _client_tub.get_multiengine_client
62 get_multiengine_client = _client_tub.get_multiengine_client
63 get_task_client = _client_tub.get_task_client
63 get_task_client = _client_tub.get_task_client
64 MultiEngineClient = get_multiengine_client
64 MultiEngineClient = get_multiengine_client
65 TaskClient = get_task_client
65 TaskClient = get_task_client
66
66
67 twisted_log = StringIO()
67 # This isn't great. I should probably set this up in the ReactorInThread
68 # class below. But, it does work for now.
68 log.startLogging(sys.stdout, setStdout=0)
69 log.startLogging(sys.stdout, setStdout=0)
69
70
70 # Now we start the reactor in a thread
71 # Now we start the reactor in a thread
71 rit = ReactorInThread()
72 rit = ReactorInThread()
72 rit.setDaemon(True)
73 rit.setDaemon(True)
73 rit.start()
74 rit.start()
74
75
75
76
76
77
77
78
78 __all__ = [
79 __all__ = [
79 'MapTask',
80 'MapTask',
80 'StringTask',
81 'StringTask',
81 'MultiEngineClient',
82 'MultiEngineClient',
82 'TaskClient',
83 'TaskClient',
83 'CompositeError',
84 'CompositeError',
84 'get_task_client',
85 'get_task_client',
85 'get_multiengine_client',
86 'get_multiengine_client',
86 'Cluster'
87 'Cluster'
87 ]
88 ]
@@ -1,628 +1,628 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching processing asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26
26
27 from twisted.internet import reactor, defer
27 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
29 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
30 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
32 from twisted.python import log
33 from twisted.python.failure import Failure
33 from twisted.python.failure import Failure
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Generic launchers
36 # Generic launchers
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 class LauncherError(Exception):
40 class LauncherError(Exception):
41 pass
41 pass
42
42
43
43
44 class ProcessStateError(LauncherError):
44 class ProcessStateError(LauncherError):
45 pass
45 pass
46
46
47
47
48 class UnknownStatus(LauncherError):
48 class UnknownStatus(LauncherError):
49 pass
49 pass
50
50
51
51
52 class BaseLauncher(Component):
52 class BaseLauncher(Component):
53 """An asbtraction for starting, stopping and signaling a process."""
53 """An asbtraction for starting, stopping and signaling a process."""
54
54
55 working_dir = Unicode(u'')
55 working_dir = Unicode(u'')
56
56
57 def __init__(self, working_dir, parent=None, name=None, config=None):
57 def __init__(self, working_dir, parent=None, name=None, config=None):
58 super(BaseLauncher, self).__init__(parent, name, config)
58 super(BaseLauncher, self).__init__(parent, name, config)
59 self.working_dir = working_dir
59 self.working_dir = working_dir
60 self.state = 'before' # can be before, running, after
60 self.state = 'before' # can be before, running, after
61 self.stop_deferreds = []
61 self.stop_deferreds = []
62 self.start_data = None
62 self.start_data = None
63 self.stop_data = None
63 self.stop_data = None
64
64
65 @property
65 @property
66 def args(self):
66 def args(self):
67 """A list of cmd and args that will be used to start the process."""
67 """A list of cmd and args that will be used to start the process."""
68 return self.find_args()
68 return self.find_args()
69
69
70 def find_args(self):
70 def find_args(self):
71 """The ``.args`` property calls this to find the args list."""
71 """The ``.args`` property calls this to find the args list."""
72 raise NotImplementedError('find_args must be implemented in a subclass')
72 raise NotImplementedError('find_args must be implemented in a subclass')
73
73
74 @property
74 @property
75 def arg_str(self):
75 def arg_str(self):
76 """The string form of the program arguments."""
76 """The string form of the program arguments."""
77 return ' '.join(self.args)
77 return ' '.join(self.args)
78
78
79 @property
79 @property
80 def running(self):
80 def running(self):
81 if self.state == 'running':
81 if self.state == 'running':
82 return True
82 return True
83 else:
83 else:
84 return False
84 return False
85
85
86 def start(self):
86 def start(self):
87 """Start the process.
87 """Start the process.
88
88
89 This must return a deferred that fires with information about the
89 This must return a deferred that fires with information about the
90 process starting (like a pid, job id, etc.)
90 process starting (like a pid, job id, etc.)
91 """
91 """
92 return defer.fail(
92 return defer.fail(
93 Failure(NotImplementedError(
93 Failure(NotImplementedError(
94 'start must be implemented in a subclass')
94 'start must be implemented in a subclass')
95 )
95 )
96 )
96 )
97
97
98 def stop(self):
98 def stop(self):
99 """Stop the process and notify observers of ProcessStopped.
99 """Stop the process and notify observers of ProcessStopped.
100
100
101 This must return a deferred that fires with any errors that occur
101 This must return a deferred that fires with any errors that occur
102 while the process is attempting to be shut down. This deferred
102 while the process is attempting to be shut down. This deferred
103 won't fire when the process actually stops. These events are
103 won't fire when the process actually stops. These events are
104 handled by calling :func:`observe_stop`.
104 handled by calling :func:`observe_stop`.
105 """
105 """
106 return defer.fail(
106 return defer.fail(
107 Failure(NotImplementedError(
107 Failure(NotImplementedError(
108 'stop must be implemented in a subclass')
108 'stop must be implemented in a subclass')
109 )
109 )
110 )
110 )
111
111
112 def observe_stop(self):
112 def observe_stop(self):
113 """Get a deferred that will fire when the process stops.
113 """Get a deferred that will fire when the process stops.
114
114
115 The deferred will fire with data that contains information about
115 The deferred will fire with data that contains information about
116 the exit status of the process.
116 the exit status of the process.
117 """
117 """
118 if self.state=='after':
118 if self.state=='after':
119 return defer.succeed(self.stop_data)
119 return defer.succeed(self.stop_data)
120 else:
120 else:
121 d = defer.Deferred()
121 d = defer.Deferred()
122 self.stop_deferreds.append(d)
122 self.stop_deferreds.append(d)
123 return d
123 return d
124
124
125 def notify_start(self, data):
125 def notify_start(self, data):
126 """Call this to tigger startup actions.
126 """Call this to tigger startup actions.
127
127
128 This logs the process startup and sets the state to running. It is
128 This logs the process startup and sets the state to running. It is
129 a pass-through so it can be used as a callback.
129 a pass-through so it can be used as a callback.
130 """
130 """
131
131
132 log.msg('Process %r started: %r' % (self.args[0], data))
132 log.msg('Process %r started: %r' % (self.args[0], data))
133 self.start_data = data
133 self.start_data = data
134 self.state = 'running'
134 self.state = 'running'
135 return data
135 return data
136
136
137 def notify_stop(self, data):
137 def notify_stop(self, data):
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
139
139
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
141 self.stop_data = data
141 self.stop_data = data
142 self.state = 'after'
142 self.state = 'after'
143 for i in range(len(self.stop_deferreds)):
143 for i in range(len(self.stop_deferreds)):
144 d = self.stop_deferreds.pop()
144 d = self.stop_deferreds.pop()
145 d.callback(data)
145 d.callback(data)
146 return data
146 return data
147
147
148 def signal(self, sig):
148 def signal(self, sig):
149 """Signal the process.
149 """Signal the process.
150
150
151 Return a semi-meaningless deferred after signaling the process.
151 Return a semi-meaningless deferred after signaling the process.
152
152
153 Parameters
153 Parameters
154 ----------
154 ----------
155 sig : str or int
155 sig : str or int
156 'KILL', 'INT', etc., or any signal number
156 'KILL', 'INT', etc., or any signal number
157 """
157 """
158 return defer.fail(
158 return defer.fail(
159 Failure(NotImplementedError(
159 Failure(NotImplementedError(
160 'signal must be implemented in a subclass')
160 'signal must be implemented in a subclass')
161 )
161 )
162 )
162 )
163
163
164
164
165 class LocalProcessLauncherProtocol(ProcessProtocol):
165 class LocalProcessLauncherProtocol(ProcessProtocol):
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
167
167
168 def __init__(self, process_launcher):
168 def __init__(self, process_launcher):
169 self.process_launcher = process_launcher
169 self.process_launcher = process_launcher
170 self.pid = None
170 self.pid = None
171
171
172 def connectionMade(self):
172 def connectionMade(self):
173 self.pid = self.transport.pid
173 self.pid = self.transport.pid
174 self.process_launcher.notify_start(self.transport.pid)
174 self.process_launcher.notify_start(self.transport.pid)
175
175
176 def processEnded(self, status):
176 def processEnded(self, status):
177 value = status.value
177 value = status.value
178 if isinstance(value, ProcessDone):
178 if isinstance(value, ProcessDone):
179 self.process_launcher.notify_stop(
179 self.process_launcher.notify_stop(
180 {'exit_code':0,
180 {'exit_code':0,
181 'signal':None,
181 'signal':None,
182 'status':None,
182 'status':None,
183 'pid':self.pid
183 'pid':self.pid
184 }
184 }
185 )
185 )
186 elif isinstance(value, ProcessTerminated):
186 elif isinstance(value, ProcessTerminated):
187 self.process_launcher.notify_stop(
187 self.process_launcher.notify_stop(
188 {'exit_code':value.exitCode,
188 {'exit_code':value.exitCode,
189 'signal':value.signal,
189 'signal':value.signal,
190 'status':value.status,
190 'status':value.status,
191 'pid':self.pid
191 'pid':self.pid
192 }
192 }
193 )
193 )
194 else:
194 else:
195 raise UnknownStatus("Unknown exit status, this is probably a "
195 raise UnknownStatus("Unknown exit status, this is probably a "
196 "bug in Twisted")
196 "bug in Twisted")
197
197
198 def outReceived(self, data):
198 def outReceived(self, data):
199 log.msg(data)
199 log.msg(data)
200
200
201 def errReceived(self, data):
201 def errReceived(self, data):
202 log.err(data)
202 log.err(data)
203
203
204
204
205 class LocalProcessLauncher(BaseLauncher):
205 class LocalProcessLauncher(BaseLauncher):
206 """Start and stop an external process in an asynchronous manner."""
206 """Start and stop an external process in an asynchronous manner."""
207
207
208 cmd_and_args = List([])
208 cmd_and_args = List([])
209
209
210 def __init__(self, working_dir, parent=None, name=None, config=None):
210 def __init__(self, working_dir, parent=None, name=None, config=None):
211 super(LocalProcessLauncher, self).__init__(
211 super(LocalProcessLauncher, self).__init__(
212 working_dir, parent, name, config
212 working_dir, parent, name, config
213 )
213 )
214 self.process_protocol = None
214 self.process_protocol = None
215 self.start_deferred = None
215 self.start_deferred = None
216
216
217 def find_args(self):
217 def find_args(self):
218 return self.cmd_and_args
218 return self.cmd_and_args
219
219
220 def start(self):
220 def start(self):
221 if self.state == 'before':
221 if self.state == 'before':
222 self.process_protocol = LocalProcessLauncherProtocol(self)
222 self.process_protocol = LocalProcessLauncherProtocol(self)
223 self.start_deferred = defer.Deferred()
223 self.start_deferred = defer.Deferred()
224 self.process_transport = reactor.spawnProcess(
224 self.process_transport = reactor.spawnProcess(
225 self.process_protocol,
225 self.process_protocol,
226 str(self.args[0]),
226 str(self.args[0]),
227 [str(a) for a in self.args],
227 [str(a) for a in self.args],
228 env=os.environ
228 env=os.environ
229 )
229 )
230 return self.start_deferred
230 return self.start_deferred
231 else:
231 else:
232 s = 'The process was already started and has state: %r' % self.state
232 s = 'The process was already started and has state: %r' % self.state
233 return defer.fail(ProcessStateError(s))
233 return defer.fail(ProcessStateError(s))
234
234
235 def notify_start(self, data):
235 def notify_start(self, data):
236 super(LocalProcessLauncher, self).notify_start(data)
236 super(LocalProcessLauncher, self).notify_start(data)
237 self.start_deferred.callback(data)
237 self.start_deferred.callback(data)
238
238
239 def stop(self):
239 def stop(self):
240 return self.interrupt_then_kill()
240 return self.interrupt_then_kill()
241
241
242 @make_deferred
242 @make_deferred
243 def signal(self, sig):
243 def signal(self, sig):
244 if self.state == 'running':
244 if self.state == 'running':
245 self.process_transport.signalProcess(sig)
245 self.process_transport.signalProcess(sig)
246
246
247 @inlineCallbacks
247 @inlineCallbacks
248 def interrupt_then_kill(self, delay=1.0):
248 def interrupt_then_kill(self, delay=2.0):
249 yield self.signal('INT')
249 yield self.signal('INT')
250 yield sleep_deferred(delay)
250 yield sleep_deferred(delay)
251 yield self.signal('KILL')
251 yield self.signal('KILL')
252
252
253
253
254 class MPIExecLauncher(LocalProcessLauncher):
254 class MPIExecLauncher(LocalProcessLauncher):
255
255
256 mpi_cmd = List(['mpiexec'], config=True)
256 mpi_cmd = List(['mpiexec'], config=True)
257 mpi_args = List([], config=True)
257 mpi_args = List([], config=True)
258 program = List(['date'], config=True)
258 program = List(['date'], config=True)
259 program_args = List([], config=True)
259 program_args = List([], config=True)
260 n = Int(1, config=True)
260 n = Int(1, config=True)
261
261
262 def find_args(self):
262 def find_args(self):
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
264 self.program + self.program_args
264 self.program + self.program_args
265
265
266 def start(self, n):
266 def start(self, n):
267 self.n = n
267 self.n = n
268 return super(MPIExecLauncher, self).start()
268 return super(MPIExecLauncher, self).start()
269
269
270
270
271 class SSHLauncher(BaseLauncher):
271 class SSHLauncher(BaseLauncher):
272 """A minimal launcher for ssh.
272 """A minimal launcher for ssh.
273
273
274 To be useful this will probably have to be extended to use the ``sshx``
274 To be useful this will probably have to be extended to use the ``sshx``
275 idea for environment variables. There could be other things this needs
275 idea for environment variables. There could be other things this needs
276 as well.
276 as well.
277 """
277 """
278
278
279 ssh_cmd = List(['ssh'], config=True)
279 ssh_cmd = List(['ssh'], config=True)
280 ssh_args = List([], config=True)
280 ssh_args = List([], config=True)
281 program = List(['date'], config=True)
281 program = List(['date'], config=True)
282 program_args = List([], config=True)
282 program_args = List([], config=True)
283 hostname = Str('', config=True)
283 hostname = Str('', config=True)
284 user = Str(os.environ['USER'], config=True)
284 user = Str(os.environ['USER'], config=True)
285 location = Str('')
285 location = Str('')
286
286
287 def _hostname_changed(self, name, old, new):
287 def _hostname_changed(self, name, old, new):
288 self.location = '%s@%s' % (self.user, new)
288 self.location = '%s@%s' % (self.user, new)
289
289
290 def _user_changed(self, name, old, new):
290 def _user_changed(self, name, old, new):
291 self.location = '%s@%s' % (new, self.hostname)
291 self.location = '%s@%s' % (new, self.hostname)
292
292
293 def find_args(self):
293 def find_args(self):
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
295 self.program + self.program_args
295 self.program + self.program_args
296
296
297 def start(self, n, hostname=None, user=None):
297 def start(self, n, hostname=None, user=None):
298 if hostname is not None:
298 if hostname is not None:
299 self.hostname = hostname
299 self.hostname = hostname
300 if user is not None:
300 if user is not None:
301 self.user = user
301 self.user = user
302 return super(SSHLauncher, self).start()
302 return super(SSHLauncher, self).start()
303
303
304
304
305 class WindowsHPCLauncher(BaseLauncher):
305 class WindowsHPCLauncher(BaseLauncher):
306 pass
306 pass
307
307
308
308
309 class BatchSystemLauncher(BaseLauncher):
309 class BatchSystemLauncher(BaseLauncher):
310
310
311 # Subclasses must fill these in. See PBSEngineSet
311 # Subclasses must fill these in. See PBSEngineSet
312 submit_command = Str('', config=True)
312 submit_command = Str('', config=True)
313 delete_command = Str('', config=True)
313 delete_command = Str('', config=True)
314 job_id_regexp = Str('', config=True)
314 job_id_regexp = Str('', config=True)
315 batch_template = Str('', config=True)
315 batch_template = Str('', config=True)
316 batch_file_name = Unicode(u'batch_script', config=True)
316 batch_file_name = Unicode(u'batch_script', config=True)
317 batch_file = Unicode(u'')
317 batch_file = Unicode(u'')
318
318
319 def __init__(self, working_dir, parent=None, name=None, config=None):
319 def __init__(self, working_dir, parent=None, name=None, config=None):
320 super(BatchSystemLauncher, self).__init__(
320 super(BatchSystemLauncher, self).__init__(
321 working_dir, parent, name, config
321 working_dir, parent, name, config
322 )
322 )
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
324 self.context = {}
324 self.context = {}
325
325
326 def parse_job_id(self, output):
326 def parse_job_id(self, output):
327 m = re.match(self.job_id_regexp, output)
327 m = re.match(self.job_id_regexp, output)
328 if m is not None:
328 if m is not None:
329 job_id = m.group()
329 job_id = m.group()
330 else:
330 else:
331 raise LauncherError("Job id couldn't be determined: %s" % output)
331 raise LauncherError("Job id couldn't be determined: %s" % output)
332 self.job_id = job_id
332 self.job_id = job_id
333 log.msg('Job started with job id: %r' % job_id)
333 log.msg('Job started with job id: %r' % job_id)
334 return job_id
334 return job_id
335
335
336 def write_batch_script(self, n):
336 def write_batch_script(self, n):
337 self.context['n'] = n
337 self.context['n'] = n
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
340 f = open(self.batch_file, 'w')
340 f = open(self.batch_file, 'w')
341 f.write(script_as_string)
341 f.write(script_as_string)
342 f.close()
342 f.close()
343
343
344 @inlineCallbacks
344 @inlineCallbacks
345 def start(self, n):
345 def start(self, n):
346 """Start n copies of the process using a batch system."""
346 """Start n copies of the process using a batch system."""
347 self.write_batch_script(n)
347 self.write_batch_script(n)
348 output = yield getProcessOutput(self.submit_command,
348 output = yield getProcessOutput(self.submit_command,
349 [self.batch_file], env=os.environ)
349 [self.batch_file], env=os.environ)
350 job_id = self.parse_job_id(output)
350 job_id = self.parse_job_id(output)
351 self.notify_start(job_id)
351 self.notify_start(job_id)
352 defer.returnValue(job_id)
352 defer.returnValue(job_id)
353
353
354 @inlineCallbacks
354 @inlineCallbacks
355 def stop(self):
355 def stop(self):
356 output = yield getProcessOutput(self.delete_command,
356 output = yield getProcessOutput(self.delete_command,
357 [self.job_id], env=os.environ
357 [self.job_id], env=os.environ
358 )
358 )
359 self.notify_stop(output) # Pass the output of the kill cmd
359 self.notify_stop(output) # Pass the output of the kill cmd
360 defer.returnValue(output)
360 defer.returnValue(output)
361
361
362
362
363 class PBSLauncher(BatchSystemLauncher):
363 class PBSLauncher(BatchSystemLauncher):
364
364
365 submit_command = Str('qsub', config=True)
365 submit_command = Str('qsub', config=True)
366 delete_command = Str('qdel', config=True)
366 delete_command = Str('qdel', config=True)
367 job_id_regexp = Str('\d+', config=True)
367 job_id_regexp = Str('\d+', config=True)
368 batch_template = Str('', config=True)
368 batch_template = Str('', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
370 batch_file = Unicode(u'')
370 batch_file = Unicode(u'')
371
371
372
372
373 #-----------------------------------------------------------------------------
373 #-----------------------------------------------------------------------------
374 # Controller launchers
374 # Controller launchers
375 #-----------------------------------------------------------------------------
375 #-----------------------------------------------------------------------------
376
376
377 def find_controller_cmd():
377 def find_controller_cmd():
378 if sys.platform == 'win32':
378 if sys.platform == 'win32':
379 # This logic is needed because the ipcontroller script doesn't
379 # This logic is needed because the ipcontroller script doesn't
380 # always get installed in the same way or in the same location.
380 # always get installed in the same way or in the same location.
381 from IPython.kernel import ipcontrollerapp
381 from IPython.kernel import ipcontrollerapp
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
383 # The -u option here turns on unbuffered output, which is required
383 # The -u option here turns on unbuffered output, which is required
384 # on Win32 to prevent wierd conflict and problems with Twisted.
384 # on Win32 to prevent wierd conflict and problems with Twisted.
385 # Also, use sys.executable to make sure we are picking up the
385 # Also, use sys.executable to make sure we are picking up the
386 # right python exe.
386 # right python exe.
387 cmd = [sys.executable, '-u', script_location]
387 cmd = [sys.executable, '-u', script_location]
388 else:
388 else:
389 # ipcontroller has to be on the PATH in this case.
389 # ipcontroller has to be on the PATH in this case.
390 cmd = ['ipcontroller']
390 cmd = ['ipcontroller']
391 return cmd
391 return cmd
392
392
393
393
394 class LocalControllerLauncher(LocalProcessLauncher):
394 class LocalControllerLauncher(LocalProcessLauncher):
395
395
396 controller_cmd = List(find_controller_cmd())
396 controller_cmd = List(find_controller_cmd())
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
398
398
399 def find_args(self):
399 def find_args(self):
400 return self.controller_cmd + self.controller_args
400 return self.controller_cmd + self.controller_args
401
401
402 def start(self, profile=None, cluster_dir=None):
402 def start(self, profile=None, cluster_dir=None):
403 if cluster_dir is not None:
403 if cluster_dir is not None:
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
405 if profile is not None:
405 if profile is not None:
406 self.controller_args.extend(['--profile', profile])
406 self.controller_args.extend(['--profile', profile])
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
408 return super(LocalControllerLauncher, self).start()
408 return super(LocalControllerLauncher, self).start()
409
409
410
410
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
412 pass
412 pass
413
413
414
414
415 class MPIExecControllerLauncher(MPIExecLauncher):
415 class MPIExecControllerLauncher(MPIExecLauncher):
416
416
417 controller_cmd = List(find_controller_cmd(), config=False)
417 controller_cmd = List(find_controller_cmd(), config=False)
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
419 n = Int(1, config=False)
419 n = Int(1, config=False)
420
420
421 def start(self, profile=None, cluster_dir=None):
421 def start(self, profile=None, cluster_dir=None):
422 if cluster_dir is not None:
422 if cluster_dir is not None:
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
424 if profile is not None:
424 if profile is not None:
425 self.controller_args.extend(['--profile', profile])
425 self.controller_args.extend(['--profile', profile])
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
427 return super(MPIExecControllerLauncher, self).start(1)
427 return super(MPIExecControllerLauncher, self).start(1)
428
428
429
429
430 def find_args(self):
430 def find_args(self):
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
432 self.controller_cmd + self.controller_args
432 self.controller_cmd + self.controller_args
433
433
434
434
435 class PBSControllerLauncher(PBSLauncher):
435 class PBSControllerLauncher(PBSLauncher):
436
436
437 def start(self, profile=None, cluster_dir=None):
437 def start(self, profile=None, cluster_dir=None):
438 # Here we save profile and cluster_dir in the context so they
438 # Here we save profile and cluster_dir in the context so they
439 # can be used in the batch script template as ${profile} and
439 # can be used in the batch script template as ${profile} and
440 # ${cluster_dir}
440 # ${cluster_dir}
441 if cluster_dir is not None:
441 if cluster_dir is not None:
442 self.context['cluster_dir'] = cluster_dir
442 self.context['cluster_dir'] = cluster_dir
443 if profile is not None:
443 if profile is not None:
444 self.context['profile'] = profile
444 self.context['profile'] = profile
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
446 return super(PBSControllerLauncher, self).start(1)
446 return super(PBSControllerLauncher, self).start(1)
447
447
448
448
449 class SSHControllerLauncher(SSHLauncher):
449 class SSHControllerLauncher(SSHLauncher):
450 pass
450 pass
451
451
452
452
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454 # Engine launchers
454 # Engine launchers
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456
456
457
457
458 def find_engine_cmd():
458 def find_engine_cmd():
459 if sys.platform == 'win32':
459 if sys.platform == 'win32':
460 # This logic is needed because the ipengine script doesn't
460 # This logic is needed because the ipengine script doesn't
461 # always get installed in the same way or in the same location.
461 # always get installed in the same way or in the same location.
462 from IPython.kernel import ipengineapp
462 from IPython.kernel import ipengineapp
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
464 # The -u option here turns on unbuffered output, which is required
464 # The -u option here turns on unbuffered output, which is required
465 # on Win32 to prevent wierd conflict and problems with Twisted.
465 # on Win32 to prevent wierd conflict and problems with Twisted.
466 # Also, use sys.executable to make sure we are picking up the
466 # Also, use sys.executable to make sure we are picking up the
467 # right python exe.
467 # right python exe.
468 cmd = [sys.executable, '-u', script_location]
468 cmd = [sys.executable, '-u', script_location]
469 else:
469 else:
470 # ipcontroller has to be on the PATH in this case.
470 # ipcontroller has to be on the PATH in this case.
471 cmd = ['ipengine']
471 cmd = ['ipengine']
472 return cmd
472 return cmd
473
473
474
474
475 class LocalEngineLauncher(LocalProcessLauncher):
475 class LocalEngineLauncher(LocalProcessLauncher):
476
476
477 engine_cmd = List(find_engine_cmd())
477 engine_cmd = List(find_engine_cmd())
478 engine_args = List(
478 engine_args = List(
479 ['--log-to-file','--log-level', '40'], config=True
479 ['--log-to-file','--log-level', '40'], config=True
480 )
480 )
481
481
482 def find_args(self):
482 def find_args(self):
483 return self.engine_cmd + self.engine_args
483 return self.engine_cmd + self.engine_args
484
484
485 def start(self, profile=None, cluster_dir=None):
485 def start(self, profile=None, cluster_dir=None):
486 if cluster_dir is not None:
486 if cluster_dir is not None:
487 self.engine_args.extend(['--cluster-dir', cluster_dir])
487 self.engine_args.extend(['--cluster-dir', cluster_dir])
488 if profile is not None:
488 if profile is not None:
489 self.engine_args.extend(['--profile', profile])
489 self.engine_args.extend(['--profile', profile])
490 return super(LocalEngineLauncher, self).start()
490 return super(LocalEngineLauncher, self).start()
491
491
492
492
493 class LocalEngineSetLauncher(BaseLauncher):
493 class LocalEngineSetLauncher(BaseLauncher):
494
494
495 engine_args = List(
495 engine_args = List(
496 ['--log-to-file','--log-level', '40'], config=True
496 ['--log-to-file','--log-level', '40'], config=True
497 )
497 )
498
498
499 def __init__(self, working_dir, parent=None, name=None, config=None):
499 def __init__(self, working_dir, parent=None, name=None, config=None):
500 super(LocalEngineSetLauncher, self).__init__(
500 super(LocalEngineSetLauncher, self).__init__(
501 working_dir, parent, name, config
501 working_dir, parent, name, config
502 )
502 )
503 self.launchers = []
503 self.launchers = []
504
504
505 def start(self, n, profile=None, cluster_dir=None):
505 def start(self, n, profile=None, cluster_dir=None):
506 dlist = []
506 dlist = []
507 for i in range(n):
507 for i in range(n):
508 el = LocalEngineLauncher(self.working_dir, self)
508 el = LocalEngineLauncher(self.working_dir, self)
509 # Copy the engine args over to each engine launcher.
509 # Copy the engine args over to each engine launcher.
510 import copy
510 import copy
511 el.engine_args = copy.deepcopy(self.engine_args)
511 el.engine_args = copy.deepcopy(self.engine_args)
512 d = el.start(profile, cluster_dir)
512 d = el.start(profile, cluster_dir)
513 if i==0:
513 if i==0:
514 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
514 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
515 self.launchers.append(el)
515 self.launchers.append(el)
516 dlist.append(d)
516 dlist.append(d)
517 # The consumeErrors here could be dangerous
517 # The consumeErrors here could be dangerous
518 dfinal = gatherBoth(dlist, consumeErrors=True)
518 dfinal = gatherBoth(dlist, consumeErrors=True)
519 dfinal.addCallback(self.notify_start)
519 dfinal.addCallback(self.notify_start)
520 return dfinal
520 return dfinal
521
521
522 def find_args(self):
522 def find_args(self):
523 return ['engine set']
523 return ['engine set']
524
524
525 def signal(self, sig):
525 def signal(self, sig):
526 dlist = []
526 dlist = []
527 for el in self.launchers:
527 for el in self.launchers:
528 d = el.signal(sig)
528 d = el.signal(sig)
529 dlist.append(d)
529 dlist.append(d)
530 dfinal = gatherBoth(dlist, consumeErrors=True)
530 dfinal = gatherBoth(dlist, consumeErrors=True)
531 return dfinal
531 return dfinal
532
532
533 def interrupt_then_kill(self, delay=1.0):
533 def interrupt_then_kill(self, delay=1.0):
534 dlist = []
534 dlist = []
535 for el in self.launchers:
535 for el in self.launchers:
536 d = el.interrupt_then_kill(delay)
536 d = el.interrupt_then_kill(delay)
537 dlist.append(d)
537 dlist.append(d)
538 dfinal = gatherBoth(dlist, consumeErrors=True)
538 dfinal = gatherBoth(dlist, consumeErrors=True)
539 return dfinal
539 return dfinal
540
540
541 def stop(self):
541 def stop(self):
542 return self.interrupt_then_kill()
542 return self.interrupt_then_kill()
543
543
544 def observe_stop(self):
544 def observe_stop(self):
545 dlist = [el.observe_stop() for el in self.launchers]
545 dlist = [el.observe_stop() for el in self.launchers]
546 dfinal = gatherBoth(dlist, consumeErrors=False)
546 dfinal = gatherBoth(dlist, consumeErrors=False)
547 dfinal.addCallback(self.notify_stop)
547 dfinal.addCallback(self.notify_stop)
548 return dfinal
548 return dfinal
549
549
550
550
551 class MPIExecEngineSetLauncher(MPIExecLauncher):
551 class MPIExecEngineSetLauncher(MPIExecLauncher):
552
552
553 engine_cmd = List(find_engine_cmd(), config=False)
553 engine_cmd = List(find_engine_cmd(), config=False)
554 engine_args = List(
554 engine_args = List(
555 ['--log-to-file','--log-level', '40'], config=True
555 ['--log-to-file','--log-level', '40'], config=True
556 )
556 )
557 n = Int(1, config=True)
557 n = Int(1, config=True)
558
558
559 def start(self, n, profile=None, cluster_dir=None):
559 def start(self, n, profile=None, cluster_dir=None):
560 if cluster_dir is not None:
560 if cluster_dir is not None:
561 self.engine_args.extend(['--cluster-dir', cluster_dir])
561 self.engine_args.extend(['--cluster-dir', cluster_dir])
562 if profile is not None:
562 if profile is not None:
563 self.engine_args.extend(['--profile', profile])
563 self.engine_args.extend(['--profile', profile])
564 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
564 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
565 return super(MPIExecEngineSetLauncher, self).start(n)
565 return super(MPIExecEngineSetLauncher, self).start(n)
566
566
567 def find_args(self):
567 def find_args(self):
568 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
568 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
569 self.engine_cmd + self.engine_args
569 self.engine_cmd + self.engine_args
570
570
571
571
572 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
572 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
573 pass
573 pass
574
574
575
575
576 class PBSEngineSetLauncher(PBSLauncher):
576 class PBSEngineSetLauncher(PBSLauncher):
577
577
578 def start(self, n, profile=None, cluster_dir=None):
578 def start(self, n, profile=None, cluster_dir=None):
579 if cluster_dir is not None:
579 if cluster_dir is not None:
580 self.program_args.extend(['--cluster-dir', cluster_dir])
580 self.program_args.extend(['--cluster-dir', cluster_dir])
581 if profile is not None:
581 if profile is not None:
582 self.program_args.extend(['-p', profile])
582 self.program_args.extend(['-p', profile])
583 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
583 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
584 return super(PBSEngineSetLauncher, self).start(n)
584 return super(PBSEngineSetLauncher, self).start(n)
585
585
586
586
587 class SSHEngineSetLauncher(BaseLauncher):
587 class SSHEngineSetLauncher(BaseLauncher):
588 pass
588 pass
589
589
590
590
591 #-----------------------------------------------------------------------------
591 #-----------------------------------------------------------------------------
592 # A launcher for ipcluster itself!
592 # A launcher for ipcluster itself!
593 #-----------------------------------------------------------------------------
593 #-----------------------------------------------------------------------------
594
594
595
595
596 def find_ipcluster_cmd():
596 def find_ipcluster_cmd():
597 if sys.platform == 'win32':
597 if sys.platform == 'win32':
598 # This logic is needed because the ipcluster script doesn't
598 # This logic is needed because the ipcluster script doesn't
599 # always get installed in the same way or in the same location.
599 # always get installed in the same way or in the same location.
600 from IPython.kernel import ipclusterapp
600 from IPython.kernel import ipclusterapp
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
602 # The -u option here turns on unbuffered output, which is required
602 # The -u option here turns on unbuffered output, which is required
603 # on Win32 to prevent wierd conflict and problems with Twisted.
603 # on Win32 to prevent wierd conflict and problems with Twisted.
604 # Also, use sys.executable to make sure we are picking up the
604 # Also, use sys.executable to make sure we are picking up the
605 # right python exe.
605 # right python exe.
606 cmd = [sys.executable, '-u', script_location]
606 cmd = [sys.executable, '-u', script_location]
607 else:
607 else:
608 # ipcontroller has to be on the PATH in this case.
608 # ipcontroller has to be on the PATH in this case.
609 cmd = ['ipcluster']
609 cmd = ['ipcluster']
610 return cmd
610 return cmd
611
611
612
612
613 class IPClusterLauncher(LocalProcessLauncher):
613 class IPClusterLauncher(LocalProcessLauncher):
614
614
615 ipcluster_cmd = List(find_ipcluster_cmd())
615 ipcluster_cmd = List(find_ipcluster_cmd())
616 ipcluster_args = List(
616 ipcluster_args = List(
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
618 ipcluster_subcommand = Str('start')
618 ipcluster_subcommand = Str('start')
619 ipcluster_n = Int(2)
619 ipcluster_n = Int(2)
620
620
621 def find_args(self):
621 def find_args(self):
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
624
624
625 def start(self):
625 def start(self):
626 log.msg("Starting ipcluster: %r" % self.args)
626 log.msg("Starting ipcluster: %r" % self.args)
627 return super(IPClusterLauncher, self).start()
627 return super(IPClusterLauncher, self).start()
628
628
General Comments 0
You need to be logged in to leave comments. Login now