##// END OF EJS Templates
Fixing two small bugs in :mod:`IPython.kernel`....
Brian Granger -
Show More
@@ -1,87 +1,95 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
5
5
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
7 blocking. This means that methods on the clients return the actual results
7 blocking. This means that methods on the clients return the actual results
8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 rather than a deferred to the result. Also, we manage the Twisted reactor
9 for you. This is done by running the reactor in a thread.
9 for you. This is done by running the reactor in a thread.
10
10
11 The main classes in this module are:
11 The main classes in this module are:
12
12
13 * MultiEngineClient
13 * MultiEngineClient
14 * TaskClient
14 * TaskClient
15 * Task
15 * Task
16 * CompositeError
16 * CompositeError
17 """
17 """
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2009 The IPython Development Team
20 # Copyright (C) 2008-2009 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Imports
27 # Warnings control
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 import sys
31 import warnings
30 import warnings
32
31
33 # from IPython.utils import growl
32 # Twisted generates annoying warnings with Python 2.6, as will do other code
34 # growl.start("IPython1 Client")
33 # that imports 'sets' as of today
34 warnings.filterwarnings('ignore', 'the sets module is deprecated',
35 DeprecationWarning )
36
37 # This one also comes from Twisted
38 warnings.filterwarnings('ignore', 'the sha module is deprecated',
39 DeprecationWarning)
40
41 #-----------------------------------------------------------------------------
42 # Imports
43 #-----------------------------------------------------------------------------
35
44
45 import sys
36
46
37 from twisted.internet import reactor
47 from twisted.internet import reactor
38 from twisted.internet.error import PotentialZombieWarning
48 from twisted.internet.error import PotentialZombieWarning
39 from twisted.python import log
49 from twisted.python import log
40
50
41 from IPython.kernel.clientconnector import ClientConnector, Cluster
51 from IPython.kernel.clientconnector import ClientConnector, Cluster
42 from IPython.kernel.twistedutil import ReactorInThread
52 from IPython.kernel.twistedutil import ReactorInThread
43 from IPython.kernel.twistedutil import blockingCallFromThread
53 from IPython.kernel.twistedutil import blockingCallFromThread
44
54
45 # These enable various things
55 # These enable various things
46 from IPython.kernel import codeutil
56 from IPython.kernel import codeutil
47 # import IPython.kernel.magic
57 # import IPython.kernel.magic
48
58
49 # Other things that the user will need
59 # Other things that the user will need
50 from IPython.kernel.task import MapTask, StringTask
60 from IPython.kernel.task import MapTask, StringTask
51 from IPython.kernel.error import CompositeError
61 from IPython.kernel.error import CompositeError
52
62
53 #-------------------------------------------------------------------------------
63 #-------------------------------------------------------------------------------
54 # Code
64 # Code
55 #-------------------------------------------------------------------------------
65 #-------------------------------------------------------------------------------
56
66
57 warnings.simplefilter('ignore', PotentialZombieWarning)
67 warnings.simplefilter('ignore', PotentialZombieWarning)
58
68
59 _client_tub = ClientConnector()
69 _client_tub = ClientConnector()
60
70
61 get_multiengine_client = _client_tub.get_multiengine_client
71 get_multiengine_client = _client_tub.get_multiengine_client
62 get_task_client = _client_tub.get_task_client
72 get_task_client = _client_tub.get_task_client
63 MultiEngineClient = get_multiengine_client
73 MultiEngineClient = get_multiengine_client
64 TaskClient = get_task_client
74 TaskClient = get_task_client
65
75
66 # This isn't great. I should probably set this up in the ReactorInThread
76 # This isn't great. I should probably set this up in the ReactorInThread
67 # class below. But, it does work for now.
77 # class below. But, it does work for now.
68 log.startLogging(sys.stdout, setStdout=0)
78 log.startLogging(sys.stdout, setStdout=0)
69
79
70 # Now we start the reactor in a thread
80 # Now we start the reactor in a thread
71 rit = ReactorInThread()
81 rit = ReactorInThread()
72 rit.setDaemon(True)
82 rit.setDaemon(True)
73 rit.start()
83 rit.start()
74
84
75
85
76
77
78 __all__ = [
86 __all__ = [
79 'MapTask',
87 'MapTask',
80 'StringTask',
88 'StringTask',
81 'MultiEngineClient',
89 'MultiEngineClient',
82 'TaskClient',
90 'TaskClient',
83 'CompositeError',
91 'CompositeError',
84 'get_task_client',
92 'get_task_client',
85 'get_multiengine_client',
93 'get_multiengine_client',
86 'Cluster'
94 'Cluster'
87 ]
95 ]
@@ -1,832 +1,835 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.utils.path import get_ipython_module_path
25 from IPython.utils.path import get_ipython_module_path
26 from IPython.utils.process import find_cmd, pycmd2argv
26 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
27 from IPython.kernel.twistedutil import (
27 from IPython.kernel.twistedutil import (
28 gatherBoth,
28 gatherBoth,
29 make_deferred,
29 make_deferred,
30 sleep_deferred
30 sleep_deferred
31 )
31 )
32 from IPython.kernel.winhpcjob import (
32 from IPython.kernel.winhpcjob import (
33 IPControllerTask, IPEngineTask,
33 IPControllerTask, IPEngineTask,
34 IPControllerJob, IPEngineSetJob
34 IPControllerJob, IPEngineSetJob
35 )
35 )
36
36
37 from twisted.internet import reactor, defer
37 from twisted.internet import reactor, defer
38 from twisted.internet.defer import inlineCallbacks
38 from twisted.internet.defer import inlineCallbacks
39 from twisted.internet.protocol import ProcessProtocol
39 from twisted.internet.protocol import ProcessProtocol
40 from twisted.internet.utils import getProcessOutput
40 from twisted.internet.utils import getProcessOutput
41 from twisted.internet.error import ProcessDone, ProcessTerminated
41 from twisted.internet.error import ProcessDone, ProcessTerminated
42 from twisted.python import log
42 from twisted.python import log
43 from twisted.python.failure import Failure
43 from twisted.python.failure import Failure
44
44
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Paths to the kernel apps
47 # Paths to the kernel apps
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
# Command lines for the three kernel applications. Each one is whatever
# pycmd2argv builds from the filesystem path of the application's module;
# the launcher classes below use them as the default command lists.
ipcluster_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.kernel.ipclusterapp')
)

