##// END OF EJS Templates
improve process cleanup on Windows...
MinRK -
Show More
@@ -1,994 +1,996 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
25
24 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
25 try:
27 try:
26 from signal import SIGKILL
28 from signal import SIGKILL
27 except ImportError:
29 except ImportError:
28 # windows
30 # Windows
29 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
30
32
33 try:
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
37 pass
38
31 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7: provide a minimal check_output substitute built on Popen.
    # Unlike the real 2.7 check_output it does not raise on a nonzero
    # exit status; it simply returns whatever the child wrote to stdout.
    def check_output(*args, **kwargs):
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        output = proc.communicate()[0]
        return output
41
49
42 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
43
51
44 from IPython.external import Itpl
52 from IPython.external import Itpl
45 # from IPython.config.configurable import Configurable
53 # from IPython.config.configurable import Configurable
46 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
54 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
47 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.path import get_ipython_module_path
48 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49
57
50 from IPython.parallel.factory import LoggingFactory
58 from IPython.parallel.factory import LoggingFactory
51
59
52 from .win32support import forward_read_events
60 from .win32support import forward_read_events
53
61
54 # load winhpcjob only on Windows
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
55 try:
56 from .winhpcjob import (
57 IPControllerTask, IPEngineTask,
58 IPControllerJob, IPEngineSetJob
59 )
60 except ImportError:
61 pass
62
63
63 WINDOWS = os.name == 'nt'
64 WINDOWS = os.name == 'nt'
64
65
65 if WINDOWS:
66 try:
67 # >= 2.7, 3.2
68 from signal import CTRL_C_EVENT as SIGINT
69 except ImportError:
70 pass
71
72 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
67 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
75
69
76
70
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipclusterapp'
72 'IPython.parallel.apps.ipclusterapp'
79 ))
73 ))
80
74
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 'IPython.parallel.apps.ipengineapp'
76 'IPython.parallel.apps.ipengineapp'
83 ))
77 ))
84
78
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 'IPython.parallel.apps.ipcontrollerapp'
80 'IPython.parallel.apps.ipcontrollerapp'
87 ))
81 ))
88
82
89 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
90 # Base launchers and errors
84 # Base launchers and errors
91 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
92
86
93
87
class LauncherError(Exception):
    """Base class for errors raised by process launchers in this module."""
    pass
