forward subprocess IO over zmq on Windows...
MinRK
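Editor's note, not part of the commit: a minimal sketch of the limitation this change works around, assuming a Windows host with python on PATH. select() on Windows only accepts sockets, so the pipes returned by Popen cannot be registered with a select-based loop such as zmq.eventloop.ioloop.

# Illustration only: on Windows, select() works on sockets but not on pipe
# handles, so subprocess stdout/stderr cannot be watched directly.
import select
from subprocess import Popen, PIPE

p = Popen(['python', '-c', 'pass'], stdout=PIPE)
try:
    select.select([p.stdout], [], [], 0)
except select.error, e:
    # On Windows this raises WSAENOTSOCK (10038); on Unix the call succeeds.
    print "cannot select on a pipe:", e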
@@ -0,0 +1,67 @@
1 #!/usr/bin/env python
2 """Utility for forwarding file read events over a zmq socket.
3
4 This is necessary because select on Windows only supports sockets, not FDs."""
5
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2011 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 import uuid
18 import zmq
19
20 from threading import Thread
21
22 #-----------------------------------------------------------------------------
23 # Code
24 #-----------------------------------------------------------------------------
25
26 class ForwarderThread(Thread):
27 def __init__(self, sock, fd):
28 Thread.__init__(self)
29 self.daemon=True
30 self.sock = sock
31 self.fd = fd
32
33 def run(self):
34 """loop through lines in self.fd, and send them over self.sock"""
35 line = self.fd.readline()
36 # allow for files opened in unicode mode
37 if isinstance(line, unicode):
38 send = self.sock.send_unicode
39 else:
40 send = self.sock.send
41 while line:
42 send(line)
43 line = self.fd.readline()
44 # line == '' means EOF
45 self.fd.close()
46 self.sock.close()
47
48 def forward_read_events(fd, context=None):
49 """forward read events from an FD over a socket.
50
51 This method wraps a file in a socket pair, so it can
52 be polled for read events by select (specifically zmq.eventloop.ioloop)
53 """
54 if context is None:
55 context = zmq.Context.instance()
56 push = context.socket(zmq.PUSH)
57 push.setsockopt(zmq.LINGER, -1)
58 pull = context.socket(zmq.PULL)
59 addr='inproc://%s'%uuid.uuid4()
60 push.bind(addr)
61 pull.connect(addr)
62 forwarder = ForwarderThread(push, fd)
63 forwarder.start()
64 return pull
65
66
67 __all__ = ['forward_read_events']
\ No newline at end of file
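Editor's sketch of how the forwarded stream would be consumed (hypothetical names such as proc, stdout_stream, and on_stdout; the pattern mirrors the LocalProcessLauncher changes in the next file). The PULL socket returned by forward_read_events can be handed to pyzmq's ioloop, unlike the pipe itself.

from subprocess import Popen, PIPE
from zmq.eventloop import ioloop

proc = Popen(['ipengine'], stdout=PIPE)
# On Windows, wrap the pipe; the returned PULL socket *is* pollable.
stdout_stream = forward_read_events(proc.stdout)

loop = ioloop.IOLoop.instance()

def on_stdout(fd, events):
    # each zmq message is one line read from the child's stdout
    line = stdout_stream.recv()
    print line.rstrip()

# zmq sockets can be registered with pyzmq's ioloop just like FDs
loop.add_handler(stdout_stream, on_stdout, loop.READ)
loop.start()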
@@ -1,972 +1,985 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 from signal import SIGINT, SIGTERM
24 from signal import SIGINT, SIGTERM
25 try:
25 try:
26 from signal import SIGKILL
26 from signal import SIGKILL
27 except ImportError:
27 except ImportError:
28 SIGKILL=SIGTERM
28 SIGKILL=SIGTERM
29
29
30 from subprocess import Popen, PIPE, STDOUT
30 from subprocess import Popen, PIPE, STDOUT
31 try:
31 try:
32 from subprocess import check_output
32 from subprocess import check_output
33 except ImportError:
33 except ImportError:
34 # pre-2.7, define check_output with Popen
34 # pre-2.7, define check_output with Popen
35 def check_output(*args, **kwargs):
35 def check_output(*args, **kwargs):
36 kwargs.update(dict(stdout=PIPE))
36 kwargs.update(dict(stdout=PIPE))
37 p = Popen(*args, **kwargs)
37 p = Popen(*args, **kwargs)
38 out,err = p.communicate()
38 out,err = p.communicate()
39 return out
39 return out
40
40
41 from zmq.eventloop import ioloop
41 from zmq.eventloop import ioloop
42
42
43 from IPython.external import Itpl
43 from IPython.external import Itpl
44 # from IPython.config.configurable import Configurable
44 # from IPython.config.configurable import Configurable
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
46 from IPython.utils.path import get_ipython_module_path
46 from IPython.utils.path import get_ipython_module_path
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
48
48
49 from IPython.parallel.factory import LoggingFactory
49 from IPython.parallel.factory import LoggingFactory
50
50
51 from .win32support import forward_read_events
52
51 # load winhpcjob only on Windows
53 # load winhpcjob only on Windows
52 try:
54 try:
53 from .winhpcjob import (
55 from .winhpcjob import (
54 IPControllerTask, IPEngineTask,
56 IPControllerTask, IPEngineTask,
55 IPControllerJob, IPEngineSetJob
57 IPControllerJob, IPEngineSetJob
56 )
58 )
57 except ImportError:
59 except ImportError:
58 pass
60 pass
59
61
60
62 WINDOWS = os.name == 'nt'
61 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
62 # Paths to the kernel apps
64 # Paths to the kernel apps
63 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
64
66
65
67
66 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
67 'IPython.parallel.apps.ipclusterapp'
69 'IPython.parallel.apps.ipclusterapp'
68 ))
70 ))
69
71
70 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
71 'IPython.parallel.apps.ipengineapp'
73 'IPython.parallel.apps.ipengineapp'
72 ))
74 ))
73
75
74 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipcontrollerapp'
77 'IPython.parallel.apps.ipcontrollerapp'
76 ))
78 ))
77
79
78 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
79 # Base launchers and errors
81 # Base launchers and errors
80 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
81
83
82
84
83 class LauncherError(Exception):
85 class LauncherError(Exception):
84 pass
86 pass
85
87
86
88
87 class ProcessStateError(LauncherError):
89 class ProcessStateError(LauncherError):
88 pass
90 pass
89
91
90
92
91 class UnknownStatus(LauncherError):
93 class UnknownStatus(LauncherError):
92 pass
94 pass
93
95
94
96
95 class BaseLauncher(LoggingFactory):
97 class BaseLauncher(LoggingFactory):
96 """An asbtraction for starting, stopping and signaling a process."""
98 """An asbtraction for starting, stopping and signaling a process."""
97
99
98 # In all of the launchers, the work_dir is where child processes will be
100 # In all of the launchers, the work_dir is where child processes will be
99 # run. This will usually be the cluster_dir, but may not be. any work_dir
101 # run. This will usually be the cluster_dir, but may not be. any work_dir
100 # passed into the __init__ method will override the config value.
102 # passed into the __init__ method will override the config value.
101 # This should not be used to set the work_dir for the actual engine
103 # This should not be used to set the work_dir for the actual engine
102 # and controller. Instead, use their own config files or the
104 # and controller. Instead, use their own config files or the
103 # controller_args, engine_args attributes of the launchers to add
105 # controller_args, engine_args attributes of the launchers to add
104 # the --work-dir option.
106 # the --work-dir option.
105 work_dir = Unicode(u'.')
107 work_dir = Unicode(u'.')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
107
109
108 start_data = Any()
110 start_data = Any()
109 stop_data = Any()
111 stop_data = Any()
110
112
111 def _loop_default(self):
113 def _loop_default(self):
112 return ioloop.IOLoop.instance()
114 return ioloop.IOLoop.instance()
113
115
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
116 def __init__(self, work_dir=u'.', config=None, **kwargs):
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
117 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
116 self.state = 'before' # can be before, running, after
118 self.state = 'before' # can be before, running, after
117 self.stop_callbacks = []
119 self.stop_callbacks = []
118 self.start_data = None
120 self.start_data = None
119 self.stop_data = None
121 self.stop_data = None
120
122
121 @property
123 @property
122 def args(self):
124 def args(self):
123 """A list of cmd and args that will be used to start the process.
125 """A list of cmd and args that will be used to start the process.
124
126
125 This is what is passed to :func:`spawnProcess` and the first element
127 This is what is passed to :func:`spawnProcess` and the first element
126 will be the process name.
128 will be the process name.
127 """
129 """
128 return self.find_args()
130 return self.find_args()
129
131
130 def find_args(self):
132 def find_args(self):
131 """The ``.args`` property calls this to find the args list.
133 """The ``.args`` property calls this to find the args list.
132
134
133 Subclasses should implement this to construct the cmd and args.
135 Subclasses should implement this to construct the cmd and args.
134 """
136 """
135 raise NotImplementedError('find_args must be implemented in a subclass')
137 raise NotImplementedError('find_args must be implemented in a subclass')
136
138
137 @property
139 @property
138 def arg_str(self):
140 def arg_str(self):
139 """The string form of the program arguments."""
141 """The string form of the program arguments."""
140 return ' '.join(self.args)
142 return ' '.join(self.args)
141
143
142 @property
144 @property
143 def running(self):
145 def running(self):
144 """Am I running."""
146 """Am I running."""
145 if self.state == 'running':
147 if self.state == 'running':
146 return True
148 return True
147 else:
149 else:
148 return False
150 return False
149
151
150 def start(self):
152 def start(self):
151 """Start the process.
153 """Start the process.
152
154
153 This must return a deferred that fires with information about the
155 This must return a deferred that fires with information about the
154 process starting (like a pid, job id, etc.).
156 process starting (like a pid, job id, etc.).
155 """
157 """
156 raise NotImplementedError('start must be implemented in a subclass')
158 raise NotImplementedError('start must be implemented in a subclass')
157
159
158 def stop(self):
160 def stop(self):
159 """Stop the process and notify observers of stopping.
161 """Stop the process and notify observers of stopping.
160
162
161 This must return a deferred that fires with information about the
163 This must return a deferred that fires with information about the
162 processing stopping, like errors that occur while the process is
164 processing stopping, like errors that occur while the process is
163 attempting to be shut down. This deferred won't fire when the process
165 attempting to be shut down. This deferred won't fire when the process
164 actually stops. To observe the actual process stopping, see
166 actually stops. To observe the actual process stopping, see
165 :func:`observe_stop`.
167 :func:`observe_stop`.
166 """
168 """
167 raise NotImplementedError('stop must be implemented in a subclass')
169 raise NotImplementedError('stop must be implemented in a subclass')
168
170
169 def on_stop(self, f):
171 def on_stop(self, f):
170 """Get a deferred that will fire when the process stops.
172 """Get a deferred that will fire when the process stops.
171
173
172 The deferred will fire with data that contains information about
174 The deferred will fire with data that contains information about
173 the exit status of the process.
175 the exit status of the process.
174 """
176 """
175 if self.state=='after':
177 if self.state=='after':
176 return f(self.stop_data)
178 return f(self.stop_data)
177 else:
179 else:
178 self.stop_callbacks.append(f)
180 self.stop_callbacks.append(f)
179
181
180 def notify_start(self, data):
182 def notify_start(self, data):
181 """Call this to trigger startup actions.
183 """Call this to trigger startup actions.
182
184
183 This logs the process startup and sets the state to 'running'. It is
185 This logs the process startup and sets the state to 'running'. It is
184 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
185 """
187 """
186
188
187 self.log.info('Process %r started: %r' % (self.args[0], data))
189 self.log.info('Process %r started: %r' % (self.args[0], data))
188 self.start_data = data
190 self.start_data = data
189 self.state = 'running'
191 self.state = 'running'
190 return data
192 return data
191
193
192 def notify_stop(self, data):
194 def notify_stop(self, data):
193 """Call this to trigger process stop actions.
195 """Call this to trigger process stop actions.
194
196
195 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
196 this to trigger all the deferreds from :func:`observe_stop`."""
198 this to trigger all the deferreds from :func:`observe_stop`."""
197
199
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 self.stop_data = data
201 self.stop_data = data
200 self.state = 'after'
202 self.state = 'after'
201 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
202 d = self.stop_callbacks.pop()
204 d = self.stop_callbacks.pop()
203 d(data)
205 d(data)
204 return data
206 return data
205
207
206 def signal(self, sig):
208 def signal(self, sig):
207 """Signal the process.
209 """Signal the process.
208
210
209 Return a semi-meaningless deferred after signaling the process.
211 Return a semi-meaningless deferred after signaling the process.
210
212
211 Parameters
213 Parameters
212 ----------
214 ----------
213 sig : str or int
215 sig : str or int
214 'KILL', 'INT', etc., or any signal number
216 'KILL', 'INT', etc., or any signal number
215 """
217 """
216 raise NotImplementedError('signal must be implemented in a subclass')
218 raise NotImplementedError('signal must be implemented in a subclass')
217
219
218
220
219 #-----------------------------------------------------------------------------
221 #-----------------------------------------------------------------------------
220 # Local process launchers
222 # Local process launchers
221 #-----------------------------------------------------------------------------
223 #-----------------------------------------------------------------------------
222
224
223
225
224 class LocalProcessLauncher(BaseLauncher):
226 class LocalProcessLauncher(BaseLauncher):
225 """Start and stop an external process in an asynchronous manner.
227 """Start and stop an external process in an asynchronous manner.
226
228
227 This will launch the external process with a working directory of
229 This will launch the external process with a working directory of
228 ``self.work_dir``.
230 ``self.work_dir``.
229 """
231 """
230
232
231 # This is used to construct self.args, which is passed to
233 # This is used to construct self.args, which is passed to
232 # spawnProcess.
234 # spawnProcess.
233 cmd_and_args = List([])
235 cmd_and_args = List([])
234 poll_frequency = Int(100) # in ms
236 poll_frequency = Int(100) # in ms
235
237
236 def __init__(self, work_dir=u'.', config=None, **kwargs):
238 def __init__(self, work_dir=u'.', config=None, **kwargs):
237 super(LocalProcessLauncher, self).__init__(
239 super(LocalProcessLauncher, self).__init__(
238 work_dir=work_dir, config=config, **kwargs
240 work_dir=work_dir, config=config, **kwargs
239 )
241 )
240 self.process = None
242 self.process = None
241 self.start_deferred = None
243 self.start_deferred = None
242 self.poller = None
244 self.poller = None
243
245
244 def find_args(self):
246 def find_args(self):
245 return self.cmd_and_args
247 return self.cmd_and_args
246
248
247 def start(self):
249 def start(self):
248 if self.state == 'before':
250 if self.state == 'before':
249 self.process = Popen(self.args,
251 self.process = Popen(self.args,
250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
252 stdout=PIPE,stderr=PIPE,stdin=PIPE,
251 env=os.environ,
253 env=os.environ,
252 cwd=self.work_dir
254 cwd=self.work_dir
253 )
255 )
254
256 if WINDOWS:
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
257 self.stdout = forward_read_events(self.process.stdout)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
258 self.stderr = forward_read_events(self.process.stderr)
259 else:
260 self.stdout = self.process.stdout.fileno()
261 self.stderr = self.process.stderr.fileno()
262 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
263 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
264 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 self.poller.start()
265 self.poller.start()
259 self.notify_start(self.process.pid)
266 self.notify_start(self.process.pid)
260 else:
267 else:
261 s = 'The process was already started and has state: %r' % self.state
268 s = 'The process was already started and has state: %r' % self.state
262 raise ProcessStateError(s)
269 raise ProcessStateError(s)
263
270
264 def stop(self):
271 def stop(self):
265 return self.interrupt_then_kill()
272 return self.interrupt_then_kill()
266
273
267 def signal(self, sig):
274 def signal(self, sig):
268 if self.state == 'running':
275 if self.state == 'running':
269 self.process.send_signal(sig)
276 self.process.send_signal(sig)
270
277
271 def interrupt_then_kill(self, delay=2.0):
278 def interrupt_then_kill(self, delay=2.0):
272 """Send INT, wait a delay and then send KILL."""
279 """Send INT, wait a delay and then send KILL."""
273 self.signal(SIGINT)
280 self.signal(SIGINT)
274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
281 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
275 self.killer.start()
282 self.killer.start()
276
283
277 # callbacks, etc:
284 # callbacks, etc:
278
285
279 def handle_stdout(self, fd, events):
286 def handle_stdout(self, fd, events):
280 line = self.process.stdout.readline()
287 if WINDOWS:
288 line = self.stdout.recv()
289 else:
290 line = self.process.stdout.readline()
281 # a stopped process will be readable but return empty strings
291 # a stopped process will be readable but return empty strings
282 if line:
292 if line:
283 self.log.info(line[:-1])
293 self.log.info(line[:-1])
284 else:
294 else:
285 self.poll()
295 self.poll()
286
296
287 def handle_stderr(self, fd, events):
297 def handle_stderr(self, fd, events):
288 line = self.process.stderr.readline()
298 if WINDOWS:
299 line = self.stderr.recv()
300 else:
301 line = self.process.stderr.readline()
289 # a stopped process will be readable but return empty strings
302 # a stopped process will be readable but return empty strings
290 if line:
303 if line:
291 self.log.error(line[:-1])
304 self.log.error(line[:-1])
292 else:
305 else:
293 self.poll()
306 self.poll()
294
307
295 def poll(self):
308 def poll(self):
296 status = self.process.poll()
309 status = self.process.poll()
297 if status is not None:
310 if status is not None:
298 self.poller.stop()
311 self.poller.stop()
299 self.loop.remove_handler(self.process.stdout.fileno())
312 self.loop.remove_handler(self.stdout)
300 self.loop.remove_handler(self.process.stderr.fileno())
313 self.loop.remove_handler(self.stderr)
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
314 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
302 return status
315 return status
303
316
304 class LocalControllerLauncher(LocalProcessLauncher):
317 class LocalControllerLauncher(LocalProcessLauncher):
305 """Launch a controller as a regular external process."""
318 """Launch a controller as a regular external process."""
306
319
307 controller_cmd = List(ipcontroller_cmd_argv, config=True)
320 controller_cmd = List(ipcontroller_cmd_argv, config=True)
308 # Command line arguments to ipcontroller.
321 # Command line arguments to ipcontroller.
309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
322 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
310
323
311 def find_args(self):
324 def find_args(self):
312 return self.controller_cmd + self.controller_args
325 return self.controller_cmd + self.controller_args
313
326
314 def start(self, cluster_dir):
327 def start(self, cluster_dir):
315 """Start the controller by cluster_dir."""
328 """Start the controller by cluster_dir."""
316 self.controller_args.extend(['--cluster-dir', cluster_dir])
329 self.controller_args.extend(['--cluster-dir', cluster_dir])
317 self.cluster_dir = unicode(cluster_dir)
330 self.cluster_dir = unicode(cluster_dir)
318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
331 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
319 return super(LocalControllerLauncher, self).start()
332 return super(LocalControllerLauncher, self).start()
320
333
321
334
322 class LocalEngineLauncher(LocalProcessLauncher):
335 class LocalEngineLauncher(LocalProcessLauncher):
323 """Launch a single engine as a regular externall process."""
336 """Launch a single engine as a regular externall process."""
324
337
325 engine_cmd = List(ipengine_cmd_argv, config=True)
338 engine_cmd = List(ipengine_cmd_argv, config=True)
326 # Command line arguments for ipengine.
339 # Command line arguments for ipengine.
327 engine_args = List(
340 engine_args = List(
328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
341 ['--log-to-file','--log-level', str(logging.INFO)], config=True
329 )
342 )
330
343
331 def find_args(self):
344 def find_args(self):
332 return self.engine_cmd + self.engine_args
345 return self.engine_cmd + self.engine_args
333
346
334 def start(self, cluster_dir):
347 def start(self, cluster_dir):
335 """Start the engine by cluster_dir."""
348 """Start the engine by cluster_dir."""
336 self.engine_args.extend(['--cluster-dir', cluster_dir])
349 self.engine_args.extend(['--cluster-dir', cluster_dir])
337 self.cluster_dir = unicode(cluster_dir)
350 self.cluster_dir = unicode(cluster_dir)
338 return super(LocalEngineLauncher, self).start()
351 return super(LocalEngineLauncher, self).start()
339
352
340
353
341 class LocalEngineSetLauncher(BaseLauncher):
354 class LocalEngineSetLauncher(BaseLauncher):
342 """Launch a set of engines as regular external processes."""
355 """Launch a set of engines as regular external processes."""
343
356
344 # Command line arguments for ipengine.
357 # Command line arguments for ipengine.
345 engine_args = List(
358 engine_args = List(
346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
359 ['--log-to-file','--log-level', str(logging.INFO)], config=True
347 )
360 )
348 # launcher class
361 # launcher class
349 launcher_class = LocalEngineLauncher
362 launcher_class = LocalEngineLauncher
350
363
351 launchers = Dict()
364 launchers = Dict()
352 stop_data = Dict()
365 stop_data = Dict()
353
366
354 def __init__(self, work_dir=u'.', config=None, **kwargs):
367 def __init__(self, work_dir=u'.', config=None, **kwargs):
355 super(LocalEngineSetLauncher, self).__init__(
368 super(LocalEngineSetLauncher, self).__init__(
356 work_dir=work_dir, config=config, **kwargs
369 work_dir=work_dir, config=config, **kwargs
357 )
370 )
358 self.stop_data = {}
371 self.stop_data = {}
359
372
360 def start(self, n, cluster_dir):
373 def start(self, n, cluster_dir):
361 """Start n engines by profile or cluster_dir."""
374 """Start n engines by profile or cluster_dir."""
362 self.cluster_dir = unicode(cluster_dir)
375 self.cluster_dir = unicode(cluster_dir)
363 dlist = []
376 dlist = []
364 for i in range(n):
377 for i in range(n):
365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
378 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
366 # Copy the engine args over to each engine launcher.
379 # Copy the engine args over to each engine launcher.
367 el.engine_args = copy.deepcopy(self.engine_args)
380 el.engine_args = copy.deepcopy(self.engine_args)
368 el.on_stop(self._notice_engine_stopped)
381 el.on_stop(self._notice_engine_stopped)
369 d = el.start(cluster_dir)
382 d = el.start(cluster_dir)
370 if i==0:
383 if i==0:
371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
384 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
372 self.launchers[i] = el
385 self.launchers[i] = el
373 dlist.append(d)
386 dlist.append(d)
374 self.notify_start(dlist)
387 self.notify_start(dlist)
375 # The consumeErrors here could be dangerous
388 # The consumeErrors here could be dangerous
376 # dfinal = gatherBoth(dlist, consumeErrors=True)
389 # dfinal = gatherBoth(dlist, consumeErrors=True)
377 # dfinal.addCallback(self.notify_start)
390 # dfinal.addCallback(self.notify_start)
378 return dlist
391 return dlist
379
392
380 def find_args(self):
393 def find_args(self):
381 return ['engine set']
394 return ['engine set']
382
395
383 def signal(self, sig):
396 def signal(self, sig):
384 dlist = []
397 dlist = []
385 for el in self.launchers.itervalues():
398 for el in self.launchers.itervalues():
386 d = el.signal(sig)
399 d = el.signal(sig)
387 dlist.append(d)
400 dlist.append(d)
388 # dfinal = gatherBoth(dlist, consumeErrors=True)
401 # dfinal = gatherBoth(dlist, consumeErrors=True)
389 return dlist
402 return dlist
390
403
391 def interrupt_then_kill(self, delay=1.0):
404 def interrupt_then_kill(self, delay=1.0):
392 dlist = []
405 dlist = []
393 for el in self.launchers.itervalues():
406 for el in self.launchers.itervalues():
394 d = el.interrupt_then_kill(delay)
407 d = el.interrupt_then_kill(delay)
395 dlist.append(d)
408 dlist.append(d)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
409 # dfinal = gatherBoth(dlist, consumeErrors=True)
397 return dlist
410 return dlist
398
411
399 def stop(self):
412 def stop(self):
400 return self.interrupt_then_kill()
413 return self.interrupt_then_kill()
401
414
402 def _notice_engine_stopped(self, data):
415 def _notice_engine_stopped(self, data):
403 pid = data['pid']
416 pid = data['pid']
404 for idx,el in self.launchers.iteritems():
417 for idx,el in self.launchers.iteritems():
405 if el.process.pid == pid:
418 if el.process.pid == pid:
406 break
419 break
407 self.launchers.pop(idx)
420 self.launchers.pop(idx)
408 self.stop_data[idx] = data
421 self.stop_data[idx] = data
409 if not self.launchers:
422 if not self.launchers:
410 self.notify_stop(self.stop_data)
423 self.notify_stop(self.stop_data)
411
424
412
425
413 #-----------------------------------------------------------------------------
426 #-----------------------------------------------------------------------------
414 # MPIExec launchers
427 # MPIExec launchers
415 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
416
429
417
430
418 class MPIExecLauncher(LocalProcessLauncher):
431 class MPIExecLauncher(LocalProcessLauncher):
419 """Launch an external process using mpiexec."""
432 """Launch an external process using mpiexec."""
420
433
421 # The mpiexec command to use in starting the process.
434 # The mpiexec command to use in starting the process.
422 mpi_cmd = List(['mpiexec'], config=True)
435 mpi_cmd = List(['mpiexec'], config=True)
423 # The command line arguments to pass to mpiexec.
436 # The command line arguments to pass to mpiexec.
424 mpi_args = List([], config=True)
437 mpi_args = List([], config=True)
425 # The program to start using mpiexec.
438 # The program to start using mpiexec.
426 program = List(['date'], config=True)
439 program = List(['date'], config=True)
427 # The command line argument to the program.
440 # The command line argument to the program.
428 program_args = List([], config=True)
441 program_args = List([], config=True)
429 # The number of instances of the program to start.
442 # The number of instances of the program to start.
430 n = Int(1, config=True)
443 n = Int(1, config=True)
431
444
432 def find_args(self):
445 def find_args(self):
433 """Build self.args using all the fields."""
446 """Build self.args using all the fields."""
434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
447 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
435 self.program + self.program_args
448 self.program + self.program_args
436
449
437 def start(self, n):
450 def start(self, n):
438 """Start n instances of the program using mpiexec."""
451 """Start n instances of the program using mpiexec."""
439 self.n = n
452 self.n = n
440 return super(MPIExecLauncher, self).start()
453 return super(MPIExecLauncher, self).start()
441
454
442
455
443 class MPIExecControllerLauncher(MPIExecLauncher):
456 class MPIExecControllerLauncher(MPIExecLauncher):
444 """Launch a controller using mpiexec."""
457 """Launch a controller using mpiexec."""
445
458
446 controller_cmd = List(ipcontroller_cmd_argv, config=True)
459 controller_cmd = List(ipcontroller_cmd_argv, config=True)
447 # Command line arguments to ipcontroller.
460 # Command line arguments to ipcontroller.
448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
461 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
449 n = Int(1, config=False)
462 n = Int(1, config=False)
450
463
451 def start(self, cluster_dir):
464 def start(self, cluster_dir):
452 """Start the controller by cluster_dir."""
465 """Start the controller by cluster_dir."""
453 self.controller_args.extend(['--cluster-dir', cluster_dir])
466 self.controller_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
467 self.cluster_dir = unicode(cluster_dir)
455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
468 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
456 return super(MPIExecControllerLauncher, self).start(1)
469 return super(MPIExecControllerLauncher, self).start(1)
457
470
458 def find_args(self):
471 def find_args(self):
459 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
472 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
460 self.controller_cmd + self.controller_args
473 self.controller_cmd + self.controller_args
461
474
462
475
463 class MPIExecEngineSetLauncher(MPIExecLauncher):
476 class MPIExecEngineSetLauncher(MPIExecLauncher):
464
477
465 program = List(ipengine_cmd_argv, config=True)
478 program = List(ipengine_cmd_argv, config=True)
466 # Command line arguments for ipengine.
479 # Command line arguments for ipengine.
467 program_args = List(
480 program_args = List(
468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
481 ['--log-to-file','--log-level', str(logging.INFO)], config=True
469 )
482 )
470 n = Int(1, config=True)
483 n = Int(1, config=True)
471
484
472 def start(self, n, cluster_dir):
485 def start(self, n, cluster_dir):
473 """Start n engines by profile or cluster_dir."""
486 """Start n engines by profile or cluster_dir."""
474 self.program_args.extend(['--cluster-dir', cluster_dir])
487 self.program_args.extend(['--cluster-dir', cluster_dir])
475 self.cluster_dir = unicode(cluster_dir)
488 self.cluster_dir = unicode(cluster_dir)
476 self.n = n
489 self.n = n
477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
490 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
478 return super(MPIExecEngineSetLauncher, self).start(n)
491 return super(MPIExecEngineSetLauncher, self).start(n)
479
492
480 #-----------------------------------------------------------------------------
493 #-----------------------------------------------------------------------------
481 # SSH launchers
494 # SSH launchers
482 #-----------------------------------------------------------------------------
495 #-----------------------------------------------------------------------------
483
496
484 # TODO: Get SSH Launcher working again.
497 # TODO: Get SSH Launcher working again.
485
498
486 class SSHLauncher(LocalProcessLauncher):
499 class SSHLauncher(LocalProcessLauncher):
487 """A minimal launcher for ssh.
500 """A minimal launcher for ssh.
488
501
489 To be useful this will probably have to be extended to use the ``sshx``
502 To be useful this will probably have to be extended to use the ``sshx``
490 idea for environment variables. There could be other things this needs
503 idea for environment variables. There could be other things this needs
491 as well.
504 as well.
492 """
505 """
493
506
494 ssh_cmd = List(['ssh'], config=True)
507 ssh_cmd = List(['ssh'], config=True)
495 ssh_args = List(['-tt'], config=True)
508 ssh_args = List(['-tt'], config=True)
496 program = List(['date'], config=True)
509 program = List(['date'], config=True)
497 program_args = List([], config=True)
510 program_args = List([], config=True)
498 hostname = CUnicode('', config=True)
511 hostname = CUnicode('', config=True)
499 user = CUnicode('', config=True)
512 user = CUnicode('', config=True)
500 location = CUnicode('')
513 location = CUnicode('')
501
514
502 def _hostname_changed(self, name, old, new):
515 def _hostname_changed(self, name, old, new):
503 if self.user:
516 if self.user:
504 self.location = u'%s@%s' % (self.user, new)
517 self.location = u'%s@%s' % (self.user, new)
505 else:
518 else:
506 self.location = new
519 self.location = new
507
520
508 def _user_changed(self, name, old, new):
521 def _user_changed(self, name, old, new):
509 self.location = u'%s@%s' % (new, self.hostname)
522 self.location = u'%s@%s' % (new, self.hostname)
510
523
511 def find_args(self):
524 def find_args(self):
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
525 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 self.program + self.program_args
526 self.program + self.program_args
514
527
515 def start(self, cluster_dir, hostname=None, user=None):
528 def start(self, cluster_dir, hostname=None, user=None):
516 self.cluster_dir = unicode(cluster_dir)
529 self.cluster_dir = unicode(cluster_dir)
517 if hostname is not None:
530 if hostname is not None:
518 self.hostname = hostname
531 self.hostname = hostname
519 if user is not None:
532 if user is not None:
520 self.user = user
533 self.user = user
521
534
522 return super(SSHLauncher, self).start()
535 return super(SSHLauncher, self).start()
523
536
524 def signal(self, sig):
537 def signal(self, sig):
525 if self.state == 'running':
538 if self.state == 'running':
526 # send escaped ssh connection-closer
539 # send escaped ssh connection-closer
527 self.process.stdin.write('~.')
540 self.process.stdin.write('~.')
528 self.process.stdin.flush()
541 self.process.stdin.flush()
529
542
530
543
531
544
532 class SSHControllerLauncher(SSHLauncher):
545 class SSHControllerLauncher(SSHLauncher):
533
546
534 program = List(ipcontroller_cmd_argv, config=True)
547 program = List(ipcontroller_cmd_argv, config=True)
535 # Command line arguments to ipcontroller.
548 # Command line arguments to ipcontroller.
536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
549 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
537
550
538
551
539 class SSHEngineLauncher(SSHLauncher):
552 class SSHEngineLauncher(SSHLauncher):
540 program = List(ipengine_cmd_argv, config=True)
553 program = List(ipengine_cmd_argv, config=True)
541 # Command line arguments for ipengine.
554 # Command line arguments for ipengine.
542 program_args = List(
555 program_args = List(
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
556 ['--log-to-file','--log-level', str(logging.INFO)], config=True
544 )
557 )
545
558
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
559 class SSHEngineSetLauncher(LocalEngineSetLauncher):
547 launcher_class = SSHEngineLauncher
560 launcher_class = SSHEngineLauncher
548 engines = Dict(config=True)
561 engines = Dict(config=True)
549
562
550 def start(self, n, cluster_dir):
563 def start(self, n, cluster_dir):
551 """Start engines by profile or cluster_dir.
564 """Start engines by profile or cluster_dir.
552 `n` is ignored, and the `engines` config property is used instead.
565 `n` is ignored, and the `engines` config property is used instead.
553 """
566 """
554
567
555 self.cluster_dir = unicode(cluster_dir)
568 self.cluster_dir = unicode(cluster_dir)
556 dlist = []
569 dlist = []
557 for host, n in self.engines.iteritems():
570 for host, n in self.engines.iteritems():
558 if isinstance(n, (tuple, list)):
571 if isinstance(n, (tuple, list)):
559 n, args = n
572 n, args = n
560 else:
573 else:
561 args = copy.deepcopy(self.engine_args)
574 args = copy.deepcopy(self.engine_args)
562
575
563 if '@' in host:
576 if '@' in host:
564 user,host = host.split('@',1)
577 user,host = host.split('@',1)
565 else:
578 else:
566 user=None
579 user=None
567 for i in range(n):
580 for i in range(n):
568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
581 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
569
582
570 # Copy the engine args over to each engine launcher.
583 # Copy the engine args over to each engine launcher.
571 i
584 i
572 el.program_args = args
585 el.program_args = args
573 el.on_stop(self._notice_engine_stopped)
586 el.on_stop(self._notice_engine_stopped)
574 d = el.start(cluster_dir, user=user, hostname=host)
587 d = el.start(cluster_dir, user=user, hostname=host)
575 if i==0:
588 if i==0:
576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
589 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
577 self.launchers[host+str(i)] = el
590 self.launchers[host+str(i)] = el
578 dlist.append(d)
591 dlist.append(d)
579 self.notify_start(dlist)
592 self.notify_start(dlist)
580 return dlist
593 return dlist
581
594
582
595
583
596
584 #-----------------------------------------------------------------------------
597 #-----------------------------------------------------------------------------
585 # Windows HPC Server 2008 scheduler launchers
598 # Windows HPC Server 2008 scheduler launchers
586 #-----------------------------------------------------------------------------
599 #-----------------------------------------------------------------------------
587
600
588
601
589 # This is only used on Windows.
602 # This is only used on Windows.
590 def find_job_cmd():
603 def find_job_cmd():
591 if os.name=='nt':
604 if WINDOWS:
592 try:
605 try:
593 return find_cmd('job')
606 return find_cmd('job')
594 except (FindCmdError, ImportError):
607 except (FindCmdError, ImportError):
595 # ImportError will be raised if win32api is not installed
608 # ImportError will be raised if win32api is not installed
596 return 'job'
609 return 'job'
597 else:
610 else:
598 return 'job'
611 return 'job'
599
612
600
613
601 class WindowsHPCLauncher(BaseLauncher):
614 class WindowsHPCLauncher(BaseLauncher):
602
615
603 # A regular expression used to get the job id from the output of the
616 # A regular expression used to get the job id from the output of the
604 # submit_command.
617 # submit_command.
605 job_id_regexp = Str(r'\d+', config=True)
618 job_id_regexp = Str(r'\d+', config=True)
606 # The filename of the instantiated job script.
619 # The filename of the instantiated job script.
607 job_file_name = CUnicode(u'ipython_job.xml', config=True)
620 job_file_name = CUnicode(u'ipython_job.xml', config=True)
608 # The full path to the instantiated job script. This gets made dynamically
621 # The full path to the instantiated job script. This gets made dynamically
609 # by combining the work_dir with the job_file_name.
622 # by combining the work_dir with the job_file_name.
610 job_file = CUnicode(u'')
623 job_file = CUnicode(u'')
611 # The hostname of the scheduler to submit the job to
624 # The hostname of the scheduler to submit the job to
612 scheduler = CUnicode('', config=True)
625 scheduler = CUnicode('', config=True)
613 job_cmd = CUnicode(find_job_cmd(), config=True)
626 job_cmd = CUnicode(find_job_cmd(), config=True)
614
627
615 def __init__(self, work_dir=u'.', config=None, **kwargs):
628 def __init__(self, work_dir=u'.', config=None, **kwargs):
616 super(WindowsHPCLauncher, self).__init__(
629 super(WindowsHPCLauncher, self).__init__(
617 work_dir=work_dir, config=config, **kwargs
630 work_dir=work_dir, config=config, **kwargs
618 )
631 )
619
632
620 @property
633 @property
621 def job_file(self):
634 def job_file(self):
622 return os.path.join(self.work_dir, self.job_file_name)
635 return os.path.join(self.work_dir, self.job_file_name)
623
636
624 def write_job_file(self, n):
637 def write_job_file(self, n):
625 raise NotImplementedError("Implement write_job_file in a subclass.")
638 raise NotImplementedError("Implement write_job_file in a subclass.")
626
639
627 def find_args(self):
640 def find_args(self):
628 return [u'job.exe']
641 return [u'job.exe']
629
642
630 def parse_job_id(self, output):
643 def parse_job_id(self, output):
631 """Take the output of the submit command and return the job id."""
644 """Take the output of the submit command and return the job id."""
632 m = re.search(self.job_id_regexp, output)
645 m = re.search(self.job_id_regexp, output)
633 if m is not None:
646 if m is not None:
634 job_id = m.group()
647 job_id = m.group()
635 else:
648 else:
636 raise LauncherError("Job id couldn't be determined: %s" % output)
649 raise LauncherError("Job id couldn't be determined: %s" % output)
637 self.job_id = job_id
650 self.job_id = job_id
638 self.log.info('Job started with job id: %r' % job_id)
651 self.log.info('Job started with job id: %r' % job_id)
639 return job_id
652 return job_id
640
653
641 def start(self, n):
654 def start(self, n):
642 """Start n copies of the process using the Win HPC job scheduler."""
655 """Start n copies of the process using the Win HPC job scheduler."""
643 self.write_job_file(n)
656 self.write_job_file(n)
644 args = [
657 args = [
645 'submit',
658 'submit',
646 '/jobfile:%s' % self.job_file,
659 '/jobfile:%s' % self.job_file,
647 '/scheduler:%s' % self.scheduler
660 '/scheduler:%s' % self.scheduler
648 ]
661 ]
649 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
662 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
650 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
663 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
651 output = check_output([self.job_cmd]+args,
664 output = check_output([self.job_cmd]+args,
652 env=os.environ,
665 env=os.environ,
653 cwd=self.work_dir,
666 cwd=self.work_dir,
654 stderr=STDOUT
667 stderr=STDOUT
655 )
668 )
656 job_id = self.parse_job_id(output)
669 job_id = self.parse_job_id(output)
657 self.notify_start(job_id)
670 self.notify_start(job_id)
658 return job_id
671 return job_id
659
672
660 def stop(self):
673 def stop(self):
661 args = [
674 args = [
662 'cancel',
675 'cancel',
663 self.job_id,
676 self.job_id,
664 '/scheduler:%s' % self.scheduler
677 '/scheduler:%s' % self.scheduler
665 ]
678 ]
666 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
679 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
667 try:
680 try:
668 output = check_output([self.job_cmd]+args,
681 output = check_output([self.job_cmd]+args,
669 env=os.environ,
682 env=os.environ,
670 cwd=self.work_dir,
683 cwd=self.work_dir,
671 stderr=STDOUT
684 stderr=STDOUT
672 )
685 )
673 except:
686 except:
674 output = 'The job already appears to be stopped: %r' % self.job_id
687 output = 'The job already appears to be stopped: %r' % self.job_id
675 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
688 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
676 return output
689 return output
677
690
678
691
679 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
692 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
680
693
681 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
694 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
682 extra_args = List([], config=False)
695 extra_args = List([], config=False)
683
696
684 def write_job_file(self, n):
697 def write_job_file(self, n):
685 job = IPControllerJob(config=self.config)
698 job = IPControllerJob(config=self.config)
686
699
687 t = IPControllerTask(config=self.config)
700 t = IPControllerTask(config=self.config)
688 # The tasks work directory is *not* the actual work directory of
701 # The tasks work directory is *not* the actual work directory of
689 # the controller. It is used as the base path for the stdout/stderr
702 # the controller. It is used as the base path for the stdout/stderr
690 # files that the scheduler redirects to.
703 # files that the scheduler redirects to.
691 t.work_directory = self.cluster_dir
704 t.work_directory = self.cluster_dir
692 # Add the --cluster-dir and from self.start().
705 # Add the --cluster-dir and from self.start().
693 t.controller_args.extend(self.extra_args)
706 t.controller_args.extend(self.extra_args)
694 job.add_task(t)
707 job.add_task(t)
695
708
696 self.log.info("Writing job description file: %s" % self.job_file)
709 self.log.info("Writing job description file: %s" % self.job_file)
697 job.write(self.job_file)
710 job.write(self.job_file)
698
711
699 @property
712 @property
700 def job_file(self):
713 def job_file(self):
701 return os.path.join(self.cluster_dir, self.job_file_name)
714 return os.path.join(self.cluster_dir, self.job_file_name)
702
715
703 def start(self, cluster_dir):
716 def start(self, cluster_dir):
704 """Start the controller by cluster_dir."""
717 """Start the controller by cluster_dir."""
705 self.extra_args = ['--cluster-dir', cluster_dir]
718 self.extra_args = ['--cluster-dir', cluster_dir]
706 self.cluster_dir = unicode(cluster_dir)
719 self.cluster_dir = unicode(cluster_dir)
707 return super(WindowsHPCControllerLauncher, self).start(1)
720 return super(WindowsHPCControllerLauncher, self).start(1)
708
721
709
722
710 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
723 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
711
724
712 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
725 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
713 extra_args = List([], config=False)
726 extra_args = List([], config=False)
714
727
715 def write_job_file(self, n):
728 def write_job_file(self, n):
716 job = IPEngineSetJob(config=self.config)
729 job = IPEngineSetJob(config=self.config)
717
730
718 for i in range(n):
731 for i in range(n):
719 t = IPEngineTask(config=self.config)
732 t = IPEngineTask(config=self.config)
720 # The tasks work directory is *not* the actual work directory of
733 # The tasks work directory is *not* the actual work directory of
721 # the engine. It is used as the base path for the stdout/stderr
734 # the engine. It is used as the base path for the stdout/stderr
722 # files that the scheduler redirects to.
735 # files that the scheduler redirects to.
723 t.work_directory = self.cluster_dir
736 t.work_directory = self.cluster_dir
724 # Add the --cluster-dir and from self.start().
737 # Add the --cluster-dir and from self.start().
725 t.engine_args.extend(self.extra_args)
738 t.engine_args.extend(self.extra_args)
726 job.add_task(t)
739 job.add_task(t)
727
740
728 self.log.info("Writing job description file: %s" % self.job_file)
741 self.log.info("Writing job description file: %s" % self.job_file)
729 job.write(self.job_file)
742 job.write(self.job_file)
730
743
731 @property
744 @property
732 def job_file(self):
745 def job_file(self):
733 return os.path.join(self.cluster_dir, self.job_file_name)
746 return os.path.join(self.cluster_dir, self.job_file_name)
734
747
735 def start(self, n, cluster_dir):
748 def start(self, n, cluster_dir):
736 """Start the controller by cluster_dir."""
749 """Start the controller by cluster_dir."""
737 self.extra_args = ['--cluster-dir', cluster_dir]
750 self.extra_args = ['--cluster-dir', cluster_dir]
738 self.cluster_dir = unicode(cluster_dir)
751 self.cluster_dir = unicode(cluster_dir)
739 return super(WindowsHPCEngineSetLauncher, self).start(n)
752 return super(WindowsHPCEngineSetLauncher, self).start(n)
740
753
741
754
742 #-----------------------------------------------------------------------------
755 #-----------------------------------------------------------------------------
743 # Batch (PBS) system launchers
756 # Batch (PBS) system launchers
744 #-----------------------------------------------------------------------------
757 #-----------------------------------------------------------------------------
745
758
746 class BatchSystemLauncher(BaseLauncher):
759 class BatchSystemLauncher(BaseLauncher):
747 """Launch an external process using a batch system.
760 """Launch an external process using a batch system.
748
761
749 This class is designed to work with UNIX batch systems like PBS, LSF,
762 This class is designed to work with UNIX batch systems like PBS, LSF,
750 GridEngine, etc. The overall model is that there are different commands
763 GridEngine, etc. The overall model is that there are different commands
751 like qsub, qdel, etc. that handle the starting and stopping of the process.
764 like qsub, qdel, etc. that handle the starting and stopping of the process.
752
765
753 This class also has the notion of a batch script. The ``batch_template``
766 This class also has the notion of a batch script. The ``batch_template``
754 attribute can be set to a string that is a template for the batch script.
767 attribute can be set to a string that is a template for the batch script.
755 This template is instantiated using Itpl. Thus the template can use
768 This template is instantiated using Itpl. Thus the template can use
756 ${n} for the number of instances. Subclasses can add additional variables
769 ${n} for the number of instances. Subclasses can add additional variables
757 to the template dict.
770 to the template dict.
758 """
771 """
759
772
760 # Subclasses must fill these in. See PBSEngineSet
773 # Subclasses must fill these in. See PBSEngineSet
761 # The name of the command line program used to submit jobs.
774 # The name of the command line program used to submit jobs.
762 submit_command = List([''], config=True)
775 submit_command = List([''], config=True)
763 # The name of the command line program used to delete jobs.
776 # The name of the command line program used to delete jobs.
764 delete_command = List([''], config=True)
777 delete_command = List([''], config=True)
765 # A regular expression used to get the job id from the output of the
778 # A regular expression used to get the job id from the output of the
766 # submit_command.
779 # submit_command.
767 job_id_regexp = CUnicode('', config=True)
780 job_id_regexp = CUnicode('', config=True)
768 # The string that is the batch script template itself.
781 # The string that is the batch script template itself.
769 batch_template = CUnicode('', config=True)
782 batch_template = CUnicode('', config=True)
770 # The file that contains the batch template
783 # The file that contains the batch template
771 batch_template_file = CUnicode(u'', config=True)
784 batch_template_file = CUnicode(u'', config=True)
772 # The filename of the instantiated batch script.
785 # The filename of the instantiated batch script.
773 batch_file_name = CUnicode(u'batch_script', config=True)
786 batch_file_name = CUnicode(u'batch_script', config=True)
774 # The PBS Queue
787 # The PBS Queue
775 queue = CUnicode(u'', config=True)
788 queue = CUnicode(u'', config=True)
776
789
777 # not configurable, override in subclasses
790 # not configurable, override in subclasses
778 # PBS Job Array regex
791 # PBS Job Array regex
779 job_array_regexp = CUnicode('')
792 job_array_regexp = CUnicode('')
780 job_array_template = CUnicode('')
793 job_array_template = CUnicode('')
781 # PBS Queue regex
794 # PBS Queue regex
782 queue_regexp = CUnicode('')
795 queue_regexp = CUnicode('')
783 queue_template = CUnicode('')
796 queue_template = CUnicode('')
784 # The default batch template, override in subclasses
797 # The default batch template, override in subclasses
785 default_template = CUnicode('')
798 default_template = CUnicode('')
786 # The full path to the instantiated batch script.
799 # The full path to the instantiated batch script.
787 batch_file = CUnicode(u'')
800 batch_file = CUnicode(u'')
788 # the format dict used with batch_template:
801 # the format dict used with batch_template:
789 context = Dict()
802 context = Dict()
790
803
791
804
792 def find_args(self):
805 def find_args(self):
793 return self.submit_command + [self.batch_file]
806 return self.submit_command + [self.batch_file]
794
807
795 def __init__(self, work_dir=u'.', config=None, **kwargs):
808 def __init__(self, work_dir=u'.', config=None, **kwargs):
796 super(BatchSystemLauncher, self).__init__(
809 super(BatchSystemLauncher, self).__init__(
797 work_dir=work_dir, config=config, **kwargs
810 work_dir=work_dir, config=config, **kwargs
798 )
811 )
799 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
812 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
800
813
801 def parse_job_id(self, output):
814 def parse_job_id(self, output):
802 """Take the output of the submit command and return the job id."""
815 """Take the output of the submit command and return the job id."""
803 m = re.search(self.job_id_regexp, output)
816 m = re.search(self.job_id_regexp, output)
804 if m is not None:
817 if m is not None:
805 job_id = m.group()
818 job_id = m.group()
806 else:
819 else:
807 raise LauncherError("Job id couldn't be determined: %s" % output)
820 raise LauncherError("Job id couldn't be determined: %s" % output)
808 self.job_id = job_id
821 self.job_id = job_id
809 self.log.info('Job submitted with job id: %r' % job_id)
822 self.log.info('Job submitted with job id: %r' % job_id)
810 return job_id
823 return job_id
811
824
812 def write_batch_script(self, n):
825 def write_batch_script(self, n):
813 """Instantiate and write the batch script to the work_dir."""
826 """Instantiate and write the batch script to the work_dir."""
814 self.context['n'] = n
827 self.context['n'] = n
815 self.context['queue'] = self.queue
828 self.context['queue'] = self.queue
816 print self.context
829 print self.context
817 # first priority is batch_template if set
830 # first priority is batch_template if set
818 if self.batch_template_file and not self.batch_template:
831 if self.batch_template_file and not self.batch_template:
819 # second priority is batch_template_file
832 # second priority is batch_template_file
820 with open(self.batch_template_file) as f:
833 with open(self.batch_template_file) as f:
821 self.batch_template = f.read()
834 self.batch_template = f.read()
822 if not self.batch_template:
835 if not self.batch_template:
823 # third (last) priority is default_template
836 # third (last) priority is default_template
824 self.batch_template = self.default_template
837 self.batch_template = self.default_template
825
838
826 regex = re.compile(self.job_array_regexp)
839 regex = re.compile(self.job_array_regexp)
827 # print regex.search(self.batch_template)
840 # print regex.search(self.batch_template)
828 if not regex.search(self.batch_template):
841 if not regex.search(self.batch_template):
829 self.log.info("adding job array settings to batch script")
842 self.log.info("adding job array settings to batch script")
830 firstline, rest = self.batch_template.split('\n',1)
843 firstline, rest = self.batch_template.split('\n',1)
831 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
844 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
832
845
833 regex = re.compile(self.queue_regexp)
846 regex = re.compile(self.queue_regexp)
834 # print regex.search(self.batch_template)
847 # print regex.search(self.batch_template)
835 if self.queue and not regex.search(self.batch_template):
848 if self.queue and not regex.search(self.batch_template):
836 self.log.info("adding PBS queue settings to batch script")
849 self.log.info("adding PBS queue settings to batch script")
837 firstline, rest = self.batch_template.split('\n',1)
850 firstline, rest = self.batch_template.split('\n',1)
838 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
851 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
839
852
840 script_as_string = Itpl.itplns(self.batch_template, self.context)
853 script_as_string = Itpl.itplns(self.batch_template, self.context)
841 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
854 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
842
855
843 with open(self.batch_file, 'w') as f:
856 with open(self.batch_file, 'w') as f:
844 f.write(script_as_string)
857 f.write(script_as_string)
845 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
858 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
846
859
847 def start(self, n, cluster_dir):
860 def start(self, n, cluster_dir):
848 """Start n copies of the process using a batch system."""
861 """Start n copies of the process using a batch system."""
849 # Here we save cluster_dir in the context so
862 # Here we save cluster_dir in the context so
850 # it can be used in the batch script template
863 # it can be used in the batch script template
851 # as ${cluster_dir}
864 # as ${cluster_dir}
852 self.context['cluster_dir'] = cluster_dir
865 self.context['cluster_dir'] = cluster_dir
853 self.cluster_dir = unicode(cluster_dir)
866 self.cluster_dir = unicode(cluster_dir)
854 self.write_batch_script(n)
867 self.write_batch_script(n)
855 output = check_output(self.args, env=os.environ)
868 output = check_output(self.args, env=os.environ)
856
869
857 job_id = self.parse_job_id(output)
870 job_id = self.parse_job_id(output)
858 self.notify_start(job_id)
871 self.notify_start(job_id)
859 return job_id
872 return job_id
860
873
861 def stop(self):
874 def stop(self):
862 output = check_output(self.delete_command+[self.job_id], env=os.environ)
875 output = check_output(self.delete_command+[self.job_id], env=os.environ)
863 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
876 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
864 return output
877 return output
865
878
866
879
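The three template sources handled above resolve in a fixed order: a non-empty batch_template always wins, the contents of batch_template_file are read only when batch_template is empty, and default_template is the last resort. A minimal sketch of steering this from a cluster configuration file follows; the file name ipcluster_config.py, the inline template body and the queue name are illustrative assumptions, not part of this changeset:

# ipcluster_config.py -- illustrative sketch only
c = get_config()

# Highest priority: an inline template. $n, $queue and $cluster_dir are
# interpolated by Itpl when write_batch_script() runs.
c.PBSEngineSetLauncher.batch_template = u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
#PBS -t 1-$n
ipengine --cluster-dir $cluster_dir
"""

# Second priority, consulted only if batch_template is left empty:
# c.PBSEngineSetLauncher.batch_template_file = u'pbs.engines.template'

# When set, a '#PBS -q' directive is injected automatically unless the
# template already contains one:
c.PBSEngineSetLauncher.queue = u'batch'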
867 class PBSLauncher(BatchSystemLauncher):
880 class PBSLauncher(BatchSystemLauncher):
868 """A BatchSystemLauncher subclass for PBS."""
881 """A BatchSystemLauncher subclass for PBS."""
869
882
870 submit_command = List(['qsub'], config=True)
883 submit_command = List(['qsub'], config=True)
871 delete_command = List(['qdel'], config=True)
884 delete_command = List(['qdel'], config=True)
872 job_id_regexp = CUnicode(r'\d+', config=True)
885 job_id_regexp = CUnicode(r'\d+', config=True)
873
886
874 batch_file = CUnicode(u'')
887 batch_file = CUnicode(u'')
875 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
888 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
876 job_array_template = CUnicode('#PBS -t 1-$n')
889 job_array_template = CUnicode('#PBS -t 1-$n')
877 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
890 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
878 queue_template = CUnicode('#PBS -q $queue')
891 queue_template = CUnicode('#PBS -q $queue')
879
892
880
893
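For job-id handling, qsub on a typical PBS/Torque installation prints a full identifier such as 1234.headnode.example.com (the hostname here is made up). With the default job_id_regexp of \d+, parse_job_id() keeps only the numeric prefix, and that string is what qdel receives later from stop(). A quick sketch of the extraction, under the same assumption about the qsub output:

import re

output = "1234.headnode.example.com"   # illustrative qsub output
m = re.search(r'\d+', output)
print m.group()                         # '1234' -> stored as self.job_id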
881 class PBSControllerLauncher(PBSLauncher):
894 class PBSControllerLauncher(PBSLauncher):
882 """Launch a controller using PBS."""
895 """Launch a controller using PBS."""
883
896
884 batch_file_name = CUnicode(u'pbs_controller', config=True)
897 batch_file_name = CUnicode(u'pbs_controller', config=True)
885 default_template= CUnicode("""#!/bin/sh
898 default_template= CUnicode("""#!/bin/sh
886 #PBS -V
899 #PBS -V
887 #PBS -N ipcontroller
900 #PBS -N ipcontroller
888 %s --log-to-file --cluster-dir $cluster_dir
901 %s --log-to-file --cluster-dir $cluster_dir
889 """%(' '.join(ipcontroller_cmd_argv)))
902 """%(' '.join(ipcontroller_cmd_argv)))
890
903
891 def start(self, cluster_dir):
904 def start(self, cluster_dir):
892 """Start the controller by profile or cluster_dir."""
905 """Start the controller by profile or cluster_dir."""
893 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
906 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
894 return super(PBSControllerLauncher, self).start(1, cluster_dir)
907 return super(PBSControllerLauncher, self).start(1, cluster_dir)
895
908
896
909
897 class PBSEngineSetLauncher(PBSLauncher):
910 class PBSEngineSetLauncher(PBSLauncher):
898 """Launch Engines using PBS"""
911 """Launch Engines using PBS"""
899 batch_file_name = CUnicode(u'pbs_engines', config=True)
912 batch_file_name = CUnicode(u'pbs_engines', config=True)
900 default_template= CUnicode(u"""#!/bin/sh
913 default_template= CUnicode(u"""#!/bin/sh
901 #PBS -V
914 #PBS -V
902 #PBS -N ipengine
915 #PBS -N ipengine
903 %s --cluster-dir $cluster_dir
916 %s --cluster-dir $cluster_dir
904 """%(' '.join(ipengine_cmd_argv)))
917 """%(' '.join(ipengine_cmd_argv)))
905
918
906 def start(self, n, cluster_dir):
919 def start(self, n, cluster_dir):
907 """Start n engines by profile or cluster_dir."""
920 """Start n engines by profile or cluster_dir."""
908 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
921 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
909 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
922 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
910
923
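To make the interpolation concrete: assuming n=4, queue=u'batch' and a cluster directory of /home/me/.ipython/cluster_default (queue name and path are illustrative), write_batch_script() renders the default engine template above into roughly the following before writing it to pbs_engines. The queue and job-array directives land directly under the shebang because each is spliced in after the template's first line:

#!/bin/sh
#PBS -q batch
#PBS -t 1-4
#PBS -V
#PBS -N ipengine
<ipengine command> --cluster-dir /home/me/.ipython/cluster_default

where <ipengine command> stands for whatever ' '.join(ipengine_cmd_argv) expands to on the submitting host.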
911 #SGE is very similar to PBS
924 #SGE is very similar to PBS
912
925
913 class SGELauncher(PBSLauncher):
926 class SGELauncher(PBSLauncher):
914 """Sun GridEngine is a PBS clone with slightly different syntax"""
927 """Sun GridEngine is a PBS clone with slightly different syntax"""
915 job_array_regexp = CUnicode('#\$\$?\W+-t\W+[\w\d\-\$]+')
928 job_array_regexp = CUnicode('#\$\$?\W+-t\W+[\w\d\-\$]+')
916 job_array_template = CUnicode('#$$ -t 1-$n')
929 job_array_template = CUnicode('#$$ -t 1-$n')
917 queue_regexp = CUnicode('#\$\$?\W+-q\W+\$?\w+')
930 queue_regexp = CUnicode('#\$\$?\W+-q\W+\$?\w+')
918 queue_template = CUnicode('#$$ -q $queue')
931 queue_template = CUnicode('#$$ -q $queue')
919
932
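Note the doubled dollar signs in these class attributes: the templates are later passed through Itpl.itplns(), which substitutes $name expressions and collapses $$ into a single literal $, so the SGE directive prefix #$ has to be written #$$ here. With n=4, for example, the line '#$$ -t 1-$n' comes out of interpolation as '#$ -t 1-4'.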
920 class SGEControllerLauncher(SGELauncher):
933 class SGEControllerLauncher(SGELauncher):
921 """Launch a controller using SGE."""
934 """Launch a controller using SGE."""
922
935
923 batch_file_name = CUnicode(u'sge_controller', config=True)
936 batch_file_name = CUnicode(u'sge_controller', config=True)
924 default_template= CUnicode(u"""#$$ -V
937 default_template= CUnicode(u"""#$$ -V
925 #$$ -S /bin/sh
938 #$$ -S /bin/sh
926 #$$ -N ipcontroller
939 #$$ -N ipcontroller
927 %s --log-to-file --cluster-dir $cluster_dir
940 %s --log-to-file --cluster-dir $cluster_dir
928 """%(' '.join(ipcontroller_cmd_argv)))
941 """%(' '.join(ipcontroller_cmd_argv)))
929
942
930 def start(self, cluster_dir):
943 def start(self, cluster_dir):
931 """Start the controller by profile or cluster_dir."""
944 """Start the controller by profile or cluster_dir."""
932 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
945 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
933 return super(SGEControllerLauncher, self).start(1, cluster_dir)
946 return super(SGEControllerLauncher, self).start(1, cluster_dir)
934
947
935 class SGEEngineSetLauncher(SGELauncher):
948 class SGEEngineSetLauncher(SGELauncher):
936 """Launch Engines with SGE"""
949 """Launch Engines with SGE"""
937 batch_file_name = CUnicode(u'sge_engines', config=True)
950 batch_file_name = CUnicode(u'sge_engines', config=True)
938 default_template = CUnicode("""#$$ -V
951 default_template = CUnicode("""#$$ -V
939 #$$ -S /bin/sh
952 #$$ -S /bin/sh
940 #$$ -N ipengine
953 #$$ -N ipengine
941 %s --cluster-dir $cluster_dir
954 %s --cluster-dir $cluster_dir
942 """%(' '.join(ipengine_cmd_argv)))
955 """%(' '.join(ipengine_cmd_argv)))
943
956
944 def start(self, n, cluster_dir):
957 def start(self, n, cluster_dir):
945 """Start n engines by profile or cluster_dir."""
958 """Start n engines by profile or cluster_dir."""
946 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
959 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
947 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
960 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
948
961
949
962
950 #-----------------------------------------------------------------------------
963 #-----------------------------------------------------------------------------
951 # A launcher for ipcluster itself!
964 # A launcher for ipcluster itself!
952 #-----------------------------------------------------------------------------
965 #-----------------------------------------------------------------------------
953
966
954
967
955 class IPClusterLauncher(LocalProcessLauncher):
968 class IPClusterLauncher(LocalProcessLauncher):
956 """Launch the ipcluster program in an external process."""
969 """Launch the ipcluster program in an external process."""
957
970
958 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
971 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
959 # Command line arguments to pass to ipcluster.
972 # Command line arguments to pass to ipcluster.
960 ipcluster_args = List(
973 ipcluster_args = List(
961 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
974 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
962 ipcluster_subcommand = Str('start')
975 ipcluster_subcommand = Str('start')
963 ipcluster_n = Int(2)
976 ipcluster_n = Int(2)
964
977
965 def find_args(self):
978 def find_args(self):
966 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
979 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
967 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
980 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
968
981
969 def start(self):
982 def start(self):
970 self.log.info("Starting ipcluster: %r" % self.args)
983 self.log.info("Starting ipcluster: %r" % self.args)
971 return super(IPClusterLauncher, self).start()
984 return super(IPClusterLauncher, self).start()
972
985
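With the defaults above, find_args() assembles a command line equivalent to

<ipcluster> start -n 2 --clean-logs --log-to-file --log-level 20

where <ipcluster> stands for whatever ipcluster_cmd_argv expands to on the local interpreter; the literal '2' and '20' come from repr(self.ipcluster_n) and str(logging.INFO).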