ipengine_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.kernel.ipengineapp')
)

ipcontroller_cmd_argv = pycmd2argv(
    get_ipython_module_path('IPython.kernel.ipcontrollerapp')
)
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Base launchers and errors
64 # Base launchers and errors
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67
67
class LauncherError(Exception):
    """Base class for the errors defined by the launchers in this module."""
70
70
71
71
class ProcessStateError(LauncherError):
    """A launcher was asked to do something its current state does not allow.

    ``LocalProcessLauncher.start`` fails its deferred with this error when
    the launcher is not in the ``'before'`` state.
    """
74
74
75
75
class UnknownStatus(LauncherError):
    """A process ended with a status that is not understood.

    ``LocalProcessLauncherProtocol.processEnded`` raises this when the exit
    status is neither ``ProcessDone`` nor ``ProcessTerminated``.
    """
78
78
79
79
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process.

    A launcher keeps its lifecycle in ``self.state``: ``'before'`` (set in
    ``__init__``), ``'running'`` (set by :meth:`notify_start`) and
    ``'after'`` (set by :meth:`notify_stop`).

    Subclasses are expected to implement :meth:`find_args`, :meth:`start`,
    :meth:`stop` and :meth:`signal`. The versions defined here either raise
    ``NotImplementedError`` or return a deferred that has already failed
    with it.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the cluster_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the --work-dir option.
    work_dir = Unicode(u'')

    def __init__(self, work_dir, parent=None, name=None, config=None):
        """Create a launcher in the 'before' state.

        Parameters
        ----------
        work_dir : unicode
            Directory the child process is run in; overrides the config
            value of the ``work_dir`` trait.
        parent, name, config
            Passed through unchanged to ``Component.__init__``.
        """
        super(BaseLauncher, self).__init__(parent, name, config)
        self.work_dir = work_dir
        self.state = 'before' # can be before, running, after
        # Deferreds handed out by observe_stop() that have not fired yet.
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        return defer.fail(
            Failure(NotImplementedError(
                'start must be implemented in a subclass')
            )
        )

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        return defer.fail(
            Failure(NotImplementedError(
                'stop must be implemented in a subclass')
            )
        )

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process. If the launcher is already in the
        'after' state, the returned deferred has already fired with the
        stored ``stop_data``.
        """
        if self.state == 'after':
            return defer.succeed(self.stop_data)
        d = defer.Deferred()
        self.stop_deferreds.append(d)
        return d

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`. It
        returns ``data`` so it can be used as a callback.
        """
        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Drain the pending observers, most recently registered first (the
        # same order as before). Looping until the list is empty, rather
        # than a fixed number of times, guarantees no observer is left
        # unfired even if the list changes while callbacks run.
        while self.stop_deferreds:
            self.stop_deferreds.pop().callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        return defer.fail(
            Failure(NotImplementedError(
                'signal must be implemented in a subclass')
            )
        )
210
210
211
211
212 #-----------------------------------------------------------------------------
212 #-----------------------------------------------------------------------------
213 # Local process launchers
213 # Local process launchers
214 #-----------------------------------------------------------------------------
214 #-----------------------------------------------------------------------------
215
215
216
216
class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol to go with the LocalProcessLauncher.

    It forwards process start/stop events to the launcher it was created
    with and sends the child's stdout/stderr to the Twisted log.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        # Remember the pid so it is still available in processEnded.
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        """Translate the exit status into a dict and report it as stopped."""
        reason = status.value
        if isinstance(reason, ProcessDone):
            stop_data = {
                'exit_code': 0,
                'signal': None,
                'status': None,
                'pid': self.pid,
            }
        elif isinstance(reason, ProcessTerminated):
            stop_data = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
                'pid': self.pid,
            }
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        self.process_launcher.notify_stop(stop_data)

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)
255
255
256
256
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.process_protocol = None
        # Set by start(); initialised here so the attribute always exists.
        self.process_transport = None
        self.start_deferred = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the process described by ``self.args``.

        Returns a deferred that fires (from :meth:`notify_start`) with the
        data reported by the process protocol. If the launcher is not in the
        'before' state, the returned deferred has already failed with
        ``ProcessStateError``.
        """
        if self.state == 'before':
            self.process_protocol = LocalProcessLauncherProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                str(self.args[0]), # twisted expects these to be str, not unicode
                [str(a) for a in self.args], # str expected, not unicode
                env=os.environ,
                path=self.work_dir # start in the work_dir
            )
            return self.start_deferred
        else:
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))

    def notify_start(self, data):
        """Record the start, fire ``start_deferred`` and pass ``data`` on."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)
        # Keep the pass-through contract of BaseLauncher.notify_start.
        return data

    def stop(self):
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
312
312
313
313
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        return self.controller_cmd + self.controller_args

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Build a new list instead of extending in place, so that a list
        # object that may be shared with other launchers (e.g. the trait
        # default -- NOTE(review): confirm how List stores its default) is
        # never modified.
        self.controller_args = self.controller_args + [
            '--cluster-dir', cluster_dir
        ]
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
330
330
331
331
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        # Build a new list instead of extending in place, so that a list
        # object that may be shared with other launchers (e.g. the trait
        # default -- NOTE(review): confirm how List stores its default) is
        # never modified.
        self.engine_args = self.engine_args + ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