96
90
97
91
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state.

    For example, calling ``start()`` on a launcher that has already been
    started (state is no longer 'before').
    """
    pass
100
94
101
95
class UnknownStatus(LauncherError):
    """Launcher error for an unrecognized status.

    NOTE(review): not raised anywhere in this section; presumably used by
    batch-system launchers elsewhere in the file -- confirm before removing.
    """
    pass
104
98
105
99
class BaseLauncher(LoggingFactory):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the cluster_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the --work-dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Opaque data recorded by notify_start / notify_stop (e.g. a pid or
    # exit-status dict); shape is launcher-specific.
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        # Lazily bind to the process-wide IOLoop unless one was injected.
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process.
        """
        if self.state=='after':
            # Already stopped: invoke immediately with the recorded data.
            return f(self.stop_data)
        else:
            self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        self.log.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`."""
        self.log.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Drain the callback list (LIFO, same order as the original
        # pop()-based loop); each callback fires exactly once.
        while self.stop_callbacks:
            d = self.stop_callbacks.pop()
            d(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
228
222
229
223
230 #-----------------------------------------------------------------------------
224 #-----------------------------------------------------------------------------
231 # Local process launchers
225 # Local process launchers
232 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
233
227
234
228
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    poll_frequency = Int(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        """The command line is taken verbatim from the cmd_and_args trait."""
        return self.cmd_and_args

    def start(self):
        """Launch the subprocess and wire its pipes into the event loop.

        Raises ProcessStateError if the launcher was already started.
        """
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # Pipes are not selectable on Windows; forward_read_events
                # wraps them so the IOLoop can watch them.
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            # Periodically poll for exit, in addition to the EOF-triggered
            # poll in the stdout/stderr handlers.
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send *sig* to the child if it is still running.

        On Windows, anything other than SIGINT (which may actually be
        CTRL_C_EVENT -- see the module-level signal imports) is delivered
        via ``taskkill`` instead of ``send_signal``.
        """
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                # (-t kills the whole process tree, -f forces termination
                #  -- TODO confirm flags against taskkill docs)
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # Best-effort: the process may already be gone, or the
            # platform may reject the interrupt; the KILL below is the
            # real guarantee.
            self.log.debug("interrupt failed")
            pass
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """IOLoop read handler: relay one line of child stdout to the log."""
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        """IOLoop read handler: relay one line of child stderr to the log."""
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check for child exit; on exit, tear down handlers and notify.

        Returns the exit code, or None while the child is still running.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
325
327
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)

    def find_args(self):
        """Full argv: the ipcontroller command followed by its arguments."""
        argv = list(self.controller_cmd)
        argv.extend(self.controller_args)
        return argv

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.cluster_dir = unicode(cluster_dir)
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
342
344
343
345
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # The ipengine command (argv prefix).
    engine_cmd = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )

    def find_args(self):
        """Full argv: the ipengine command followed by its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        self.engine_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
361
363
362
364
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    # Map of engine index -> per-engine launcher instance.
    launchers = Dict()
    # Map of engine index -> stop data, accumulated as engines exit.
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i==0:
                # Log the command line once; all engines use the same args.
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        # The consumeErrors here could be dangerous
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        # dfinal.addCallback(self.notify_start)
        return dlist

    def find_args(self):
        # Placeholder name used only for logging; this launcher has no
        # single command line of its own.
        return ['engine set']

    def signal(self, sig):
        """Fan *sig* out to every engine launcher; returns the results."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt-then-kill every engine launcher; returns the results."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        # dfinal = gatherBoth(dlist, consumeErrors=True)
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """on_stop callback: record one engine's exit, fire notify_stop
        once all engines are gone."""
        pid = data['pid']
        # Find the index of the launcher whose child has this pid.
        # NOTE(review): relies on the loop variable surviving the loop; if
        # no launcher matches, idx is the last key iterated -- confirm a
        # match is always present.
        for idx,el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
433
435
434
436
435 #-----------------------------------------------------------------------------
437 #-----------------------------------------------------------------------------
436 # MPIExec launchers
438 # MPIExec launchers
437 #-----------------------------------------------------------------------------
439 #-----------------------------------------------------------------------------
438
440
439
441
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        argv = list(self.mpi_cmd)
        argv.append('-n')
        argv.append(str(self.n))
        argv.extend(self.mpi_args)
        argv.extend(self.program)
        argv.extend(self.program_args)
        return argv

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
463
465
464
466
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # The ipcontroller command (argv prefix).
    controller_cmd = List(ipcontroller_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
    # A controller is always a single process; not configurable.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the full mpiexec argv for the controller.

        Fix: ``self.n`` is an Int trait and must be stringified here --
        Popen requires every argv element to be a string, and the base
        class's find_args already does ``str(self.n)``.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
483
485
484
486
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines in one mpiexec invocation."""

    # The program mpiexec runs: ipengine.
    program = List(ipengine_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # Number of engine instances to start.
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # NOTE(review): extend mutates the trait list, so calling start
        # more than once accumulates duplicate --cluster-dir args --
        # same pattern as the other launchers; confirm start is called once.
        self.program_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
501
503
502 #-----------------------------------------------------------------------------
504 #-----------------------------------------------------------------------------
503 # SSH launchers
505 # SSH launchers
504 #-----------------------------------------------------------------------------
506 #-----------------------------------------------------------------------------
505
507
506 # TODO: Get SSH Launcher working again.
508 # TODO: Get SSH Launcher working again.
507
509
508 class SSHLauncher(LocalProcessLauncher):
510 class SSHLauncher(LocalProcessLauncher):
509 """A minimal launcher for ssh.
511 """A minimal launcher for ssh.
510
512
511 To be useful this will probably have to be extended to use the ``sshx``
513 To be useful this will probably have to be extended to use the ``sshx``
512 idea for environment variables. There could be other things this needs
514 idea for environment variables. There could be other things this needs
513 as well.
515 as well.
514 """
516 """
515
517
516 ssh_cmd = List(['ssh'], config=True)
518 ssh_cmd = List(['ssh'], config=True)
517 ssh_args = List(['-tt'], config=True)
519 ssh_args = List(['-tt'], config=True)
518 program = List(['date'], config=True)
520 program = List(['date'], config=True)
519 program_args = List([], config=True)
521 program_args = List([], config=True)
520 hostname = CUnicode('', config=True)
522 hostname = CUnicode('', config=True)
521 user = CUnicode('', config=True)
523 user = CUnicode('', config=True)
522 location = CUnicode('')
524 location = CUnicode('')
523
525
524 def _hostname_changed(self, name, old, new):
526 def _hostname_changed(self, name, old, new):
525 if self.user:
527 if self.user:
526 self.location = u'%s@%s' % (self.user, new)
528 self.location = u'%s@%s' % (self.user, new)
527 else:
529 else:
528 self.location = new
530 self.location = new
529
531
530 def _user_changed(self, name, old, new):
532 def _user_changed(self, name, old, new):
531 self.location = u'%s@%s' % (new, self.hostname)
533 self.location = u'%s@%s' % (new, self.hostname)
532
534
533 def find_args(self):
535 def find_args(self):
534 return self.ssh_cmd + self.ssh_args + [self.location] + \
536 return self.ssh_cmd + self.ssh_args + [self.location] + \
535 self.program + self.program_args
537 self.program + self.program_args
536
538
537 def start(self, cluster_dir, hostname=None, user=None):
539 def start(self, cluster_dir, hostname=None, user=None):
538 self.cluster_dir = unicode(cluster_dir)
540 self.cluster_dir = unicode(cluster_dir)
539 if hostname is not None:
541 if hostname is not None:
540 self.hostname = hostname
542 self.hostname = hostname
541 if user is not None:
543 if user is not None:
542 self.user = user
544 self.user = user
543
545
544 return super(SSHLauncher, self).start()
546 return super(SSHLauncher, self).start()
545
547
546 def signal(self, sig):
548 def signal(self, sig):
547 if self.state == 'running':
549 if self.state == 'running':
548 # send escaped ssh connection-closer
550 # send escaped ssh connection-closer
549 self.process.stdin.write('~.')
551 self.process.stdin.write('~.')
550 self.process.stdin.flush()
552 self.process.stdin.flush()
551
553
552
554
553
555
554 class SSHControllerLauncher(SSHLauncher):
556 class SSHControllerLauncher(SSHLauncher):
555
557
556 program = List(ipcontroller_cmd_argv, config=True)
558 program = List(ipcontroller_cmd_argv, config=True)
557 # Command line arguments to ipcontroller.
559 # Command line arguments to ipcontroller.
558 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
560 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
559
561
560
562
561 class SSHEngineLauncher(SSHLauncher):
563 class SSHEngineLauncher(SSHLauncher):
562 program = List(ipengine_cmd_argv, config=True)
564 program = List(ipengine_cmd_argv, config=True)
563 # Command line arguments for ipengine.
565 # Command line arguments for ipengine.
564 program_args = List(
566 program_args = List(
565 ['--log-to-file','--log-level', str(logging.INFO)], config=True
567 ['--log-to-file','--log-level', str(logging.INFO)], config=True
566 )
568 )
567
569
568 class SSHEngineSetLauncher(LocalEngineSetLauncher):
570 class SSHEngineSetLauncher(LocalEngineSetLauncher):
569 launcher_class = SSHEngineLauncher
571 launcher_class = SSHEngineLauncher
570 engines = Dict(config=True)
572 engines = Dict(config=True)
571
573
572 def start(self, n, cluster_dir):
574 def start(self, n, cluster_dir):
573 """Start engines by profile or cluster_dir.
575 """Start engines by profile or cluster_dir.
574 `n` is ignored, and the `engines` config property is used instead.
576 `n` is ignored, and the `engines` config property is used instead.
575 """
577 """
576
578
577 self.cluster_dir = unicode(cluster_dir)
579 self.cluster_dir = unicode(cluster_dir)
578 dlist = []
580 dlist = []
579 for host, n in self.engines.iteritems():
581 for host, n in self.engines.iteritems():
580 if isinstance(n, (tuple, list)):
582 if isinstance(n, (tuple, list)):
581 n, args = n
583 n, args = n
582 else:
584 else:
583 args = copy.deepcopy(self.engine_args)
585 args = copy.deepcopy(self.engine_args)
584
586
585 if '@' in host:
587 if '@' in host:
586 user,host = host.split('@',1)
588 user,host = host.split('@',1)
587 else:
589 else:
588 user=None
590 user=None
589 for i in range(n):
591 for i in range(n):
590 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
592 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
591
593
592 # Copy the engine args over to each engine launcher.
594 # Copy the engine args over to each engine launcher.
593 i
595 i
594 el.program_args = args
596 el.program_args = args
595 el.on_stop(self._notice_engine_stopped)
597 el.on_stop(self._notice_engine_stopped)
596 d = el.start(cluster_dir, user=user, hostname=host)
598 d = el.start(cluster_dir, user=user, hostname=host)
597 if i==0:
599 if i==0:
598 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
600 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
599 self.launchers[host+str(i)] = el
601 self.launchers[host+str(i)] = el
600 dlist.append(d)
602 dlist.append(d)
601 self.notify_start(dlist)
603 self.notify_start(dlist)
602 return dlist
604 return dlist
603
605
604
606
605
607
606 #-----------------------------------------------------------------------------
608 #-----------------------------------------------------------------------------
607 # Windows HPC Server 2008 scheduler launchers
609 # Windows HPC Server 2008 scheduler launchers
608 #-----------------------------------------------------------------------------
610 #-----------------------------------------------------------------------------
609
611
610
612
611 # This is only used on Windows.
613 # This is only used on Windows.
612 def find_job_cmd():
614 def find_job_cmd():
613 if WINDOWS:
615 if WINDOWS:
614 try:
616 try:
615 return find_cmd('job')
617 return find_cmd('job')
616 except (FindCmdError, ImportError):
618 except (FindCmdError, ImportError):
617 # ImportError will be raised if win32api is not installed
619 # ImportError will be raised if win32api is not installed
618 return 'job'
620 return 'job'
619 else:
621 else:
620 return 'job'
622 return 'job'
621
623
622
624
623 class WindowsHPCLauncher(BaseLauncher):
625 class WindowsHPCLauncher(BaseLauncher):
624
626
625 # A regular expression used to get the job id from the output of the
627 # A regular expression used to get the job id from the output of the
626 # submit_command.
628 # submit_command.
627 job_id_regexp = Str(r'\d+', config=True)
629 job_id_regexp = Str(r'\d+', config=True)
628 # The filename of the instantiated job script.
630 # The filename of the instantiated job script.
629 job_file_name = CUnicode(u'ipython_job.xml', config=True)
631 job_file_name = CUnicode(u'ipython_job.xml', config=True)
630 # The full path to the instantiated job script. This gets made dynamically
632 # The full path to the instantiated job script. This gets made dynamically
631 # by combining the work_dir with the job_file_name.
633 # by combining the work_dir with the job_file_name.
632 job_file = CUnicode(u'')
634 job_file = CUnicode(u'')
633 # The hostname of the scheduler to submit the job to
635 # The hostname of the scheduler to submit the job to
634 scheduler = CUnicode('', config=True)
636 scheduler = CUnicode('', config=True)
635 job_cmd = CUnicode(find_job_cmd(), config=True)
637 job_cmd = CUnicode(find_job_cmd(), config=True)
636
638
637 def __init__(self, work_dir=u'.', config=None, **kwargs):
639 def __init__(self, work_dir=u'.', config=None, **kwargs):
638 super(WindowsHPCLauncher, self).__init__(
640 super(WindowsHPCLauncher, self).__init__(
639 work_dir=work_dir, config=config, **kwargs
641 work_dir=work_dir, config=config, **kwargs
640 )
642 )
641
643
642 @property
644 @property
643 def job_file(self):
645 def job_file(self):
644 return os.path.join(self.work_dir, self.job_file_name)
646 return os.path.join(self.work_dir, self.job_file_name)
645
647
646 def write_job_file(self, n):
648 def write_job_file(self, n):
647 raise NotImplementedError("Implement write_job_file in a subclass.")
649 raise NotImplementedError("Implement write_job_file in a subclass.")
648
650
649 def find_args(self):
651 def find_args(self):
650 return [u'job.exe']
652 return [u'job.exe']
651
653
652 def parse_job_id(self, output):
654 def parse_job_id(self, output):
653 """Take the output of the submit command and return the job id."""
655 """Take the output of the submit command and return the job id."""
654 m = re.search(self.job_id_regexp, output)
656 m = re.search(self.job_id_regexp, output)
655 if m is not None:
657 if m is not None:
656 job_id = m.group()
658 job_id = m.group()
657 else:
659 else:
658 raise LauncherError("Job id couldn't be determined: %s" % output)
660 raise LauncherError("Job id couldn't be determined: %s" % output)
659 self.job_id = job_id
661 self.job_id = job_id
660 self.log.info('Job started with job id: %r' % job_id)
662 self.log.info('Job started with job id: %r' % job_id)
661 return job_id
663 return job_id
662
664
663 def start(self, n):
665 def start(self, n):
664 """Start n copies of the process using the Win HPC job scheduler."""
666 """Start n copies of the process using the Win HPC job scheduler."""
665 self.write_job_file(n)
667 self.write_job_file(n)
666 args = [
668 args = [
667 'submit',
669 'submit',
668 '/jobfile:%s' % self.job_file,
670 '/jobfile:%s' % self.job_file,
669 '/scheduler:%s' % self.scheduler
671 '/scheduler:%s' % self.scheduler
670 ]
672 ]
671 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
673 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
672 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
674 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
673 output = check_output([self.job_cmd]+args,
675 output = check_output([self.job_cmd]+args,
674 env=os.environ,
676 env=os.environ,
675 cwd=self.work_dir,
677 cwd=self.work_dir,
676 stderr=STDOUT
678 stderr=STDOUT
677 )
679 )
678 job_id = self.parse_job_id(output)
680 job_id = self.parse_job_id(output)
679 self.notify_start(job_id)
681 self.notify_start(job_id)
680 return job_id
682 return job_id
681
683
682 def stop(self):
684 def stop(self):
683 args = [
685 args = [
684 'cancel',
686 'cancel',
685 self.job_id,
687 self.job_id,
686 '/scheduler:%s' % self.scheduler
688 '/scheduler:%s' % self.scheduler
687 ]
689 ]
688 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
690 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
689 try:
691 try:
690 output = check_output([self.job_cmd]+args,
692 output = check_output([self.job_cmd]+args,
691 env=os.environ,
693 env=os.environ,
692 cwd=self.work_dir,
694 cwd=self.work_dir,
693 stderr=STDOUT
695 stderr=STDOUT
694 )
696 )
695 except:
697 except:
696 output = 'The job already appears to be stoppped: %r' % self.job_id
698 output = 'The job already appears to be stoppped: %r' % self.job_id
697 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
699 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
698 return output
700 return output
699
701
700
702
701 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
703 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
702
704
703 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
705 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
704 extra_args = List([], config=False)
706 extra_args = List([], config=False)
705
707
706 def write_job_file(self, n):
708 def write_job_file(self, n):
707 job = IPControllerJob(config=self.config)
709 job = IPControllerJob(config=self.config)
708
710
709 t = IPControllerTask(config=self.config)
711 t = IPControllerTask(config=self.config)
710 # The tasks work directory is *not* the actual work directory of
712 # The tasks work directory is *not* the actual work directory of
711 # the controller. It is used as the base path for the stdout/stderr
713 # the controller. It is used as the base path for the stdout/stderr
712 # files that the scheduler redirects to.
714 # files that the scheduler redirects to.
713 t.work_directory = self.cluster_dir
715 t.work_directory = self.cluster_dir
714 # Add the --cluster-dir and from self.start().
716 # Add the --cluster-dir and from self.start().
715 t.controller_args.extend(self.extra_args)
717 t.controller_args.extend(self.extra_args)
716 job.add_task(t)
718 job.add_task(t)
717
719
718 self.log.info("Writing job description file: %s" % self.job_file)
720 self.log.info("Writing job description file: %s" % self.job_file)
719 job.write(self.job_file)
721 job.write(self.job_file)
720
722
721 @property
723 @property
722 def job_file(self):
724 def job_file(self):
723 return os.path.join(self.cluster_dir, self.job_file_name)
725 return os.path.join(self.cluster_dir, self.job_file_name)
724
726
725 def start(self, cluster_dir):
727 def start(self, cluster_dir):
726 """Start the controller by cluster_dir."""
728 """Start the controller by cluster_dir."""
727 self.extra_args = ['--cluster-dir', cluster_dir]
729 self.extra_args = ['--cluster-dir', cluster_dir]
728 self.cluster_dir = unicode(cluster_dir)
730 self.cluster_dir = unicode(cluster_dir)
729 return super(WindowsHPCControllerLauncher, self).start(1)
731 return super(WindowsHPCControllerLauncher, self).start(1)
730
732
731
733
732 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
734 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
733
735
734 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
736 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
735 extra_args = List([], config=False)
737 extra_args = List([], config=False)
736
738
737 def write_job_file(self, n):
739 def write_job_file(self, n):
738 job = IPEngineSetJob(config=self.config)
740 job = IPEngineSetJob(config=self.config)
739
741
740 for i in range(n):
742 for i in range(n):
741 t = IPEngineTask(config=self.config)
743 t = IPEngineTask(config=self.config)
742 # The tasks work directory is *not* the actual work directory of
744 # The tasks work directory is *not* the actual work directory of
743 # the engine. It is used as the base path for the stdout/stderr
745 # the engine. It is used as the base path for the stdout/stderr
744 # files that the scheduler redirects to.
746 # files that the scheduler redirects to.
745 t.work_directory = self.cluster_dir
747 t.work_directory = self.cluster_dir
746 # Add the --cluster-dir and from self.start().
748 # Add the --cluster-dir and from self.start().
747 t.engine_args.extend(self.extra_args)
749 t.engine_args.extend(self.extra_args)
748 job.add_task(t)
750 job.add_task(t)
749
751
750 self.log.info("Writing job description file: %s" % self.job_file)
752 self.log.info("Writing job description file: %s" % self.job_file)
751 job.write(self.job_file)
753 job.write(self.job_file)
752
754
753 @property
755 @property
754 def job_file(self):
756 def job_file(self):
755 return os.path.join(self.cluster_dir, self.job_file_name)
757 return os.path.join(self.cluster_dir, self.job_file_name)
756
758
757 def start(self, n, cluster_dir):
759 def start(self, n, cluster_dir):
758 """Start the controller by cluster_dir."""
760 """Start the controller by cluster_dir."""
759 self.extra_args = ['--cluster-dir', cluster_dir]
761 self.extra_args = ['--cluster-dir', cluster_dir]
760 self.cluster_dir = unicode(cluster_dir)
762 self.cluster_dir = unicode(cluster_dir)
761 return super(WindowsHPCEngineSetLauncher, self).start(n)
763 return super(WindowsHPCEngineSetLauncher, self).start(n)
762
764
763
765
764 #-----------------------------------------------------------------------------
766 #-----------------------------------------------------------------------------
765 # Batch (PBS) system launchers
767 # Batch (PBS) system launchers
766 #-----------------------------------------------------------------------------
768 #-----------------------------------------------------------------------------
767
769
768 class BatchSystemLauncher(BaseLauncher):
770 class BatchSystemLauncher(BaseLauncher):
769 """Launch an external process using a batch system.
771 """Launch an external process using a batch system.
770
772
771 This class is designed to work with UNIX batch systems like PBS, LSF,
773 This class is designed to work with UNIX batch systems like PBS, LSF,
772 GridEngine, etc. The overall model is that there are different commands
774 GridEngine, etc. The overall model is that there are different commands
773 like qsub, qdel, etc. that handle the starting and stopping of the process.
775 like qsub, qdel, etc. that handle the starting and stopping of the process.
774
776
775 This class also has the notion of a batch script. The ``batch_template``
777 This class also has the notion of a batch script. The ``batch_template``
776 attribute can be set to a string that is a template for the batch script.
778 attribute can be set to a string that is a template for the batch script.
777 This template is instantiated using Itpl. Thus the template can use
779 This template is instantiated using Itpl. Thus the template can use
778 ${n} fot the number of instances. Subclasses can add additional variables
780 ${n} fot the number of instances. Subclasses can add additional variables
779 to the template dict.
781 to the template dict.
780 """
782 """
781
783
782 # Subclasses must fill these in. See PBSEngineSet
784 # Subclasses must fill these in. See PBSEngineSet
783 # The name of the command line program used to submit jobs.
785 # The name of the command line program used to submit jobs.
784 submit_command = List([''], config=True)
786 submit_command = List([''], config=True)
785 # The name of the command line program used to delete jobs.
787 # The name of the command line program used to delete jobs.
786 delete_command = List([''], config=True)
788 delete_command = List([''], config=True)
787 # A regular expression used to get the job id from the output of the
789 # A regular expression used to get the job id from the output of the
788 # submit_command.
790 # submit_command.
789 job_id_regexp = CUnicode('', config=True)
791 job_id_regexp = CUnicode('', config=True)
790 # The string that is the batch script template itself.
792 # The string that is the batch script template itself.
791 batch_template = CUnicode('', config=True)
793 batch_template = CUnicode('', config=True)
792 # The file that contains the batch template
794 # The file that contains the batch template
793 batch_template_file = CUnicode(u'', config=True)
795 batch_template_file = CUnicode(u'', config=True)
794 # The filename of the instantiated batch script.
796 # The filename of the instantiated batch script.
795 batch_file_name = CUnicode(u'batch_script', config=True)
797 batch_file_name = CUnicode(u'batch_script', config=True)
796 # The PBS Queue
798 # The PBS Queue
797 queue = CUnicode(u'', config=True)
799 queue = CUnicode(u'', config=True)
798
800
799 # not configurable, override in subclasses
801 # not configurable, override in subclasses
800 # PBS Job Array regex
802 # PBS Job Array regex
801 job_array_regexp = CUnicode('')
803 job_array_regexp = CUnicode('')
802 job_array_template = CUnicode('')
804 job_array_template = CUnicode('')
803 # PBS Queue regex
805 # PBS Queue regex
804 queue_regexp = CUnicode('')
806 queue_regexp = CUnicode('')
805 queue_template = CUnicode('')
807 queue_template = CUnicode('')
806 # The default batch template, override in subclasses
808 # The default batch template, override in subclasses
807 default_template = CUnicode('')
809 default_template = CUnicode('')
808 # The full path to the instantiated batch script.
810 # The full path to the instantiated batch script.
809 batch_file = CUnicode(u'')
811 batch_file = CUnicode(u'')
810 # the format dict used with batch_template:
812 # the format dict used with batch_template:
811 context = Dict()
813 context = Dict()
812
814
813
815
814 def find_args(self):
816 def find_args(self):
815 return self.submit_command + [self.batch_file]
817 return self.submit_command + [self.batch_file]
816
818
817 def __init__(self, work_dir=u'.', config=None, **kwargs):
819 def __init__(self, work_dir=u'.', config=None, **kwargs):
818 super(BatchSystemLauncher, self).__init__(
820 super(BatchSystemLauncher, self).__init__(
819 work_dir=work_dir, config=config, **kwargs
821 work_dir=work_dir, config=config, **kwargs
820 )
822 )
821 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
823 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
822
824
823 def parse_job_id(self, output):
825 def parse_job_id(self, output):
824 """Take the output of the submit command and return the job id."""
826 """Take the output of the submit command and return the job id."""
825 m = re.search(self.job_id_regexp, output)
827 m = re.search(self.job_id_regexp, output)
826 if m is not None:
828 if m is not None:
827 job_id = m.group()
829 job_id = m.group()
828 else:
830 else:
829 raise LauncherError("Job id couldn't be determined: %s" % output)
831 raise LauncherError("Job id couldn't be determined: %s" % output)
830 self.job_id = job_id
832 self.job_id = job_id
831 self.log.info('Job submitted with job id: %r' % job_id)
833 self.log.info('Job submitted with job id: %r' % job_id)
832 return job_id
834 return job_id
833
835
834 def write_batch_script(self, n):
836 def write_batch_script(self, n):
835 """Instantiate and write the batch script to the work_dir."""
837 """Instantiate and write the batch script to the work_dir."""
836 self.context['n'] = n
838 self.context['n'] = n
837 self.context['queue'] = self.queue
839 self.context['queue'] = self.queue
838 print self.context
840 print self.context
839 # first priority is batch_template if set
841 # first priority is batch_template if set
840 if self.batch_template_file and not self.batch_template:
842 if self.batch_template_file and not self.batch_template:
841 # second priority is batch_template_file
843 # second priority is batch_template_file
842 with open(self.batch_template_file) as f:
844 with open(self.batch_template_file) as f:
843 self.batch_template = f.read()
845 self.batch_template = f.read()
844 if not self.batch_template:
846 if not self.batch_template:
845 # third (last) priority is default_template
847 # third (last) priority is default_template
846 self.batch_template = self.default_template
848 self.batch_template = self.default_template
847
849
848 regex = re.compile(self.job_array_regexp)
850 regex = re.compile(self.job_array_regexp)
849 # print regex.search(self.batch_template)
851 # print regex.search(self.batch_template)
850 if not regex.search(self.batch_template):
852 if not regex.search(self.batch_template):
851 self.log.info("adding job array settings to batch script")
853 self.log.info("adding job array settings to batch script")
852 firstline, rest = self.batch_template.split('\n',1)
854 firstline, rest = self.batch_template.split('\n',1)
853 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
855 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
854
856
855 regex = re.compile(self.queue_regexp)
857 regex = re.compile(self.queue_regexp)
856 # print regex.search(self.batch_template)
858 # print regex.search(self.batch_template)
857 if self.queue and not regex.search(self.batch_template):
859 if self.queue and not regex.search(self.batch_template):
858 self.log.info("adding PBS queue settings to batch script")
860 self.log.info("adding PBS queue settings to batch script")
859 firstline, rest = self.batch_template.split('\n',1)
861 firstline, rest = self.batch_template.split('\n',1)
860 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
862 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
861
863
862 script_as_string = Itpl.itplns(self.batch_template, self.context)
864 script_as_string = Itpl.itplns(self.batch_template, self.context)
863 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
865 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
864
866
865 with open(self.batch_file, 'w') as f:
867 with open(self.batch_file, 'w') as f:
866 f.write(script_as_string)
868 f.write(script_as_string)
867 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
869 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
868
870
869 def start(self, n, cluster_dir):
871 def start(self, n, cluster_dir):
870 """Start n copies of the process using a batch system."""
872 """Start n copies of the process using a batch system."""
871 # Here we save profile and cluster_dir in the context so they
873 # Here we save profile and cluster_dir in the context so they
872 # can be used in the batch script template as ${profile} and
874 # can be used in the batch script template as ${profile} and
873 # ${cluster_dir}
875 # ${cluster_dir}
874 self.context['cluster_dir'] = cluster_dir
876 self.context['cluster_dir'] = cluster_dir
875 self.cluster_dir = unicode(cluster_dir)
877 self.cluster_dir = unicode(cluster_dir)
876 self.write_batch_script(n)
878 self.write_batch_script(n)
877 output = check_output(self.args, env=os.environ)
879 output = check_output(self.args, env=os.environ)
878
880
879 job_id = self.parse_job_id(output)
881 job_id = self.parse_job_id(output)
880 self.notify_start(job_id)
882 self.notify_start(job_id)
881 return job_id
883 return job_id
882
884
883 def stop(self):
885 def stop(self):
884 output = check_output(self.delete_command+[self.job_id], env=os.environ)
886 output = check_output(self.delete_command+[self.job_id], env=os.environ)
885 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
887 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
886 return output
888 return output
887
889
888
890
889 class PBSLauncher(BatchSystemLauncher):
891 class PBSLauncher(BatchSystemLauncher):
890 """A BatchSystemLauncher subclass for PBS."""
892 """A BatchSystemLauncher subclass for PBS."""
891
893
892 submit_command = List(['qsub'], config=True)
894 submit_command = List(['qsub'], config=True)
893 delete_command = List(['qdel'], config=True)
895 delete_command = List(['qdel'], config=True)
894 job_id_regexp = CUnicode(r'\d+', config=True)
896 job_id_regexp = CUnicode(r'\d+', config=True)
895
897
896 batch_file = CUnicode(u'')
898 batch_file = CUnicode(u'')
897 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
899 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
898 job_array_template = CUnicode('#PBS -t 1-$n')
900 job_array_template = CUnicode('#PBS -t 1-$n')
899 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
901 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
900 queue_template = CUnicode('#PBS -q $queue')
902 queue_template = CUnicode('#PBS -q $queue')
901
903
902
904
903 class PBSControllerLauncher(PBSLauncher):
905 class PBSControllerLauncher(PBSLauncher):
904 """Launch a controller using PBS."""
906 """Launch a controller using PBS."""
905
907
906 batch_file_name = CUnicode(u'pbs_controller', config=True)
908 batch_file_name = CUnicode(u'pbs_controller', config=True)
907 default_template= CUnicode("""#!/bin/sh
909 default_template= CUnicode("""#!/bin/sh
908 #PBS -V
910 #PBS -V
909 #PBS -N ipcontroller
911 #PBS -N ipcontroller
910 %s --log-to-file --cluster-dir $cluster_dir
912 %s --log-to-file --cluster-dir $cluster_dir
911 """%(' '.join(ipcontroller_cmd_argv)))
913 """%(' '.join(ipcontroller_cmd_argv)))
912
914
913 def start(self, cluster_dir):
915 def start(self, cluster_dir):
914 """Start the controller by profile or cluster_dir."""
916 """Start the controller by profile or cluster_dir."""
915 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
917 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
916 return super(PBSControllerLauncher, self).start(1, cluster_dir)
918 return super(PBSControllerLauncher, self).start(1, cluster_dir)
917
919
918
920
919 class PBSEngineSetLauncher(PBSLauncher):
921 class PBSEngineSetLauncher(PBSLauncher):
920 """Launch Engines using PBS"""
922 """Launch Engines using PBS"""
921 batch_file_name = CUnicode(u'pbs_engines', config=True)
923 batch_file_name = CUnicode(u'pbs_engines', config=True)
922 default_template= CUnicode(u"""#!/bin/sh
924 default_template= CUnicode(u"""#!/bin/sh
923 #PBS -V
925 #PBS -V
924 #PBS -N ipengine
926 #PBS -N ipengine
925 %s --cluster-dir $cluster_dir
927 %s --cluster-dir $cluster_dir
926 """%(' '.join(ipengine_cmd_argv)))
928 """%(' '.join(ipengine_cmd_argv)))
927
929
928 def start(self, n, cluster_dir):
930 def start(self, n, cluster_dir):
929 """Start n engines by profile or cluster_dir."""
931 """Start n engines by profile or cluster_dir."""
930 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
932 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
931 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
933 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
932
934
933 #SGE is very similar to PBS
935 #SGE is very similar to PBS
934
936
935 class SGELauncher(PBSLauncher):
937 class SGELauncher(PBSLauncher):
936 """Sun GridEngine is a PBS clone with slightly different syntax"""
938 """Sun GridEngine is a PBS clone with slightly different syntax"""
937 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
939 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
938 job_array_template = CUnicode('#$$ -t 1-$n')
940 job_array_template = CUnicode('#$$ -t 1-$n')
939 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
941 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
940 queue_template = CUnicode('#$$ -q $queue')
942 queue_template = CUnicode('#$$ -q $queue')
941
943
942 class SGEControllerLauncher(SGELauncher):
944 class SGEControllerLauncher(SGELauncher):
943 """Launch a controller using SGE."""
945 """Launch a controller using SGE."""
944
946
945 batch_file_name = CUnicode(u'sge_controller', config=True)
947 batch_file_name = CUnicode(u'sge_controller', config=True)
946 default_template= CUnicode(u"""#$$ -V
948 default_template= CUnicode(u"""#$$ -V
947 #$$ -S /bin/sh
949 #$$ -S /bin/sh
948 #$$ -N ipcontroller
950 #$$ -N ipcontroller
949 %s --log-to-file --cluster-dir $cluster_dir
951 %s --log-to-file --cluster-dir $cluster_dir
950 """%(' '.join(ipcontroller_cmd_argv)))
952 """%(' '.join(ipcontroller_cmd_argv)))
951
953
952 def start(self, cluster_dir):
954 def start(self, cluster_dir):
953 """Start the controller by profile or cluster_dir."""
955 """Start the controller by profile or cluster_dir."""
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
956 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, cluster_dir)
957 return super(PBSControllerLauncher, self).start(1, cluster_dir)
956
958
957 class SGEEngineSetLauncher(SGELauncher):
959 class SGEEngineSetLauncher(SGELauncher):
958 """Launch Engines with SGE"""
960 """Launch Engines with SGE"""
959 batch_file_name = CUnicode(u'sge_engines', config=True)
961 batch_file_name = CUnicode(u'sge_engines', config=True)
960 default_template = CUnicode("""#$$ -V
962 default_template = CUnicode("""#$$ -V
961 #$$ -S /bin/sh
963 #$$ -S /bin/sh
962 #$$ -N ipengine
964 #$$ -N ipengine
963 %s --cluster-dir $cluster_dir
965 %s --cluster-dir $cluster_dir
964 """%(' '.join(ipengine_cmd_argv)))
966 """%(' '.join(ipengine_cmd_argv)))
965
967
966 def start(self, n, cluster_dir):
968 def start(self, n, cluster_dir):
967 """Start n engines by profile or cluster_dir."""
969 """Start n engines by profile or cluster_dir."""
968 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
970 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
969 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
971 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
970
972
971
973
972 #-----------------------------------------------------------------------------
974 #-----------------------------------------------------------------------------
973 # A launcher for ipcluster itself!
975 # A launcher for ipcluster itself!
974 #-----------------------------------------------------------------------------
976 #-----------------------------------------------------------------------------
975
977
976
978
977 class IPClusterLauncher(LocalProcessLauncher):
979 class IPClusterLauncher(LocalProcessLauncher):
978 """Launch the ipcluster program in an external process."""
980 """Launch the ipcluster program in an external process."""
979
981
980 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
982 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
981 # Command line arguments to pass to ipcluster.
983 # Command line arguments to pass to ipcluster.
982 ipcluster_args = List(
984 ipcluster_args = List(
983 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
985 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
984 ipcluster_subcommand = Str('start')
986 ipcluster_subcommand = Str('start')
985 ipcluster_n = Int(2)
987 ipcluster_n = Int(2)
986
988
987 def find_args(self):
989 def find_args(self):
988 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
990 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
989 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
991 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
990
992
991 def start(self):
993 def start(self):
992 self.log.info("Starting ipcluster: %r" % self.args)
994 self.log.info("Starting ipcluster: %r" % self.args)
993 return super(IPClusterLauncher, self).start()
995 return super(IPClusterLauncher, self).start()
994
996
@@ -1,316 +1,314 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Job and task components for writing .xml files that the Windows HPC Server
4 Job and task components for writing .xml files that the Windows HPC Server
5 2008 can use to start jobs.
5 2008 can use to start jobs.
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
9 # Copyright (C) 2008-2009 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import with_statement
20
21 import os
19 import os
22 import re
20 import re
23 import uuid
21 import uuid
24
22
25 from xml.etree import ElementTree as ET
23 from xml.etree import ElementTree as ET
26
24
27 from IPython.config.configurable import Configurable
25 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import (
26 from IPython.utils.traitlets import (
29 Str, Int, List, Instance,
27 Str, Int, List, Instance,
30 Enum, Bool, CStr
28 Enum, Bool, CStr
31 )
29 )
32
30
33 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
34 # Job and Task classes
32 # Job and Task classes
35 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
36
34
37
35
38 def as_str(value):
36 def as_str(value):
39 if isinstance(value, str):
37 if isinstance(value, str):
40 return value
38 return value
41 elif isinstance(value, bool):
39 elif isinstance(value, bool):
42 if value:
40 if value:
43 return 'true'
41 return 'true'
44 else:
42 else:
45 return 'false'
43 return 'false'
46 elif isinstance(value, (int, float)):
44 elif isinstance(value, (int, float)):
47 return repr(value)
45 return repr(value)
48 else:
46 else:
49 return value
47 return value
50
48
51
49
52 def indent(elem, level=0):
50 def indent(elem, level=0):
53 i = "\n" + level*" "
51 i = "\n" + level*" "
54 if len(elem):
52 if len(elem):
55 if not elem.text or not elem.text.strip():
53 if not elem.text or not elem.text.strip():
56 elem.text = i + " "
54 elem.text = i + " "
57 if not elem.tail or not elem.tail.strip():
55 if not elem.tail or not elem.tail.strip():
58 elem.tail = i
56 elem.tail = i
59 for elem in elem:
57 for elem in elem:
60 indent(elem, level+1)
58 indent(elem, level+1)
61 if not elem.tail or not elem.tail.strip():
59 if not elem.tail or not elem.tail.strip():
62 elem.tail = i
60 elem.tail = i
63 else:
61 else:
64 if level and (not elem.tail or not elem.tail.strip()):
62 if level and (not elem.tail or not elem.tail.strip()):
65 elem.tail = i
63 elem.tail = i
66
64
67
65
68 def find_username():
66 def find_username():
69 domain = os.environ.get('USERDOMAIN')
67 domain = os.environ.get('USERDOMAIN')
70 username = os.environ.get('USERNAME','')
68 username = os.environ.get('USERNAME','')
71 if domain is None:
69 if domain is None:
72 return username
70 return username
73 else:
71 else:
74 return '%s\\%s' % (domain, username)
72 return '%s\\%s' % (domain, username)
75
73
76
74
77 class WinHPCJob(Configurable):
75 class WinHPCJob(Configurable):
78
76
79 job_id = Str('')
77 job_id = Str('')
80 job_name = Str('MyJob', config=True)
78 job_name = Str('MyJob', config=True)
81 min_cores = Int(1, config=True)
79 min_cores = Int(1, config=True)
82 max_cores = Int(1, config=True)
80 max_cores = Int(1, config=True)
83 min_sockets = Int(1, config=True)
81 min_sockets = Int(1, config=True)
84 max_sockets = Int(1, config=True)
82 max_sockets = Int(1, config=True)
85 min_nodes = Int(1, config=True)
83 min_nodes = Int(1, config=True)
86 max_nodes = Int(1, config=True)
84 max_nodes = Int(1, config=True)
87 unit_type = Str("Core", config=True)
85 unit_type = Str("Core", config=True)
88 auto_calculate_min = Bool(True, config=True)
86 auto_calculate_min = Bool(True, config=True)
89 auto_calculate_max = Bool(True, config=True)
87 auto_calculate_max = Bool(True, config=True)
90 run_until_canceled = Bool(False, config=True)
88 run_until_canceled = Bool(False, config=True)
91 is_exclusive = Bool(False, config=True)
89 is_exclusive = Bool(False, config=True)
92 username = Str(find_username(), config=True)
90 username = Str(find_username(), config=True)
93 job_type = Str('Batch', config=True)
91 job_type = Str('Batch', config=True)
94 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
92 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
95 default_value='Highest', config=True)
93 default_value='Highest', config=True)
96 requested_nodes = Str('', config=True)
94 requested_nodes = Str('', config=True)
97 project = Str('IPython', config=True)
95 project = Str('IPython', config=True)
98 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
96 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
99 version = Str("2.000")
97 version = Str("2.000")
100 tasks = List([])
98 tasks = List([])
101
99
102 @property
100 @property
103 def owner(self):
101 def owner(self):
104 return self.username
102 return self.username
105
103
106 def _write_attr(self, root, attr, key):
104 def _write_attr(self, root, attr, key):
107 s = as_str(getattr(self, attr, ''))
105 s = as_str(getattr(self, attr, ''))
108 if s:
106 if s:
109 root.set(key, s)
107 root.set(key, s)
110
108
111 def as_element(self):
109 def as_element(self):
112 # We have to add _A_ type things to get the right order than
110 # We have to add _A_ type things to get the right order than
113 # the MSFT XML parser expects.
111 # the MSFT XML parser expects.
114 root = ET.Element('Job')
112 root = ET.Element('Job')
115 self._write_attr(root, 'version', '_A_Version')
113 self._write_attr(root, 'version', '_A_Version')
116 self._write_attr(root, 'job_name', '_B_Name')
114 self._write_attr(root, 'job_name', '_B_Name')
117 self._write_attr(root, 'unit_type', '_C_UnitType')
115 self._write_attr(root, 'unit_type', '_C_UnitType')
118 self._write_attr(root, 'min_cores', '_D_MinCores')
116 self._write_attr(root, 'min_cores', '_D_MinCores')
119 self._write_attr(root, 'max_cores', '_E_MaxCores')
117 self._write_attr(root, 'max_cores', '_E_MaxCores')
120 self._write_attr(root, 'min_sockets', '_F_MinSockets')
118 self._write_attr(root, 'min_sockets', '_F_MinSockets')
121 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
119 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
122 self._write_attr(root, 'min_nodes', '_H_MinNodes')
120 self._write_attr(root, 'min_nodes', '_H_MinNodes')
123 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
121 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
124 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
122 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
125 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
123 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
126 self._write_attr(root, 'username', '_L_UserName')
124 self._write_attr(root, 'username', '_L_UserName')
127 self._write_attr(root, 'job_type', '_M_JobType')
125 self._write_attr(root, 'job_type', '_M_JobType')
128 self._write_attr(root, 'priority', '_N_Priority')
126 self._write_attr(root, 'priority', '_N_Priority')
129 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
127 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
130 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
128 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
131 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
129 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
132 self._write_attr(root, 'project', '_R_Project')
130 self._write_attr(root, 'project', '_R_Project')
133 self._write_attr(root, 'owner', '_S_Owner')
131 self._write_attr(root, 'owner', '_S_Owner')
134 self._write_attr(root, 'xmlns', '_T_xmlns')
132 self._write_attr(root, 'xmlns', '_T_xmlns')
135 dependencies = ET.SubElement(root, "Dependencies")
133 dependencies = ET.SubElement(root, "Dependencies")
136 etasks = ET.SubElement(root, "Tasks")
134 etasks = ET.SubElement(root, "Tasks")
137 for t in self.tasks:
135 for t in self.tasks:
138 etasks.append(t.as_element())
136 etasks.append(t.as_element())
139 return root
137 return root
140
138
141 def tostring(self):
139 def tostring(self):
142 """Return the string representation of the job description XML."""
140 """Return the string representation of the job description XML."""
143 root = self.as_element()
141 root = self.as_element()
144 indent(root)
142 indent(root)
145 txt = ET.tostring(root, encoding="utf-8")
143 txt = ET.tostring(root, encoding="utf-8")
146 # Now remove the tokens used to order the attributes.
144 # Now remove the tokens used to order the attributes.
147 txt = re.sub(r'_[A-Z]_','',txt)
145 txt = re.sub(r'_[A-Z]_','',txt)
148 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
146 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
149 return txt
147 return txt
150
148
151 def write(self, filename):
149 def write(self, filename):
152 """Write the XML job description to a file."""
150 """Write the XML job description to a file."""
153 txt = self.tostring()
151 txt = self.tostring()
154 with open(filename, 'w') as f:
152 with open(filename, 'w') as f:
155 f.write(txt)
153 f.write(txt)
156
154
157 def add_task(self, task):
155 def add_task(self, task):
158 """Add a task to the job.
156 """Add a task to the job.
159
157
160 Parameters
158 Parameters
161 ----------
159 ----------
162 task : :class:`WinHPCTask`
160 task : :class:`WinHPCTask`
163 The task object to add.
161 The task object to add.
164 """
162 """
165 self.tasks.append(task)
163 self.tasks.append(task)
166
164
167
165
168 class WinHPCTask(Configurable):
166 class WinHPCTask(Configurable):
169
167
170 task_id = Str('')
168 task_id = Str('')
171 task_name = Str('')
169 task_name = Str('')
172 version = Str("2.000")
170 version = Str("2.000")
173 min_cores = Int(1, config=True)
171 min_cores = Int(1, config=True)
174 max_cores = Int(1, config=True)
172 max_cores = Int(1, config=True)
175 min_sockets = Int(1, config=True)
173 min_sockets = Int(1, config=True)
176 max_sockets = Int(1, config=True)
174 max_sockets = Int(1, config=True)
177 min_nodes = Int(1, config=True)
175 min_nodes = Int(1, config=True)
178 max_nodes = Int(1, config=True)
176 max_nodes = Int(1, config=True)
179 unit_type = Str("Core", config=True)
177 unit_type = Str("Core", config=True)
180 command_line = CStr('', config=True)
178 command_line = CStr('', config=True)
181 work_directory = CStr('', config=True)
179 work_directory = CStr('', config=True)
182 is_rerunnaable = Bool(True, config=True)
180 is_rerunnaable = Bool(True, config=True)
183 std_out_file_path = CStr('', config=True)
181 std_out_file_path = CStr('', config=True)
184 std_err_file_path = CStr('', config=True)
182 std_err_file_path = CStr('', config=True)
185 is_parametric = Bool(False, config=True)
183 is_parametric = Bool(False, config=True)
186 environment_variables = Instance(dict, args=(), config=True)
184 environment_variables = Instance(dict, args=(), config=True)
187
185
188 def _write_attr(self, root, attr, key):
186 def _write_attr(self, root, attr, key):
189 s = as_str(getattr(self, attr, ''))
187 s = as_str(getattr(self, attr, ''))
190 if s:
188 if s:
191 root.set(key, s)
189 root.set(key, s)
192
190
193 def as_element(self):
191 def as_element(self):
194 root = ET.Element('Task')
192 root = ET.Element('Task')
195 self._write_attr(root, 'version', '_A_Version')
193 self._write_attr(root, 'version', '_A_Version')
196 self._write_attr(root, 'task_name', '_B_Name')
194 self._write_attr(root, 'task_name', '_B_Name')
197 self._write_attr(root, 'min_cores', '_C_MinCores')
195 self._write_attr(root, 'min_cores', '_C_MinCores')
198 self._write_attr(root, 'max_cores', '_D_MaxCores')
196 self._write_attr(root, 'max_cores', '_D_MaxCores')
199 self._write_attr(root, 'min_sockets', '_E_MinSockets')
197 self._write_attr(root, 'min_sockets', '_E_MinSockets')
200 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
198 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
201 self._write_attr(root, 'min_nodes', '_G_MinNodes')
199 self._write_attr(root, 'min_nodes', '_G_MinNodes')
202 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
200 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
203 self._write_attr(root, 'command_line', '_I_CommandLine')
201 self._write_attr(root, 'command_line', '_I_CommandLine')
204 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
202 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
205 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
203 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
206 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
204 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
207 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
205 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
208 self._write_attr(root, 'is_parametric', '_N_IsParametric')
206 self._write_attr(root, 'is_parametric', '_N_IsParametric')
209 self._write_attr(root, 'unit_type', '_O_UnitType')
207 self._write_attr(root, 'unit_type', '_O_UnitType')
210 root.append(self.get_env_vars())
208 root.append(self.get_env_vars())
211 return root
209 return root
212
210
213 def get_env_vars(self):
211 def get_env_vars(self):
214 env_vars = ET.Element('EnvironmentVariables')
212 env_vars = ET.Element('EnvironmentVariables')
215 for k, v in self.environment_variables.iteritems():
213 for k, v in self.environment_variables.iteritems():
216 variable = ET.SubElement(env_vars, "Variable")
214 variable = ET.SubElement(env_vars, "Variable")
217 name = ET.SubElement(variable, "Name")
215 name = ET.SubElement(variable, "Name")
218 name.text = k
216 name.text = k
219 value = ET.SubElement(variable, "Value")
217 value = ET.SubElement(variable, "Value")
220 value.text = v
218 value.text = v
221 return env_vars
219 return env_vars
222
220
223
221
224
222
225 # By declaring these, we can configure the controller and engine separately!
223 # By declaring these, we can configure the controller and engine separately!
226
224
227 class IPControllerJob(WinHPCJob):
225 class IPControllerJob(WinHPCJob):
228 job_name = Str('IPController', config=False)
226 job_name = Str('IPController', config=False)
229 is_exclusive = Bool(False, config=True)
227 is_exclusive = Bool(False, config=True)
230 username = Str(find_username(), config=True)
228 username = Str(find_username(), config=True)
231 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
229 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
232 default_value='Highest', config=True)
230 default_value='Highest', config=True)
233 requested_nodes = Str('', config=True)
231 requested_nodes = Str('', config=True)
234 project = Str('IPython', config=True)
232 project = Str('IPython', config=True)
235
233
236
234
237 class IPEngineSetJob(WinHPCJob):
235 class IPEngineSetJob(WinHPCJob):
238 job_name = Str('IPEngineSet', config=False)
236 job_name = Str('IPEngineSet', config=False)
239 is_exclusive = Bool(False, config=True)
237 is_exclusive = Bool(False, config=True)
240 username = Str(find_username(), config=True)
238 username = Str(find_username(), config=True)
241 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
239 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
242 default_value='Highest', config=True)
240 default_value='Highest', config=True)
243 requested_nodes = Str('', config=True)
241 requested_nodes = Str('', config=True)
244 project = Str('IPython', config=True)
242 project = Str('IPython', config=True)
245
243
246
244
247 class IPControllerTask(WinHPCTask):
245 class IPControllerTask(WinHPCTask):
248
246
249 task_name = Str('IPController', config=True)
247 task_name = Str('IPController', config=True)
250 controller_cmd = List(['ipcontroller.exe'], config=True)
248 controller_cmd = List(['ipcontroller.exe'], config=True)
251 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
249 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
252 # I don't want these to be configurable
250 # I don't want these to be configurable
253 std_out_file_path = CStr('', config=False)
251 std_out_file_path = CStr('', config=False)
254 std_err_file_path = CStr('', config=False)
252 std_err_file_path = CStr('', config=False)
255 min_cores = Int(1, config=False)
253 min_cores = Int(1, config=False)
256 max_cores = Int(1, config=False)
254 max_cores = Int(1, config=False)
257 min_sockets = Int(1, config=False)
255 min_sockets = Int(1, config=False)
258 max_sockets = Int(1, config=False)
256 max_sockets = Int(1, config=False)
259 min_nodes = Int(1, config=False)
257 min_nodes = Int(1, config=False)
260 max_nodes = Int(1, config=False)
258 max_nodes = Int(1, config=False)
261 unit_type = Str("Core", config=False)
259 unit_type = Str("Core", config=False)
262 work_directory = CStr('', config=False)
260 work_directory = CStr('', config=False)
263
261
264 def __init__(self, config=None):
262 def __init__(self, config=None):
265 super(IPControllerTask, self).__init__(config=config)
263 super(IPControllerTask, self).__init__(config=config)
266 the_uuid = uuid.uuid1()
264 the_uuid = uuid.uuid1()
267 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
265 self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
268 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
266 self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
269
267
270 @property
268 @property
271 def command_line(self):
269 def command_line(self):
272 return ' '.join(self.controller_cmd + self.controller_args)
270 return ' '.join(self.controller_cmd + self.controller_args)
273
271
274
272
275 class IPEngineTask(WinHPCTask):
273 class IPEngineTask(WinHPCTask):
276
274
277 task_name = Str('IPEngine', config=True)
275 task_name = Str('IPEngine', config=True)
278 engine_cmd = List(['ipengine.exe'], config=True)
276 engine_cmd = List(['ipengine.exe'], config=True)
279 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
277 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
280 # I don't want these to be configurable
278 # I don't want these to be configurable
281 std_out_file_path = CStr('', config=False)
279 std_out_file_path = CStr('', config=False)
282 std_err_file_path = CStr('', config=False)
280 std_err_file_path = CStr('', config=False)
283 min_cores = Int(1, config=False)
281 min_cores = Int(1, config=False)
284 max_cores = Int(1, config=False)
282 max_cores = Int(1, config=False)
285 min_sockets = Int(1, config=False)
283 min_sockets = Int(1, config=False)
286 max_sockets = Int(1, config=False)
284 max_sockets = Int(1, config=False)
287 min_nodes = Int(1, config=False)
285 min_nodes = Int(1, config=False)
288 max_nodes = Int(1, config=False)
286 max_nodes = Int(1, config=False)
289 unit_type = Str("Core", config=False)
287 unit_type = Str("Core", config=False)
290 work_directory = CStr('', config=False)
288 work_directory = CStr('', config=False)
291
289
292 def __init__(self, config=None):
290 def __init__(self, config=None):
293 super(IPEngineTask,self).__init__(config=config)
291 super(IPEngineTask,self).__init__(config=config)
294 the_uuid = uuid.uuid1()
292 the_uuid = uuid.uuid1()
295 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
293 self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
296 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
294 self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
297
295
298 @property
296 @property
299 def command_line(self):
297 def command_line(self):
300 return ' '.join(self.engine_cmd + self.engine_args)
298 return ' '.join(self.engine_cmd + self.engine_args)
301
299
302
300
303 # j = WinHPCJob(None)
301 # j = WinHPCJob(None)
304 # j.job_name = 'IPCluster'
302 # j.job_name = 'IPCluster'
305 # j.username = 'GNET\\bgranger'
303 # j.username = 'GNET\\bgranger'
306 # j.requested_nodes = 'GREEN'
304 # j.requested_nodes = 'GREEN'
307 #
305 #
308 # t = WinHPCTask(None)
306 # t = WinHPCTask(None)
309 # t.task_name = 'Controller'
307 # t.task_name = 'Controller'
310 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
308 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
311 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
309 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
312 # t.std_out_file_path = 'controller-out.txt'
310 # t.std_out_file_path = 'controller-out.txt'
313 # t.std_err_file_path = 'controller-err.txt'
311 # t.std_err_file_path = 'controller-err.txt'
314 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
312 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
315 # j.add_task(t)
313 # j.add_task(t)
316
314
@@ -1,80 +1,107 b''
1 """toplevel setup/teardown for parallel tests."""
1 """toplevel setup/teardown for parallel tests."""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import tempfile
15 import tempfile
16 import time
16 import time
17 from subprocess import Popen, PIPE, STDOUT
17 from subprocess import Popen
18
18
19 from IPython.utils.path import get_ipython_dir
19 from IPython.utils.path import get_ipython_dir
20 from IPython.parallel import Client
20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL)
21
25
22 processes = []
26 # globals
23 blackhole = tempfile.TemporaryFile()
27 launchers = []
28 blackhole = open(os.devnull, 'w')
29
30 # Launcher class
31 class TestProcessLauncher(LocalProcessLauncher):
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 def start(self):
34 if self.state == 'before':
35 self.process = Popen(self.args,
36 stdout=blackhole, stderr=blackhole,
37 env=os.environ,
38 cwd=self.work_dir
39 )
40 self.notify_start(self.process.pid)
41 self.poll = self.process.poll
42 else:
43 s = 'The process was already started and has state: %r' % self.state
44 raise ProcessStateError(s)
24
45
25 # nose setup/teardown
46 # nose setup/teardown
26
47
27 def setup():
48 def setup():
28 cp = Popen('ipcontroller --profile iptest -r --log-level 10 --log-to-file --usethreads'.split(), stdout=blackhole, stderr=STDOUT)
49 cp = TestProcessLauncher()
29 processes.append(cp)
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
30 engine_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-engine.json')
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
31 client_json = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security', 'ipcontroller-client.json')
52 cp.start()
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
55 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
56 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
32 tic = time.time()
57 tic = time.time()
33 while not os.path.exists(engine_json) or not os.path.exists(client_json):
58 while not os.path.exists(engine_json) or not os.path.exists(client_json):
34 if cp.poll() is not None:
59 if cp.poll() is not None:
35 print cp.poll()
60 print cp.poll()
36 raise RuntimeError("The test controller failed to start.")
61 raise RuntimeError("The test controller failed to start.")
37 elif time.time()-tic > 10:
62 elif time.time()-tic > 10:
38 raise RuntimeError("Timeout waiting for the test controller to start.")
63 raise RuntimeError("Timeout waiting for the test controller to start.")
39 time.sleep(0.1)
64 time.sleep(0.1)
40 add_engines(1)
65 add_engines(1)
41
66
42 def add_engines(n=1, profile='iptest'):
67 def add_engines(n=1, profile='iptest'):
43 rc = Client(profile=profile)
68 rc = Client(profile=profile)
44 base = len(rc)
69 base = len(rc)
45 eps = []
70 eps = []
46 for i in range(n):
71 for i in range(n):
47 ep = Popen(['ipengine']+ ['--profile', profile, '--log-level', '10', '--log-to-file'], stdout=blackhole, stderr=STDOUT)
72 ep = TestProcessLauncher()
48 # ep.start()
73 ep.cmd_and_args = ipengine_cmd_argv + ['--profile', profile, '--log-level', '99']
49 processes.append(ep)
74 ep.start()
75 launchers.append(ep)
50 eps.append(ep)
76 eps.append(ep)
51 tic = time.time()
77 tic = time.time()
52 while len(rc) < base+n:
78 while len(rc) < base+n:
53 if any([ ep.poll() is not None for ep in eps ]):
79 if any([ ep.poll() is not None for ep in eps ]):
54 raise RuntimeError("A test engine failed to start.")
80 raise RuntimeError("A test engine failed to start.")
55 elif time.time()-tic > 10:
81 elif time.time()-tic > 10:
56 raise RuntimeError("Timeout waiting for engines to connect.")
82 raise RuntimeError("Timeout waiting for engines to connect.")
57 time.sleep(.1)
83 time.sleep(.1)
58 rc.spin()
84 rc.spin()
59 rc.close()
85 rc.close()
60 return eps
86 return eps
61
87
62 def teardown():
88 def teardown():
63 time.sleep(1)
89 time.sleep(1)
64 while processes:
90 while launchers:
65 p = processes.pop()
91 p = launchers.pop()
66 if p.poll() is None:
92 if p.poll() is None:
67 try:
93 try:
68 p.terminate()
94 p.stop()
69 except Exception, e:
95 except Exception, e:
70 print e
96 print e
71 pass
97 pass
72 if p.poll() is None:
98 if p.poll() is None:
73 time.sleep(.25)
99 time.sleep(.25)
74 if p.poll() is None:
100 if p.poll() is None:
75 try:
101 try:
76 print 'killing'
102 print 'cleaning up test process...'
77 p.kill()
103 p.signal(SIGKILL)
78 except:
104 except:
79 print "couldn't shutdown process: ", p
105 print "couldn't shutdown process: ", p
106 blackhole.close()
80
107
@@ -1,128 +1,129 b''
1 """base class for parallel client tests"""
1 """base class for parallel client tests"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12 import time
12 import time
13
13
14 from nose import SkipTest
14 from nose import SkipTest
15
15
16 import zmq
16 import zmq
17 from zmq.tests import BaseZMQTestCase
17 from zmq.tests import BaseZMQTestCase
18
18
19 from IPython.external.decorator import decorator
19 from IPython.external.decorator import decorator
20
20
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import Client
22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23
24 from IPython.parallel.tests import launchers, add_engines
24
25
25 # simple tasks for use in apply tests
26 # simple tasks for use in apply tests
26
27
27 def segfault():
28 def segfault():
28 """this will segfault"""
29 """this will segfault"""
29 import ctypes
30 import ctypes
30 ctypes.memset(-1,0,1)
31 ctypes.memset(-1,0,1)
31
32
32 def crash():
33 def crash():
33 """from stdlib crashers in the test suite"""
34 """from stdlib crashers in the test suite"""
34 import types
35 import types
35 if sys.platform.startswith('win'):
36 if sys.platform.startswith('win'):
36 import ctypes
37 import ctypes
37 ctypes.windll.kernel32.SetErrorMode(0x0002);
38 ctypes.windll.kernel32.SetErrorMode(0x0002);
38
39
39 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
40 co = types.CodeType(0, 0, 0, 0, b'\x04\x71\x00\x00',
40 (), (), (), '', '', 1, b'')
41 (), (), (), '', '', 1, b'')
41 exec(co)
42 exec(co)
42
43
43 def wait(n):
44 def wait(n):
44 """sleep for a time"""
45 """sleep for a time"""
45 import time
46 import time
46 time.sleep(n)
47 time.sleep(n)
47 return n
48 return n
48
49
49 def raiser(eclass):
50 def raiser(eclass):
50 """raise an exception"""
51 """raise an exception"""
51 raise eclass()
52 raise eclass()
52
53
53 # test decorator for skipping tests when libraries are unavailable
54 # test decorator for skipping tests when libraries are unavailable
54 def skip_without(*names):
55 def skip_without(*names):
55 """skip a test if some names are not importable"""
56 """skip a test if some names are not importable"""
56 @decorator
57 @decorator
57 def skip_without_names(f, *args, **kwargs):
58 def skip_without_names(f, *args, **kwargs):
58 """decorator to skip tests in the absence of numpy."""
59 """decorator to skip tests in the absence of numpy."""
59 for name in names:
60 for name in names:
60 try:
61 try:
61 __import__(name)
62 __import__(name)
62 except ImportError:
63 except ImportError:
63 raise SkipTest
64 raise SkipTest
64 return f(*args, **kwargs)
65 return f(*args, **kwargs)
65 return skip_without_names
66 return skip_without_names
66
67
67 class ClusterTestCase(BaseZMQTestCase):
68 class ClusterTestCase(BaseZMQTestCase):
68
69
69 def add_engines(self, n=1, block=True):
70 def add_engines(self, n=1, block=True):
70 """add multiple engines to our cluster"""
71 """add multiple engines to our cluster"""
71 self.engines.extend(add_engines(n))
72 self.engines.extend(add_engines(n))
72 if block:
73 if block:
73 self.wait_on_engines()
74 self.wait_on_engines()
74
75
75 def wait_on_engines(self, timeout=5):
76 def wait_on_engines(self, timeout=5):
76 """wait for our engines to connect."""
77 """wait for our engines to connect."""
77 n = len(self.engines)+self.base_engine_count
78 n = len(self.engines)+self.base_engine_count
78 tic = time.time()
79 tic = time.time()
79 while time.time()-tic < timeout and len(self.client.ids) < n:
80 while time.time()-tic < timeout and len(self.client.ids) < n:
80 time.sleep(0.1)
81 time.sleep(0.1)
81
82
82 assert not len(self.client.ids) < n, "waiting for engines timed out"
83 assert not len(self.client.ids) < n, "waiting for engines timed out"
83
84
84 def connect_client(self):
85 def connect_client(self):
85 """connect a client with my Context, and track its sockets for cleanup"""
86 """connect a client with my Context, and track its sockets for cleanup"""
86 c = Client(profile='iptest', context=self.context)
87 c = Client(profile='iptest', context=self.context)
87 for name in filter(lambda n:n.endswith('socket'), dir(c)):
88 for name in filter(lambda n:n.endswith('socket'), dir(c)):
88 s = getattr(c, name)
89 s = getattr(c, name)
89 s.setsockopt(zmq.LINGER, 0)
90 s.setsockopt(zmq.LINGER, 0)
90 self.sockets.append(s)
91 self.sockets.append(s)
91 return c
92 return c
92
93
93 def assertRaisesRemote(self, etype, f, *args, **kwargs):
94 def assertRaisesRemote(self, etype, f, *args, **kwargs):
94 try:
95 try:
95 try:
96 try:
96 f(*args, **kwargs)
97 f(*args, **kwargs)
97 except error.CompositeError as e:
98 except error.CompositeError as e:
98 e.raise_exception()
99 e.raise_exception()
99 except error.RemoteError as e:
100 except error.RemoteError as e:
100 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
101 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
101 else:
102 else:
102 self.fail("should have raised a RemoteError")
103 self.fail("should have raised a RemoteError")
103
104
104 def setUp(self):
105 def setUp(self):
105 BaseZMQTestCase.setUp(self)
106 BaseZMQTestCase.setUp(self)
106 self.client = self.connect_client()
107 self.client = self.connect_client()
107 # start every test with clean engine namespaces:
108 # start every test with clean engine namespaces:
108 self.client.clear(block=True)
109 self.client.clear(block=True)
109 self.base_engine_count=len(self.client.ids)
110 self.base_engine_count=len(self.client.ids)
110 self.engines=[]
111 self.engines=[]
111
112
112 def tearDown(self):
113 def tearDown(self):
113 # self.client.clear(block=True)
114 # self.client.clear(block=True)
114 # close fds:
115 # close fds:
115 for e in filter(lambda e: e.poll() is not None, processes):
116 for e in filter(lambda e: e.poll() is not None, launchers):
116 processes.remove(e)
117 launchers.remove(e)
117
118
118 # allow flushing of incoming messages to prevent crash on socket close
119 # allow flushing of incoming messages to prevent crash on socket close
119 self.client.wait(timeout=2)
120 self.client.wait(timeout=2)
120 # time.sleep(2)
121 # time.sleep(2)
121 self.client.spin()
122 self.client.spin()
122 self.client.close()
123 self.client.close()
123 BaseZMQTestCase.tearDown(self)
124 BaseZMQTestCase.tearDown(self)
124 # this will be redundant when pyzmq merges PR #88
125 # this will be redundant when pyzmq merges PR #88
125 # self.context.term()
126 # self.context.term()
126 # print tempfile.TemporaryFile().fileno(),
127 # print tempfile.TemporaryFile().fileno(),
127 # sys.stdout.flush()
128 # sys.stdout.flush()
128 No newline at end of file
129
@@ -1,69 +1,68 b''
1 """Tests for asyncresult.py"""
1 """Tests for asyncresult.py"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14
14
15 from IPython.parallel.error import TimeoutError
15 from IPython.parallel.error import TimeoutError
16
16
17 from IPython.parallel.tests import add_engines
17 from IPython.parallel.tests import add_engines
18 from .clienttest import ClusterTestCase
18 from .clienttest import ClusterTestCase
19
19
20 def setup():
20 def setup():
21 add_engines(2)
21 add_engines(2)
22
22
23 def wait(n):
23 def wait(n):
24 import time
24 import time
25 time.sleep(n)
25 time.sleep(n)
26 return n
26 return n
27
27
28 class AsyncResultTest(ClusterTestCase):
28 class AsyncResultTest(ClusterTestCase):
29
29
30 def test_single_result(self):
30 def test_single_result(self):
31 eid = self.client.ids[-1]
31 eid = self.client.ids[-1]
32 ar = self.client[eid].apply_async(lambda : 42)
32 ar = self.client[eid].apply_async(lambda : 42)
33 self.assertEquals(ar.get(), 42)
33 self.assertEquals(ar.get(), 42)
34 ar = self.client[[eid]].apply_async(lambda : 42)
34 ar = self.client[[eid]].apply_async(lambda : 42)
35 self.assertEquals(ar.get(), [42])
35 self.assertEquals(ar.get(), [42])
36 ar = self.client[-1:].apply_async(lambda : 42)
36 ar = self.client[-1:].apply_async(lambda : 42)
37 self.assertEquals(ar.get(), [42])
37 self.assertEquals(ar.get(), [42])
38
38
39 def test_get_after_done(self):
39 def test_get_after_done(self):
40 ar = self.client[-1].apply_async(lambda : 42)
40 ar = self.client[-1].apply_async(lambda : 42)
41 self.assertFalse(ar.ready())
42 ar.wait()
41 ar.wait()
43 self.assertTrue(ar.ready())
42 self.assertTrue(ar.ready())
44 self.assertEquals(ar.get(), 42)
43 self.assertEquals(ar.get(), 42)
45 self.assertEquals(ar.get(), 42)
44 self.assertEquals(ar.get(), 42)
46
45
47 def test_get_before_done(self):
46 def test_get_before_done(self):
48 ar = self.client[-1].apply_async(wait, 0.1)
47 ar = self.client[-1].apply_async(wait, 0.1)
49 self.assertRaises(TimeoutError, ar.get, 0)
48 self.assertRaises(TimeoutError, ar.get, 0)
50 ar.wait(0)
49 ar.wait(0)
51 self.assertFalse(ar.ready())
50 self.assertFalse(ar.ready())
52 self.assertEquals(ar.get(), 0.1)
51 self.assertEquals(ar.get(), 0.1)
53
52
54 def test_get_after_error(self):
53 def test_get_after_error(self):
55 ar = self.client[-1].apply_async(lambda : 1/0)
54 ar = self.client[-1].apply_async(lambda : 1/0)
56 ar.wait()
55 ar.wait()
57 self.assertRaisesRemote(ZeroDivisionError, ar.get)
56 self.assertRaisesRemote(ZeroDivisionError, ar.get)
58 self.assertRaisesRemote(ZeroDivisionError, ar.get)
57 self.assertRaisesRemote(ZeroDivisionError, ar.get)
59 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
58 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
60
59
61 def test_get_dict(self):
60 def test_get_dict(self):
62 n = len(self.client)
61 n = len(self.client)
63 ar = self.client[:].apply_async(lambda : 5)
62 ar = self.client[:].apply_async(lambda : 5)
64 self.assertEquals(ar.get(), [5]*n)
63 self.assertEquals(ar.get(), [5]*n)
65 d = ar.get_dict()
64 d = ar.get_dict()
66 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
65 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
67 for eid,r in d.iteritems():
66 for eid,r in d.iteritems():
68 self.assertEquals(r, 5)
67 self.assertEquals(r, 5)
69
68
General Comments 0
You need to be logged in to leave comments. Login now