349
349
350
350
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes.

    Each engine is run by its own ``LocalEngineLauncher``; the launchers
    are kept in ``self.launchers`` so they can be signaled and observed
    together.
    """

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.launchers = []

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir.

        Returns the deferred from ``gatherBoth`` over the individual start
        deferreds, with :meth:`notify_start` added as a callback.
        """
        # Imported once per call rather than once per engine.
        import copy

        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = LocalEngineLauncher(self.work_dir, self)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            d = el.start(cluster_dir)
            if i == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers.append(el)
            dlist.append(d)
        # The consumeErrors here could be dangerous
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self.notify_start)
        return dfinal

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher and gather the results."""
        dlist = [el.signal(sig) for el in self.launchers]
        return gatherBoth(dlist, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Run ``interrupt_then_kill(delay)`` on every engine launcher."""
        dlist = [el.interrupt_then_kill(delay) for el in self.launchers]
        return gatherBoth(dlist, consumeErrors=True)

    def stop(self):
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Get a deferred that gathers ``observe_stop`` of every engine.

        :meth:`notify_stop` is added as a callback on the gathered deferred.
        """
        dlist = [el.observe_stop() for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=False)
        dfinal.addCallback(self.notify_stop)
        return dfinal
411
411
412
412
413 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
414 # MPIExec launchers
414 # MPIExec launchers
415 #-----------------------------------------------------------------------------
415 #-----------------------------------------------------------------------------
416
416
417
417
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields.

        The instance count is converted to a string so that every element
        of the returned list is text; ``arg_str`` joins the list with
        ``' '.join`` and would raise ``TypeError`` on an integer element.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
441
441
442
442
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The command (argv prefix) used to run ipcontroller.
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Not configurable: start() always asks for exactly one instance.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec command line that runs the controller.

        The list is ``mpi_cmd``, then ``-n <n>``, then ``mpi_args``, then the
        controller command and its arguments.
        """
        # ``n`` is an Int trait; a command line must consist of strings, so
        # it is converted here rather than being passed through as an int.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
461
461
462
462
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    # The command (argv prefix) used to run ipengine.
    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    # The number of engines to start.
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        # Set n before logging so the logged args show the requested count.
        self.n = n
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpiexec command line that runs the engines.

        The list is ``mpi_cmd``, then ``-n <n>``, then ``mpi_args``, then the
        engine command and its arguments.
        """
        # ``n`` is an Int trait; a command line must consist of strings, so
        # it is converted here rather than being passed through as an int.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.engine_cmd + self.engine_args
483
483
484
484
485 #-----------------------------------------------------------------------------
485 #-----------------------------------------------------------------------------
486 # SSH launchers
486 # SSH launchers
487 #-----------------------------------------------------------------------------
487 #-----------------------------------------------------------------------------
488
488
489 # TODO: Get SSH Launcher working again.
489 # TODO: Get SSH Launcher working again.
490
490
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    # The ssh executable and the options passed to it.
    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    # The remote program and its arguments.
    program = List(['date'], config=True)
    program_args = List([], config=True)
    # Remote host and login name; together they make up ``location``.
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ``user@hostname`` string, rebuilt by the two handlers below.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        """Rebuild ``location`` from the current user and the new hostname."""
        current_user = self.user
        self.location = '%s@%s' % (current_user, new)

    def _user_changed(self, name, old, new):
        """Rebuild ``location`` from the new user and the current hostname."""
        current_host = self.hostname
        self.location = '%s@%s' % (new, current_host)

    def find_args(self):
        """Return the ssh command line: ssh, its args, location, program."""
        remote_command = self.program + self.program_args
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               remote_command

    def start(self, n, hostname=None, user=None):
        """Optionally update hostname/user, then start via the base class.

        ``n`` is accepted but not used by this method.
        """
        # Same order as before: hostname first, then user.
        for trait_name, value in (('hostname', hostname), ('user', user)):
            if value is not None:
                setattr(self, trait_name, value)
        return super(SSHLauncher, self).start()
523
523
524
524
class SSHControllerLauncher(SSHLauncher):
    """Placeholder: adds nothing to SSHLauncher yet."""
527
527
528
528
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder: adds nothing to BaseLauncher yet."""
531
531
532
532
533 #-----------------------------------------------------------------------------
533 #-----------------------------------------------------------------------------
534 # Windows HPC Server 2008 scheduler launchers
534 # Windows HPC Server 2008 scheduler launchers
535 #-----------------------------------------------------------------------------
535 #-----------------------------------------------------------------------------
536
536
537
537
538 # This is only used on Windows.
538 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to run the Windows HPC ``job`` tool.

    On Windows (``os.name == 'nt'``) the command is looked up with
    ``find_cmd``; if that lookup raises ``FindCmdError``, or on any other
    platform, the bare name ``'job'`` is returned instead.
    """
    if os.name != 'nt':
        return 'job'
    try:
        return find_cmd('job')
    except FindCmdError:
        # The tool could not be located; fall back to the plain name.
        return 'job'
544
547
545
548
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits jobs through the Windows HPC ``job`` tool.

    Subclasses implement ``write_job_file`` to produce the job description
    that ``start`` submits.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): the ``job_file`` property defined further down rebinds
    # this name in the class body, so the property is what the class exposes.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    # The job command; its default is computed once, when the class is defined.
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            work_dir, parent, name, config
        )

    @property
    def job_file(self):
        """Full path of the job file: ``work_dir`` joined with its name."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for n instances (subclass hook)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` is not found in ``output``.
        The id is also stored on ``self.job_id``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def _run_job_cmd(self, args):
        """Run ``job_cmd`` with ``args`` in ``work_dir``; return the deferred."""
        # Twisted will raise DeprecationWarnings if we try to pass unicode to
        # this, so the command, arguments and environment are coerced to str.
        return getProcessOutput(str(self.job_cmd),
            [str(a) for a in args],
            env=dict((str(k),str(v)) for k,v in os.environ.items()),
            path=self.work_dir
        )

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield self._run_job_cmd(args)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job recorded in ``self.job_id``.

        If the cancel command fails, a fallback message is used as the
        output; either way ``notify_stop`` is called with that output.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield self._run_job_cmd(args)
        except Exception:
            # Best effort: a failed cancel is treated as "already stopped".
            # Only Exception is caught so that GeneratorExit and
            # KeyboardInterrupt are not swallowed inside this generator.
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
628
626
629
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra controller arguments; filled in by start().
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing a single controller task.

        ``n`` is accepted for interface compatibility but is not used here.
        """
        controller_job = IPControllerJob(self)

        task = IPControllerTask(self)
        # The task's work directory is *not* the actual work directory of
        # the controller.  It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Append the --cluster-dir arguments recorded by self.start().
        task.controller_args.extend(self.extra_args)
        controller_job.add_task(task)

        log.msg("Writing job description file: %s" % self.job_file)
        controller_job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file inside ``cluster_dir``."""
        directory = self.cluster_dir
        return os.path.join(directory, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        # A controller job always consists of exactly one instance.
        return super(WindowsHPCControllerLauncher, self).start(1)
659
657
660
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of engines as a Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Extra engine arguments; filled in by start().
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing n engine tasks."""
        engine_job = IPEngineSetJob(self)

        for _ in range(n):
            task = IPEngineTask(self)
            # The task's work directory is *not* the actual work directory
            # of the engine.  It is used as the base path for the
            # stdout/stderr files that the scheduler redirects to.
            task.work_directory = self.cluster_dir
            # Append the --cluster-dir arguments recorded by self.start().
            task.engine_args.extend(self.extra_args)
            engine_job.add_task(task)

        log.msg("Writing job description file: %s" % self.job_file)
        engine_job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file inside ``cluster_dir``."""
        directory = self.cluster_dir
        return os.path.join(directory, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
691
689
692
690 #-----------------------------------------------------------------------------
693 #-----------------------------------------------------------------------------
691 # Batch (PBS) system launchers
694 # Batch (PBS) system launchers
692 #-----------------------------------------------------------------------------
695 #-----------------------------------------------------------------------------
693
696
694 # TODO: Get PBS launcher working again.
697 # TODO: Get PBS launcher working again.
695
698
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script.  The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances.  Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, work_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            work_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
        # Variables made available to the batch script template.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError if ``job_id_regexp`` is not found in ``output``.
        The id is also stored on ``self.job_id``.
        """
        # re.search (not re.match) so the id is found even when the submit
        # command prints other text before it.  Output that already matched
        # at the start yields the same id as before.
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the file even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run the delete command on ``self.job_id`` and report its output."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
772
770
773
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    # Programs used to submit and delete jobs.
    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Pattern used to pick the job id out of the submit output.
    job_id_regexp = Str(r'\d+', config=True)
    # Template for the batch script; empty by default.
    batch_template = Str('', config=True)
    # Name of the instantiated batch script, and its full path.
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
783
781
784
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Save cluster_dir in the template context so the batch script
        # template can refer to it as ${cluster_dir}.
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        message = "Starting PBSControllerLauncher: %r" % self.args
        log.msg(message)
        # A controller job always consists of exactly one instance.
        return super(PBSControllerLauncher, self).start(1)
799
797
800
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        # NOTE(review): ``program_args`` is not declared on this class or on
        # the PBS/batch base classes visible here -- confirm that
        # BaseLauncher provides it.
        cluster_dir_args = ['--cluster-dir', cluster_dir]
        self.program_args.extend(cluster_dir_args)
        self.cluster_dir = unicode(cluster_dir)
        message = 'Starting PBSEngineSetLauncher: %r' % self.args
        log.msg(message)
        return super(PBSEngineSetLauncher, self).start(n)
811
809
812
810 #-----------------------------------------------------------------------------
813 #-----------------------------------------------------------------------------
811 # A launcher for ipcluster itself!
814 # A launcher for ipcluster itself!
812 #-----------------------------------------------------------------------------
815 #-----------------------------------------------------------------------------
813
816
814
817
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # The command (argv prefix) used to run ipcluster.
    ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # The ipcluster subcommand to run.
    ipcluster_subcommand = Str('start')
    # The value passed to ipcluster's -n option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the ipcluster command line: cmd, subcommand, -n, args."""
        count_option = ['-n', repr(self.ipcluster_n)]
        return (self.ipcluster_cmd + [self.ipcluster_subcommand] +
                count_option + self.ipcluster_args)

    def start(self):
        """Log the command line and start ipcluster via the base class."""
        message = "Starting ipcluster: %r" % self.args
        log.msg(message)
        return super(IPClusterLauncher, self).start()
835
General Comments 0
You need to be logged in to leave comments. Login now