##// END OF EJS Templates
Merge pull request #3351 from minrk/sshproxy...
Matthias Bussonnier -
r10813:7f805f26 merge
parent child Browse files
Show More
@@ -0,0 +1,179
1 """Tests for launchers
2
3 Doesn't actually start any subprocesses, but goes through the motions of constructing
4 objects, which should test basic config.
5
6 Authors:
7
8 * Min RK
9 """
10
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2013 The IPython Development Team
13 #
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
17
18 #-------------------------------------------------------------------------------
19 # Imports
20 #-------------------------------------------------------------------------------
21
22 import logging
23 import os
24 import shutil
25 import sys
26 import tempfile
27
28 from unittest import TestCase
29
30 from nose import SkipTest
31
32 from IPython.config import Config
33
34 from IPython.parallel.apps import launcher
35
36 from IPython.testing import decorators as dec
37
38
39 #-------------------------------------------------------------------------------
40 # TestCase Mixins
41 #-------------------------------------------------------------------------------
42
43 class LauncherTest:
44 """Mixin for generic launcher tests"""
45 def setUp(self):
46 self.profile_dir = tempfile.mkdtemp(prefix="profile_")
47
48 def tearDown(self):
49 shutil.rmtree(self.profile_dir)
50
51 @property
52 def config(self):
53 return Config()
54
55 def build_launcher(self, **kwargs):
56 kw = dict(
57 work_dir=self.profile_dir,
58 profile_dir=self.profile_dir,
59 config=self.config,
60 cluster_id='',
61 log=logging.getLogger(),
62 )
63 kw.update(kwargs)
64 return self.launcher_class(**kw)
65
66 def test_profile_dir_arg(self):
67 launcher = self.build_launcher()
68 self.assertTrue("--profile-dir" in launcher.cluster_args)
69 self.assertTrue(self.profile_dir in launcher.cluster_args)
70
71 def test_cluster_id_arg(self):
72 launcher = self.build_launcher()
73 self.assertTrue("--cluster-id" in launcher.cluster_args)
74 idx = launcher.cluster_args.index("--cluster-id")
75 self.assertEqual(launcher.cluster_args[idx+1], '')
76 launcher.cluster_id = 'foo'
77 self.assertEqual(launcher.cluster_args[idx+1], 'foo')
78
79 def test_args(self):
80 launcher = self.build_launcher()
81 for arg in launcher.args:
82 self.assertTrue(isinstance(arg, basestring), str(arg))
83
84 class BatchTest:
85 """Tests for batch-system launchers (LSF, SGE, PBS)"""
86 def test_batch_template(self):
87 launcher = self.build_launcher()
88 batch_file = os.path.join(self.profile_dir, launcher.batch_file_name)
89 self.assertEqual(launcher.batch_file, batch_file)
90 launcher.write_batch_script(1)
91 self.assertTrue(os.path.isfile(batch_file))
92
93 class SSHTest:
94 """Tests for SSH launchers"""
95 def test_cluster_id_arg(self):
96 raise SkipTest("SSH Launchers don't support cluster ID")
97
98 def test_remote_profile_dir(self):
99 cfg = Config()
100 launcher_cfg = getattr(cfg, self.launcher_class.__name__)
101 launcher_cfg.remote_profile_dir = "foo"
102 launcher = self.build_launcher(config=cfg)
103 self.assertEqual(launcher.remote_profile_dir, "foo")
104
105 def test_remote_profile_dir_default(self):
106 launcher = self.build_launcher()
107 self.assertEqual(launcher.remote_profile_dir, self.profile_dir)
108
109 #-------------------------------------------------------------------------------
110 # Controller Launcher Tests
111 #-------------------------------------------------------------------------------
112
113 class ControllerLauncherTest(LauncherTest):
114 """Tests for Controller Launchers"""
115 pass
116
117 class TestLocalControllerLauncher(ControllerLauncherTest, TestCase):
118 launcher_class = launcher.LocalControllerLauncher
119
120 class TestMPIControllerLauncher(ControllerLauncherTest, TestCase):
121 launcher_class = launcher.MPIControllerLauncher
122
123 class TestPBSControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
124 launcher_class = launcher.PBSControllerLauncher
125
126 class TestSGEControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
127 launcher_class = launcher.SGEControllerLauncher
128
129 class TestLSFControllerLauncher(BatchTest, ControllerLauncherTest, TestCase):
130 launcher_class = launcher.LSFControllerLauncher
131
132 class TestSSHControllerLauncher(SSHTest, ControllerLauncherTest, TestCase):
133 launcher_class = launcher.SSHControllerLauncher
134
135 #-------------------------------------------------------------------------------
136 # Engine Set Launcher Tests
137 #-------------------------------------------------------------------------------
138
139 class EngineSetLauncherTest(LauncherTest):
140 """Tests for EngineSet launchers"""
141 pass
142
143 class TestLocalEngineSetLauncher(EngineSetLauncherTest, TestCase):
144 launcher_class = launcher.LocalEngineSetLauncher
145
146 class TestMPIEngineSetLauncher(EngineSetLauncherTest, TestCase):
147 launcher_class = launcher.MPIEngineSetLauncher
148
149 class TestPBSEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
150 launcher_class = launcher.PBSEngineSetLauncher
151
152 class TestSGEEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
153 launcher_class = launcher.SGEEngineSetLauncher
154
155 class TestLSFEngineSetLauncher(BatchTest, EngineSetLauncherTest, TestCase):
156 launcher_class = launcher.LSFEngineSetLauncher
157
158 class TestSSHEngineSetLauncher(EngineSetLauncherTest, TestCase):
159 launcher_class = launcher.SSHEngineSetLauncher
160
161 def test_cluster_id_arg(self):
162 raise SkipTest("SSH Launchers don't support cluster ID")
163
164 class TestSSHProxyEngineSetLauncher(SSHTest, LauncherTest, TestCase):
165 launcher_class = launcher.SSHProxyEngineSetLauncher
166
167 class TestSSHEngineLauncher(SSHTest, LauncherTest, TestCase):
168 launcher_class = launcher.SSHEngineLauncher
169
170 #-------------------------------------------------------------------------------
171 # Windows Launcher Tests
172 #-------------------------------------------------------------------------------
173
174 if sys.platform.startswith("win"):
175 class TestWinHPCControllerLauncher(ControllerLauncherTest, TestCase):
176 launcher_class = launcher.WindowsHPCControllerLauncher
177
178 class TestWinHPCEngineSetLauncher(EngineSetLauncherTest, TestCase):
179 launcher_class = launcher.WindowsHPCEngineSetLauncher
@@ -1,1358 +1,1359
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.encoding import DEFAULT_ENCODING
65 65 from IPython.utils.path import get_home_dir
66 66 from IPython.utils.process import find_cmd, FindCmdError
67 67
68 68 from .win32support import forward_read_events
69 69
70 70 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 71
72 72 WINDOWS = os.name == 'nt'
73 73
74 74 #-----------------------------------------------------------------------------
75 75 # Paths to the kernel apps
76 76 #-----------------------------------------------------------------------------
77 77
78 78 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
79 79
80 80 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
81 81
82 82 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
83 83
84 84 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
85 85
86 86 #-----------------------------------------------------------------------------
87 87 # Base launchers and errors
88 88 #-----------------------------------------------------------------------------
89 89
90 90 class LauncherError(Exception):
91 91 pass
92 92
93 93
94 94 class ProcessStateError(LauncherError):
95 95 pass
96 96
97 97
98 98 class UnknownStatus(LauncherError):
99 99 pass
100 100
101 101
102 102 class BaseLauncher(LoggingConfigurable):
103 103 """An asbtraction for starting, stopping and signaling a process."""
104 104
105 105 # In all of the launchers, the work_dir is where child processes will be
106 106 # run. This will usually be the profile_dir, but may not be. any work_dir
107 107 # passed into the __init__ method will override the config value.
108 108 # This should not be used to set the work_dir for the actual engine
109 109 # and controller. Instead, use their own config files or the
110 110 # controller_args, engine_args attributes of the launchers to add
111 111 # the work_dir option.
112 112 work_dir = Unicode(u'.')
113 113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
114 114
115 115 start_data = Any()
116 116 stop_data = Any()
117 117
118 118 def _loop_default(self):
119 119 return ioloop.IOLoop.instance()
120 120
121 121 def __init__(self, work_dir=u'.', config=None, **kwargs):
122 122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
123 123 self.state = 'before' # can be before, running, after
124 124 self.stop_callbacks = []
125 125 self.start_data = None
126 126 self.stop_data = None
127 127
128 128 @property
129 129 def args(self):
130 130 """A list of cmd and args that will be used to start the process.
131 131
132 132 This is what is passed to :func:`spawnProcess` and the first element
133 133 will be the process name.
134 134 """
135 135 return self.find_args()
136 136
137 137 def find_args(self):
138 138 """The ``.args`` property calls this to find the args list.
139 139
140 140 Subcommand should implement this to construct the cmd and args.
141 141 """
142 142 raise NotImplementedError('find_args must be implemented in a subclass')
143 143
144 144 @property
145 145 def arg_str(self):
146 146 """The string form of the program arguments."""
147 147 return ' '.join(self.args)
148 148
149 149 @property
150 150 def running(self):
151 151 """Am I running."""
152 152 if self.state == 'running':
153 153 return True
154 154 else:
155 155 return False
156 156
157 157 def start(self):
158 158 """Start the process."""
159 159 raise NotImplementedError('start must be implemented in a subclass')
160 160
161 161 def stop(self):
162 162 """Stop the process and notify observers of stopping.
163 163
164 164 This method will return None immediately.
165 165 To observe the actual process stopping, see :meth:`on_stop`.
166 166 """
167 167 raise NotImplementedError('stop must be implemented in a subclass')
168 168
169 169 def on_stop(self, f):
170 170 """Register a callback to be called with this Launcher's stop_data
171 171 when the process actually finishes.
172 172 """
173 173 if self.state=='after':
174 174 return f(self.stop_data)
175 175 else:
176 176 self.stop_callbacks.append(f)
177 177
178 178 def notify_start(self, data):
179 179 """Call this to trigger startup actions.
180 180
181 181 This logs the process startup and sets the state to 'running'. It is
182 182 a pass-through so it can be used as a callback.
183 183 """
184 184
185 185 self.log.debug('Process %r started: %r', self.args[0], data)
186 186 self.start_data = data
187 187 self.state = 'running'
188 188 return data
189 189
190 190 def notify_stop(self, data):
191 191 """Call this to trigger process stop actions.
192 192
193 193 This logs the process stopping and sets the state to 'after'. Call
194 194 this to trigger callbacks registered via :meth:`on_stop`."""
195 195
196 196 self.log.debug('Process %r stopped: %r', self.args[0], data)
197 197 self.stop_data = data
198 198 self.state = 'after'
199 199 for i in range(len(self.stop_callbacks)):
200 200 d = self.stop_callbacks.pop()
201 201 d(data)
202 202 return data
203 203
204 204 def signal(self, sig):
205 205 """Signal the process.
206 206
207 207 Parameters
208 208 ----------
209 209 sig : str or int
210 210 'KILL', 'INT', etc., or any signal number
211 211 """
212 212 raise NotImplementedError('signal must be implemented in a subclass')
213 213
214 214 class ClusterAppMixin(HasTraits):
215 215 """MixIn for cluster args as traits"""
216 216 profile_dir=Unicode('')
217 217 cluster_id=Unicode('')
218 218
219 219 @property
220 220 def cluster_args(self):
221 221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
222 222
223 223 class ControllerMixin(ClusterAppMixin):
224 224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
225 225 help="""Popen command to launch ipcontroller.""")
226 226 # Command line arguments to ipcontroller.
227 227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
228 228 help="""command-line args to pass to ipcontroller""")
229 229
230 230 class EngineMixin(ClusterAppMixin):
231 231 engine_cmd = List(ipengine_cmd_argv, config=True,
232 232 help="""command to launch the Engine.""")
233 233 # Command line arguments for ipengine.
234 234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
235 235 help="command-line arguments to pass to ipengine"
236 236 )
237 237
238 238
239 239 #-----------------------------------------------------------------------------
240 240 # Local process launchers
241 241 #-----------------------------------------------------------------------------
242 242
243 243
244 244 class LocalProcessLauncher(BaseLauncher):
245 245 """Start and stop an external process in an asynchronous manner.
246 246
247 247 This will launch the external process with a working directory of
248 248 ``self.work_dir``.
249 249 """
250 250
251 251 # This is used to to construct self.args, which is passed to
252 252 # spawnProcess.
253 253 cmd_and_args = List([])
254 254 poll_frequency = Integer(100) # in ms
255 255
256 256 def __init__(self, work_dir=u'.', config=None, **kwargs):
257 257 super(LocalProcessLauncher, self).__init__(
258 258 work_dir=work_dir, config=config, **kwargs
259 259 )
260 260 self.process = None
261 261 self.poller = None
262 262
263 263 def find_args(self):
264 264 return self.cmd_and_args
265 265
266 266 def start(self):
267 267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
268 268 if self.state == 'before':
269 269 self.process = Popen(self.args,
270 270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
271 271 env=os.environ,
272 272 cwd=self.work_dir
273 273 )
274 274 if WINDOWS:
275 275 self.stdout = forward_read_events(self.process.stdout)
276 276 self.stderr = forward_read_events(self.process.stderr)
277 277 else:
278 278 self.stdout = self.process.stdout.fileno()
279 279 self.stderr = self.process.stderr.fileno()
280 280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
281 281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
282 282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
283 283 self.poller.start()
284 284 self.notify_start(self.process.pid)
285 285 else:
286 286 s = 'The process was already started and has state: %r' % self.state
287 287 raise ProcessStateError(s)
288 288
289 289 def stop(self):
290 290 return self.interrupt_then_kill()
291 291
292 292 def signal(self, sig):
293 293 if self.state == 'running':
294 294 if WINDOWS and sig != SIGINT:
295 295 # use Windows tree-kill for better child cleanup
296 296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
297 297 else:
298 298 self.process.send_signal(sig)
299 299
300 300 def interrupt_then_kill(self, delay=2.0):
301 301 """Send INT, wait a delay and then send KILL."""
302 302 try:
303 303 self.signal(SIGINT)
304 304 except Exception:
305 305 self.log.debug("interrupt failed")
306 306 pass
307 307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
308 308 self.killer.start()
309 309
310 310 # callbacks, etc:
311 311
312 312 def handle_stdout(self, fd, events):
313 313 if WINDOWS:
314 314 line = self.stdout.recv()
315 315 else:
316 316 line = self.process.stdout.readline()
317 317 # a stopped process will be readable but return empty strings
318 318 if line:
319 319 self.log.debug(line[:-1])
320 320 else:
321 321 self.poll()
322 322
323 323 def handle_stderr(self, fd, events):
324 324 if WINDOWS:
325 325 line = self.stderr.recv()
326 326 else:
327 327 line = self.process.stderr.readline()
328 328 # a stopped process will be readable but return empty strings
329 329 if line:
330 330 self.log.debug(line[:-1])
331 331 else:
332 332 self.poll()
333 333
334 334 def poll(self):
335 335 status = self.process.poll()
336 336 if status is not None:
337 337 self.poller.stop()
338 338 self.loop.remove_handler(self.stdout)
339 339 self.loop.remove_handler(self.stderr)
340 340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
341 341 return status
342 342
343 343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
344 344 """Launch a controller as a regular external process."""
345 345
346 346 def find_args(self):
347 347 return self.controller_cmd + self.cluster_args + self.controller_args
348 348
349 349 def start(self):
350 350 """Start the controller by profile_dir."""
351 351 return super(LocalControllerLauncher, self).start()
352 352
353 353
354 354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
355 355 """Launch a single engine as a regular externall process."""
356 356
357 357 def find_args(self):
358 358 return self.engine_cmd + self.cluster_args + self.engine_args
359 359
360 360
361 361 class LocalEngineSetLauncher(LocalEngineLauncher):
362 362 """Launch a set of engines as regular external processes."""
363 363
364 364 delay = CFloat(0.1, config=True,
365 365 help="""delay (in seconds) between starting each engine after the first.
366 366 This can help force the engines to get their ids in order, or limit
367 367 process flood when starting many engines."""
368 368 )
369 369
370 370 # launcher class
371 371 launcher_class = LocalEngineLauncher
372 372
373 373 launchers = Dict()
374 374 stop_data = Dict()
375 375
376 376 def __init__(self, work_dir=u'.', config=None, **kwargs):
377 377 super(LocalEngineSetLauncher, self).__init__(
378 378 work_dir=work_dir, config=config, **kwargs
379 379 )
380 380 self.stop_data = {}
381 381
382 382 def start(self, n):
383 383 """Start n engines by profile or profile_dir."""
384 384 dlist = []
385 385 for i in range(n):
386 386 if i > 0:
387 387 time.sleep(self.delay)
388 388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
389 389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
390 390 )
391 391
392 392 # Copy the engine args over to each engine launcher.
393 393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
394 394 el.engine_args = copy.deepcopy(self.engine_args)
395 395 el.on_stop(self._notice_engine_stopped)
396 396 d = el.start()
397 397 self.launchers[i] = el
398 398 dlist.append(d)
399 399 self.notify_start(dlist)
400 400 return dlist
401 401
402 402 def find_args(self):
403 403 return ['engine set']
404 404
405 405 def signal(self, sig):
406 406 dlist = []
407 407 for el in self.launchers.itervalues():
408 408 d = el.signal(sig)
409 409 dlist.append(d)
410 410 return dlist
411 411
412 412 def interrupt_then_kill(self, delay=1.0):
413 413 dlist = []
414 414 for el in self.launchers.itervalues():
415 415 d = el.interrupt_then_kill(delay)
416 416 dlist.append(d)
417 417 return dlist
418 418
419 419 def stop(self):
420 420 return self.interrupt_then_kill()
421 421
422 422 def _notice_engine_stopped(self, data):
423 423 pid = data['pid']
424 424 for idx,el in self.launchers.iteritems():
425 425 if el.process.pid == pid:
426 426 break
427 427 self.launchers.pop(idx)
428 428 self.stop_data[idx] = data
429 429 if not self.launchers:
430 430 self.notify_stop(self.stop_data)
431 431
432 432
433 433 #-----------------------------------------------------------------------------
434 434 # MPI launchers
435 435 #-----------------------------------------------------------------------------
436 436
437 437
438 438 class MPILauncher(LocalProcessLauncher):
439 439 """Launch an external process using mpiexec."""
440 440
441 441 mpi_cmd = List(['mpiexec'], config=True,
442 442 help="The mpiexec command to use in starting the process."
443 443 )
444 444 mpi_args = List([], config=True,
445 445 help="The command line arguments to pass to mpiexec."
446 446 )
447 447 program = List(['date'],
448 448 help="The program to start via mpiexec.")
449 449 program_args = List([],
450 450 help="The command line argument to the program."
451 451 )
452 452 n = Integer(1)
453 453
454 454 def __init__(self, *args, **kwargs):
455 455 # deprecation for old MPIExec names:
456 456 config = kwargs.get('config', {})
457 457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
458 458 deprecated = config.get(oldname)
459 459 if deprecated:
460 460 newname = oldname.replace('MPIExec', 'MPI')
461 461 config[newname].update(deprecated)
462 462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
463 463
464 464 super(MPILauncher, self).__init__(*args, **kwargs)
465 465
466 466 def find_args(self):
467 467 """Build self.args using all the fields."""
468 468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
469 469 self.program + self.program_args
470 470
471 471 def start(self, n):
472 472 """Start n instances of the program using mpiexec."""
473 473 self.n = n
474 474 return super(MPILauncher, self).start()
475 475
476 476
477 477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
478 478 """Launch a controller using mpiexec."""
479 479
480 480 # alias back to *non-configurable* program[_args] for use in find_args()
481 481 # this way all Controller/EngineSetLaunchers have the same form, rather
482 482 # than *some* having `program_args` and others `controller_args`
483 483 @property
484 484 def program(self):
485 485 return self.controller_cmd
486 486
487 487 @property
488 488 def program_args(self):
489 489 return self.cluster_args + self.controller_args
490 490
491 491 def start(self):
492 492 """Start the controller by profile_dir."""
493 493 return super(MPIControllerLauncher, self).start(1)
494 494
495 495
496 496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
497 497 """Launch engines using mpiexec"""
498 498
499 499 # alias back to *non-configurable* program[_args] for use in find_args()
500 500 # this way all Controller/EngineSetLaunchers have the same form, rather
501 501 # than *some* having `program_args` and others `controller_args`
502 502 @property
503 503 def program(self):
504 504 return self.engine_cmd
505 505
506 506 @property
507 507 def program_args(self):
508 508 return self.cluster_args + self.engine_args
509 509
510 510 def start(self, n):
511 511 """Start n engines by profile or profile_dir."""
512 512 self.n = n
513 513 return super(MPIEngineSetLauncher, self).start(n)
514 514
515 515 # deprecated MPIExec names
516 516 class DeprecatedMPILauncher(object):
517 517 def warn(self):
518 518 oldname = self.__class__.__name__
519 519 newname = oldname.replace('MPIExec', 'MPI')
520 520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
521 521
522 522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
523 523 """Deprecated, use MPILauncher"""
524 524 def __init__(self, *args, **kwargs):
525 525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
526 526 self.warn()
527 527
528 528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
529 529 """Deprecated, use MPIControllerLauncher"""
530 530 def __init__(self, *args, **kwargs):
531 531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
532 532 self.warn()
533 533
534 534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
535 535 """Deprecated, use MPIEngineSetLauncher"""
536 536 def __init__(self, *args, **kwargs):
537 537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
538 538 self.warn()
539 539
540 540
541 541 #-----------------------------------------------------------------------------
542 542 # SSH launchers
543 543 #-----------------------------------------------------------------------------
544 544
545 545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
546 546
547 547 class SSHLauncher(LocalProcessLauncher):
548 548 """A minimal launcher for ssh.
549 549
550 550 To be useful this will probably have to be extended to use the ``sshx``
551 551 idea for environment variables. There could be other things this needs
552 552 as well.
553 553 """
554 554
555 555 ssh_cmd = List(['ssh'], config=True,
556 556 help="command for starting ssh")
557 557 ssh_args = List(['-tt'], config=True,
558 558 help="args to pass to ssh")
559 559 scp_cmd = List(['scp'], config=True,
560 560 help="command for sending files")
561 561 program = List(['date'],
562 562 help="Program to launch via ssh")
563 563 program_args = List([],
564 564 help="args to pass to remote program")
565 565 hostname = Unicode('', config=True,
566 566 help="hostname on which to launch the program")
567 567 user = Unicode('', config=True,
568 568 help="username for ssh")
569 569 location = Unicode('', config=True,
570 570 help="user@hostname location for ssh in one setting")
571 571 to_fetch = List([], config=True,
572 572 help="List of (remote, local) files to fetch after starting")
573 573 to_send = List([], config=True,
574 574 help="List of (local, remote) files to send before starting")
575 575
576 576 def _hostname_changed(self, name, old, new):
577 577 if self.user:
578 578 self.location = u'%s@%s' % (self.user, new)
579 579 else:
580 580 self.location = new
581 581
582 582 def _user_changed(self, name, old, new):
583 583 self.location = u'%s@%s' % (new, self.hostname)
584 584
585 585 def find_args(self):
586 586 return self.ssh_cmd + self.ssh_args + [self.location] + \
587 587 list(map(pipes.quote, self.program + self.program_args))
588 588
589 589 def _send_file(self, local, remote):
590 590 """send a single file"""
591 591 remote = "%s:%s" % (self.location, remote)
592 592 for i in range(10):
593 593 if not os.path.exists(local):
594 594 self.log.debug("waiting for %s" % local)
595 595 time.sleep(1)
596 596 else:
597 597 break
598 598 self.log.info("sending %s to %s", local, remote)
599 599 check_output(self.scp_cmd + [local, remote])
600 600
601 601 def send_files(self):
602 602 """send our files (called before start)"""
603 603 if not self.to_send:
604 604 return
605 605 for local_file, remote_file in self.to_send:
606 606 self._send_file(local_file, remote_file)
607 607
608 608 def _fetch_file(self, remote, local):
609 609 """fetch a single file"""
610 610 full_remote = "%s:%s" % (self.location, remote)
611 611 self.log.info("fetching %s from %s", local, full_remote)
612 612 for i in range(10):
613 613 # wait up to 10s for remote file to exist
614 614 check = check_output(self.ssh_cmd + self.ssh_args + \
615 615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
616 616 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
617 617 if check == u'no':
618 618 time.sleep(1)
619 619 elif check == u'yes':
620 620 break
621 621 check_output(self.scp_cmd + [full_remote, local])
622 622
623 623 def fetch_files(self):
624 624 """fetch remote files (called after start)"""
625 625 if not self.to_fetch:
626 626 return
627 627 for remote_file, local_file in self.to_fetch:
628 628 self._fetch_file(remote_file, local_file)
629 629
630 630 def start(self, hostname=None, user=None):
631 631 if hostname is not None:
632 632 self.hostname = hostname
633 633 if user is not None:
634 634 self.user = user
635 635
636 636 self.send_files()
637 637 super(SSHLauncher, self).start()
638 638 self.fetch_files()
639 639
640 640 def signal(self, sig):
641 641 if self.state == 'running':
642 642 # send escaped ssh connection-closer
643 643 self.process.stdin.write('~.')
644 644 self.process.stdin.flush()
645 645
646 class SSHClusterLauncher(SSHLauncher):
646 class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
647 647
648 648 remote_profile_dir = Unicode('', config=True,
649 649 help="""The remote profile_dir to use.
650 650
651 651 If not specified, use calling profile, stripping out possible leading homedir.
652 652 """)
653 653
654 654 def _profile_dir_changed(self, name, old, new):
655 655 if not self.remote_profile_dir:
656 656 # trigger remote_profile_dir_default logic again,
657 657 # in case it was already triggered before profile_dir was set
658 658 self.remote_profile_dir = self._strip_home(new)
659 659
660 660 @staticmethod
661 661 def _strip_home(path):
662 662 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
663 663 home = get_home_dir()
664 664 if not home.endswith('/'):
665 665 home = home+'/'
666 666
667 667 if path.startswith(home):
668 668 return path[len(home):]
669 669 else:
670 670 return path
671 671
672 672 def _remote_profile_dir_default(self):
673 673 return self._strip_home(self.profile_dir)
674 674
675 675 def _cluster_id_changed(self, name, old, new):
676 676 if new:
677 677 raise ValueError("cluster id not supported by SSH launchers")
678 678
679 679 @property
680 680 def cluster_args(self):
681 681 return ['--profile-dir', self.remote_profile_dir]
682 682
683 683 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
684 684
685 685 # alias back to *non-configurable* program[_args] for use in find_args()
686 686 # this way all Controller/EngineSetLaunchers have the same form, rather
687 687 # than *some* having `program_args` and others `controller_args`
688 688
689 689 def _controller_cmd_default(self):
690 690 return ['ipcontroller']
691 691
692 692 @property
693 693 def program(self):
694 694 return self.controller_cmd
695 695
696 696 @property
697 697 def program_args(self):
698 698 return self.cluster_args + self.controller_args
699 699
700 700 def _to_fetch_default(self):
701 701 return [
702 702 (os.path.join(self.remote_profile_dir, 'security', cf),
703 703 os.path.join(self.profile_dir, 'security', cf),)
704 704 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
705 705 ]
706 706
707 707 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
708 708
709 709 # alias back to *non-configurable* program[_args] for use in find_args()
710 710 # this way all Controller/EngineSetLaunchers have the same form, rather
711 711 # than *some* having `program_args` and others `controller_args`
712 712
713 713 def _engine_cmd_default(self):
714 714 return ['ipengine']
715 715
716 716 @property
717 717 def program(self):
718 718 return self.engine_cmd
719 719
720 720 @property
721 721 def program_args(self):
722 722 return self.cluster_args + self.engine_args
723 723
724 724 def _to_send_default(self):
725 725 return [
726 726 (os.path.join(self.profile_dir, 'security', cf),
727 727 os.path.join(self.remote_profile_dir, 'security', cf))
728 728 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
729 729 ]
730 730
731 731
732 732 class SSHEngineSetLauncher(LocalEngineSetLauncher):
733 733 launcher_class = SSHEngineLauncher
734 734 engines = Dict(config=True,
735 735 help="""dict of engines to launch. This is a dict by hostname of ints,
736 736 corresponding to the number of engines to start on that host.""")
737 737
738 738 def _engine_cmd_default(self):
739 739 return ['ipengine']
740 740
741 741 @property
742 742 def engine_count(self):
743 743 """determine engine count from `engines` dict"""
744 744 count = 0
745 745 for n in self.engines.itervalues():
746 746 if isinstance(n, (tuple,list)):
747 747 n,args = n
748 748 count += n
749 749 return count
750 750
751 751 def start(self, n):
752 752 """Start engines by profile or profile_dir.
753 753 `n` is ignored, and the `engines` config property is used instead.
754 754 """
755 755
756 756 dlist = []
757 757 for host, n in self.engines.iteritems():
758 758 if isinstance(n, (tuple, list)):
759 759 n, args = n
760 760 else:
761 761 args = copy.deepcopy(self.engine_args)
762 762
763 763 if '@' in host:
764 764 user,host = host.split('@',1)
765 765 else:
766 766 user=None
767 767 for i in range(n):
768 768 if i > 0:
769 769 time.sleep(self.delay)
770 770 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
771 771 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
772 772 )
773 773 if i > 0:
774 774 # only send files for the first engine on each host
775 775 el.to_send = []
776 776
777 777 # Copy the engine args over to each engine launcher.
778 778 el.engine_cmd = self.engine_cmd
779 779 el.engine_args = args
780 780 el.on_stop(self._notice_engine_stopped)
781 781 d = el.start(user=user, hostname=host)
782 782 self.launchers[ "%s/%i" % (host,i) ] = el
783 783 dlist.append(d)
784 784 self.notify_start(dlist)
785 785 return dlist
786 786
787 787
788 788 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
789 789 """Launcher for calling
790 790 `ipcluster engines` on a remote machine.
791 791
792 792 Requires that remote profile is already configured.
793 793 """
794 794
795 795 n = Integer()
796 796 ipcluster_cmd = List(['ipcluster'], config=True)
797 797
798 798 @property
799 799 def program(self):
800 800 return self.ipcluster_cmd + ['engines']
801 801
802 802 @property
803 803 def program_args(self):
804 804 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
805 805
806 806 def _to_send_default(self):
807 807 return [
808 808 (os.path.join(self.profile_dir, 'security', cf),
809 809 os.path.join(self.remote_profile_dir, 'security', cf))
810 810 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
811 811 ]
812 812
813 813 def start(self, n):
814 814 self.n = n
815 815 super(SSHProxyEngineSetLauncher, self).start()
816 816
817 817
818 818 #-----------------------------------------------------------------------------
819 819 # Windows HPC Server 2008 scheduler launchers
820 820 #-----------------------------------------------------------------------------
821 821
822 822
823 823 # This is only used on Windows.
824 824 def find_job_cmd():
825 825 if WINDOWS:
826 826 try:
827 827 return find_cmd('job')
828 828 except (FindCmdError, ImportError):
829 829 # ImportError will be raised if win32api is not installed
830 830 return 'job'
831 831 else:
832 832 return 'job'
833 833
834 834
835 835 class WindowsHPCLauncher(BaseLauncher):
836 836
837 837 job_id_regexp = CRegExp(r'\d+', config=True,
838 838 help="""A regular expression used to get the job id from the output of the
839 839 submit_command. """
840 840 )
841 841 job_file_name = Unicode(u'ipython_job.xml', config=True,
842 842 help="The filename of the instantiated job script.")
843 843 # The full path to the instantiated job script. This gets made dynamically
844 844 # by combining the work_dir with the job_file_name.
845 845 job_file = Unicode(u'')
846 846 scheduler = Unicode('', config=True,
847 847 help="The hostname of the scheduler to submit the job to.")
848 848 job_cmd = Unicode(find_job_cmd(), config=True,
849 849 help="The command for submitting jobs.")
850 850
851 851 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 852 super(WindowsHPCLauncher, self).__init__(
853 853 work_dir=work_dir, config=config, **kwargs
854 854 )
855 855
856 856 @property
857 857 def job_file(self):
858 858 return os.path.join(self.work_dir, self.job_file_name)
859 859
860 860 def write_job_file(self, n):
861 861 raise NotImplementedError("Implement write_job_file in a subclass.")
862 862
863 863 def find_args(self):
864 864 return [u'job.exe']
865 865
866 866 def parse_job_id(self, output):
867 867 """Take the output of the submit command and return the job id."""
868 868 m = self.job_id_regexp.search(output)
869 869 if m is not None:
870 870 job_id = m.group()
871 871 else:
872 872 raise LauncherError("Job id couldn't be determined: %s" % output)
873 873 self.job_id = job_id
874 874 self.log.info('Job started with id: %r', job_id)
875 875 return job_id
876 876
877 877 def start(self, n):
878 878 """Start n copies of the process using the Win HPC job scheduler."""
879 879 self.write_job_file(n)
880 880 args = [
881 881 'submit',
882 882 '/jobfile:%s' % self.job_file,
883 883 '/scheduler:%s' % self.scheduler
884 884 ]
885 885 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
886 886
887 887 output = check_output([self.job_cmd]+args,
888 888 env=os.environ,
889 889 cwd=self.work_dir,
890 890 stderr=STDOUT
891 891 )
892 892 output = output.decode(DEFAULT_ENCODING, 'replace')
893 893 job_id = self.parse_job_id(output)
894 894 self.notify_start(job_id)
895 895 return job_id
896 896
897 897 def stop(self):
898 898 args = [
899 899 'cancel',
900 900 self.job_id,
901 901 '/scheduler:%s' % self.scheduler
902 902 ]
903 903 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
904 904 try:
905 905 output = check_output([self.job_cmd]+args,
906 906 env=os.environ,
907 907 cwd=self.work_dir,
908 908 stderr=STDOUT
909 909 )
910 910 output = output.decode(DEFAULT_ENCODING, 'replace')
911 911 except:
912 912 output = u'The job already appears to be stopped: %r' % self.job_id
913 913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
914 914 return output
915 915
916 916
917 917 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
918 918
919 919 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
920 920 help="WinHPC xml job file.")
921 921 controller_args = List([], config=False,
922 922 help="extra args to pass to ipcontroller")
923 923
924 924 def write_job_file(self, n):
925 925 job = IPControllerJob(config=self.config)
926 926
927 927 t = IPControllerTask(config=self.config)
928 928 # The tasks work directory is *not* the actual work directory of
929 929 # the controller. It is used as the base path for the stdout/stderr
930 930 # files that the scheduler redirects to.
931 931 t.work_directory = self.profile_dir
932 932 # Add the profile_dir and from self.start().
933 933 t.controller_args.extend(self.cluster_args)
934 934 t.controller_args.extend(self.controller_args)
935 935 job.add_task(t)
936 936
937 937 self.log.debug("Writing job description file: %s", self.job_file)
938 938 job.write(self.job_file)
939 939
940 940 @property
941 941 def job_file(self):
942 942 return os.path.join(self.profile_dir, self.job_file_name)
943 943
944 944 def start(self):
945 945 """Start the controller by profile_dir."""
946 946 return super(WindowsHPCControllerLauncher, self).start(1)
947 947
948 948
949 949 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
950 950
951 951 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
952 952 help="jobfile for ipengines job")
953 953 engine_args = List([], config=False,
954 954 help="extra args to pas to ipengine")
955 955
956 956 def write_job_file(self, n):
957 957 job = IPEngineSetJob(config=self.config)
958 958
959 959 for i in range(n):
960 960 t = IPEngineTask(config=self.config)
961 961 # The tasks work directory is *not* the actual work directory of
962 962 # the engine. It is used as the base path for the stdout/stderr
963 963 # files that the scheduler redirects to.
964 964 t.work_directory = self.profile_dir
965 965 # Add the profile_dir and from self.start().
966 966 t.engine_args.extend(self.cluster_args)
967 967 t.engine_args.extend(self.engine_args)
968 968 job.add_task(t)
969 969
970 970 self.log.debug("Writing job description file: %s", self.job_file)
971 971 job.write(self.job_file)
972 972
973 973 @property
974 974 def job_file(self):
975 975 return os.path.join(self.profile_dir, self.job_file_name)
976 976
977 977 def start(self, n):
978 978 """Start the controller by profile_dir."""
979 979 return super(WindowsHPCEngineSetLauncher, self).start(n)
980 980
981 981
982 982 #-----------------------------------------------------------------------------
983 983 # Batch (PBS) system launchers
984 984 #-----------------------------------------------------------------------------
985 985
986 986 class BatchClusterAppMixin(ClusterAppMixin):
987 987 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
988 988 def _profile_dir_changed(self, name, old, new):
989 989 self.context[name] = new
990 990 _cluster_id_changed = _profile_dir_changed
991 991
992 992 def _profile_dir_default(self):
993 993 self.context['profile_dir'] = ''
994 994 return ''
995 995 def _cluster_id_default(self):
996 996 self.context['cluster_id'] = ''
997 997 return ''
998 998
999 999
1000 1000 class BatchSystemLauncher(BaseLauncher):
1001 1001 """Launch an external process using a batch system.
1002 1002
1003 1003 This class is designed to work with UNIX batch systems like PBS, LSF,
1004 1004 GridEngine, etc. The overall model is that there are different commands
1005 1005 like qsub, qdel, etc. that handle the starting and stopping of the process.
1006 1006
1007 1007 This class also has the notion of a batch script. The ``batch_template``
1008 1008 attribute can be set to a string that is a template for the batch script.
1009 1009 This template is instantiated using string formatting. Thus the template can
1010 1010 use {n} fot the number of instances. Subclasses can add additional variables
1011 1011 to the template dict.
1012 1012 """
1013 1013
1014 1014 # Subclasses must fill these in. See PBSEngineSet
1015 1015 submit_command = List([''], config=True,
1016 1016 help="The name of the command line program used to submit jobs.")
1017 1017 delete_command = List([''], config=True,
1018 1018 help="The name of the command line program used to delete jobs.")
1019 1019 job_id_regexp = CRegExp('', config=True,
1020 1020 help="""A regular expression used to get the job id from the output of the
1021 1021 submit_command.""")
1022 1022 batch_template = Unicode('', config=True,
1023 1023 help="The string that is the batch script template itself.")
1024 1024 batch_template_file = Unicode(u'', config=True,
1025 1025 help="The file that contains the batch template.")
1026 1026 batch_file_name = Unicode(u'batch_script', config=True,
1027 1027 help="The filename of the instantiated batch script.")
1028 1028 queue = Unicode(u'', config=True,
1029 1029 help="The PBS Queue.")
1030 1030
1031 1031 def _queue_changed(self, name, old, new):
1032 1032 self.context[name] = new
1033 1033
1034 1034 n = Integer(1)
1035 1035 _n_changed = _queue_changed
1036 1036
1037 1037 # not configurable, override in subclasses
1038 1038 # PBS Job Array regex
1039 1039 job_array_regexp = CRegExp('')
1040 1040 job_array_template = Unicode('')
1041 1041 # PBS Queue regex
1042 1042 queue_regexp = CRegExp('')
1043 1043 queue_template = Unicode('')
1044 1044 # The default batch template, override in subclasses
1045 1045 default_template = Unicode('')
1046 1046 # The full path to the instantiated batch script.
1047 1047 batch_file = Unicode(u'')
1048 1048 # the format dict used with batch_template:
1049 1049 context = Dict()
1050 1050 def _context_default(self):
1051 1051 """load the default context with the default values for the basic keys
1052 1052
1053 1053 because the _trait_changed methods only load the context if they
1054 1054 are set to something other than the default value.
1055 1055 """
1056 1056 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1057 1057
1058 1058 # the Formatter instance for rendering the templates:
1059 1059 formatter = Instance(EvalFormatter, (), {})
1060 1060
1061 1061
1062 1062 def find_args(self):
1063 1063 return self.submit_command + [self.batch_file]
1064 1064
1065 1065 def __init__(self, work_dir=u'.', config=None, **kwargs):
1066 1066 super(BatchSystemLauncher, self).__init__(
1067 1067 work_dir=work_dir, config=config, **kwargs
1068 1068 )
1069 1069 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1070 1070
1071 1071 def parse_job_id(self, output):
1072 1072 """Take the output of the submit command and return the job id."""
1073 1073 m = self.job_id_regexp.search(output)
1074 1074 if m is not None:
1075 1075 job_id = m.group()
1076 1076 else:
1077 1077 raise LauncherError("Job id couldn't be determined: %s" % output)
1078 1078 self.job_id = job_id
1079 1079 self.log.info('Job submitted with job id: %r', job_id)
1080 1080 return job_id
1081 1081
1082 1082 def write_batch_script(self, n):
1083 1083 """Instantiate and write the batch script to the work_dir."""
1084 1084 self.n = n
1085 1085 # first priority is batch_template if set
1086 1086 if self.batch_template_file and not self.batch_template:
1087 1087 # second priority is batch_template_file
1088 1088 with open(self.batch_template_file) as f:
1089 1089 self.batch_template = f.read()
1090 1090 if not self.batch_template:
1091 1091 # third (last) priority is default_template
1092 1092 self.batch_template = self.default_template
1093 1093
1094 1094 # add jobarray or queue lines to user-specified template
1095 1095 # note that this is *only* when user did not specify a template.
1096 1096 # print self.job_array_regexp.search(self.batch_template)
1097 1097 if not self.job_array_regexp.search(self.batch_template):
1098 1098 self.log.debug("adding job array settings to batch script")
1099 1099 firstline, rest = self.batch_template.split('\n',1)
1100 1100 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1101 1101
1102 1102 # print self.queue_regexp.search(self.batch_template)
1103 1103 if self.queue and not self.queue_regexp.search(self.batch_template):
1104 1104 self.log.debug("adding PBS queue settings to batch script")
1105 1105 firstline, rest = self.batch_template.split('\n',1)
1106 1106 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1107 1107
1108 1108 script_as_string = self.formatter.format(self.batch_template, **self.context)
1109 1109 self.log.debug('Writing batch script: %s', self.batch_file)
1110 1110
1111 1111 with open(self.batch_file, 'w') as f:
1112 1112 f.write(script_as_string)
1113 1113 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1114 1114
1115 1115 def start(self, n):
1116 1116 """Start n copies of the process using a batch system."""
1117 1117 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1118 1118 # Here we save profile_dir in the context so they
1119 1119 # can be used in the batch script template as {profile_dir}
1120 1120 self.write_batch_script(n)
1121 1121 output = check_output(self.args, env=os.environ)
1122 1122 output = output.decode(DEFAULT_ENCODING, 'replace')
1123 1123
1124 1124 job_id = self.parse_job_id(output)
1125 1125 self.notify_start(job_id)
1126 1126 return job_id
1127 1127
1128 1128 def stop(self):
1129 1129 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1130 1130 output = output.decode(DEFAULT_ENCODING, 'replace')
1131 1131 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1132 1132 return output
1133 1133
1134 1134
1135 1135 class PBSLauncher(BatchSystemLauncher):
1136 1136 """A BatchSystemLauncher subclass for PBS."""
1137 1137
1138 1138 submit_command = List(['qsub'], config=True,
1139 1139 help="The PBS submit command ['qsub']")
1140 1140 delete_command = List(['qdel'], config=True,
1141 1141 help="The PBS delete command ['qsub']")
1142 1142 job_id_regexp = CRegExp(r'\d+', config=True,
1143 1143 help="Regular expresion for identifying the job ID [r'\d+']")
1144 1144
1145 1145 batch_file = Unicode(u'')
1146 1146 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1147 1147 job_array_template = Unicode('#PBS -t 1-{n}')
1148 1148 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1149 1149 queue_template = Unicode('#PBS -q {queue}')
1150 1150
1151 1151
1152 1152 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1153 1153 """Launch a controller using PBS."""
1154 1154
1155 1155 batch_file_name = Unicode(u'pbs_controller', config=True,
1156 1156 help="batch file name for the controller job.")
1157 1157 default_template= Unicode("""#!/bin/sh
1158 1158 #PBS -V
1159 1159 #PBS -N ipcontroller
1160 1160 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1161 1161 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1162 1162
1163 1163
1164 1164 def start(self):
1165 1165 """Start the controller by profile or profile_dir."""
1166 1166 return super(PBSControllerLauncher, self).start(1)
1167 1167
1168 1168
1169 1169 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1170 1170 """Launch Engines using PBS"""
1171 1171 batch_file_name = Unicode(u'pbs_engines', config=True,
1172 1172 help="batch file name for the engine(s) job.")
1173 1173 default_template= Unicode(u"""#!/bin/sh
1174 1174 #PBS -V
1175 1175 #PBS -N ipengine
1176 1176 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1177 1177 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1178 1178
1179 1179 def start(self, n):
1180 1180 """Start n engines by profile or profile_dir."""
1181 1181 return super(PBSEngineSetLauncher, self).start(n)
1182 1182
1183 1183 #SGE is very similar to PBS
1184 1184
1185 1185 class SGELauncher(PBSLauncher):
1186 1186 """Sun GridEngine is a PBS clone with slightly different syntax"""
1187 1187 job_array_regexp = CRegExp('#\$\W+\-t')
1188 1188 job_array_template = Unicode('#$ -t 1-{n}')
1189 1189 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1190 1190 queue_template = Unicode('#$ -q {queue}')
1191 1191
1192 1192 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1193 1193 """Launch a controller using SGE."""
1194 1194
1195 1195 batch_file_name = Unicode(u'sge_controller', config=True,
1196 1196 help="batch file name for the ipontroller job.")
1197 1197 default_template= Unicode(u"""#$ -V
1198 1198 #$ -S /bin/sh
1199 1199 #$ -N ipcontroller
1200 1200 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1201 1201 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1202 1202
1203 1203 def start(self):
1204 1204 """Start the controller by profile or profile_dir."""
1205 1205 return super(SGEControllerLauncher, self).start(1)
1206 1206
1207 1207 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1208 1208 """Launch Engines with SGE"""
1209 1209 batch_file_name = Unicode(u'sge_engines', config=True,
1210 1210 help="batch file name for the engine(s) job.")
1211 1211 default_template = Unicode("""#$ -V
1212 1212 #$ -S /bin/sh
1213 1213 #$ -N ipengine
1214 1214 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1215 1215 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1216 1216
1217 1217 def start(self, n):
1218 1218 """Start n engines by profile or profile_dir."""
1219 1219 return super(SGEEngineSetLauncher, self).start(n)
1220 1220
1221 1221
1222 1222 # LSF launchers
1223 1223
1224 1224 class LSFLauncher(BatchSystemLauncher):
1225 1225 """A BatchSystemLauncher subclass for LSF."""
1226 1226
1227 1227 submit_command = List(['bsub'], config=True,
1228 1228 help="The PBS submit command ['bsub']")
1229 1229 delete_command = List(['bkill'], config=True,
1230 1230 help="The PBS delete command ['bkill']")
1231 1231 job_id_regexp = CRegExp(r'\d+', config=True,
1232 1232 help="Regular expresion for identifying the job ID [r'\d+']")
1233 1233
1234 1234 batch_file = Unicode(u'')
1235 1235 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1236 1236 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1237 1237 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1238 1238 queue_template = Unicode('#BSUB -q {queue}')
1239 1239
1240 1240 def start(self, n):
1241 1241 """Start n copies of the process using LSF batch system.
1242 1242 This cant inherit from the base class because bsub expects
1243 1243 to be piped a shell script in order to honor the #BSUB directives :
1244 1244 bsub < script
1245 1245 """
1246 1246 # Here we save profile_dir in the context so they
1247 1247 # can be used in the batch script template as {profile_dir}
1248 1248 self.write_batch_script(n)
1249 1249 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1250 1250 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1251 1251 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1252 1252 output,err = p.communicate()
1253 1253 output = output.decode(DEFAULT_ENCODING, 'replace')
1254 1254 job_id = self.parse_job_id(output)
1255 1255 self.notify_start(job_id)
1256 1256 return job_id
1257 1257
1258 1258
1259 1259 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1260 1260 """Launch a controller using LSF."""
1261 1261
1262 1262 batch_file_name = Unicode(u'lsf_controller', config=True,
1263 1263 help="batch file name for the controller job.")
1264 1264 default_template= Unicode("""#!/bin/sh
1265 1265 #BSUB -J ipcontroller
1266 1266 #BSUB -oo ipcontroller.o.%%J
1267 1267 #BSUB -eo ipcontroller.e.%%J
1268 1268 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1269 1269 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1270 1270
1271 1271 def start(self):
1272 1272 """Start the controller by profile or profile_dir."""
1273 1273 return super(LSFControllerLauncher, self).start(1)
1274 1274
1275 1275
1276 1276 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1277 1277 """Launch Engines using LSF"""
1278 1278 batch_file_name = Unicode(u'lsf_engines', config=True,
1279 1279 help="batch file name for the engine(s) job.")
1280 1280 default_template= Unicode(u"""#!/bin/sh
1281 1281 #BSUB -oo ipengine.o.%%J
1282 1282 #BSUB -eo ipengine.e.%%J
1283 1283 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1284 1284 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1285 1285
1286 1286 def start(self, n):
1287 1287 """Start n engines by profile or profile_dir."""
1288 1288 return super(LSFEngineSetLauncher, self).start(n)
1289 1289
1290 1290
1291 1291 #-----------------------------------------------------------------------------
1292 1292 # A launcher for ipcluster itself!
1293 1293 #-----------------------------------------------------------------------------
1294 1294
1295 1295
1296 1296 class IPClusterLauncher(LocalProcessLauncher):
1297 1297 """Launch the ipcluster program in an external process."""
1298 1298
1299 1299 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1300 1300 help="Popen command for ipcluster")
1301 1301 ipcluster_args = List(
1302 1302 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1303 1303 help="Command line arguments to pass to ipcluster.")
1304 1304 ipcluster_subcommand = Unicode('start')
1305 1305 profile = Unicode('default')
1306 1306 n = Integer(2)
1307 1307
1308 1308 def find_args(self):
1309 1309 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1310 1310 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1311 1311 self.ipcluster_args
1312 1312
1313 1313 def start(self):
1314 1314 return super(IPClusterLauncher, self).start()
1315 1315
1316 1316 #-----------------------------------------------------------------------------
1317 1317 # Collections of launchers
1318 1318 #-----------------------------------------------------------------------------
1319 1319
1320 1320 local_launchers = [
1321 1321 LocalControllerLauncher,
1322 1322 LocalEngineLauncher,
1323 1323 LocalEngineSetLauncher,
1324 1324 ]
1325 1325 mpi_launchers = [
1326 1326 MPILauncher,
1327 1327 MPIControllerLauncher,
1328 1328 MPIEngineSetLauncher,
1329 1329 ]
1330 1330 ssh_launchers = [
1331 1331 SSHLauncher,
1332 1332 SSHControllerLauncher,
1333 1333 SSHEngineLauncher,
1334 1334 SSHEngineSetLauncher,
1335 SSHProxyEngineSetLauncher,
1335 1336 ]
1336 1337 winhpc_launchers = [
1337 1338 WindowsHPCLauncher,
1338 1339 WindowsHPCControllerLauncher,
1339 1340 WindowsHPCEngineSetLauncher,
1340 1341 ]
1341 1342 pbs_launchers = [
1342 1343 PBSLauncher,
1343 1344 PBSControllerLauncher,
1344 1345 PBSEngineSetLauncher,
1345 1346 ]
1346 1347 sge_launchers = [
1347 1348 SGELauncher,
1348 1349 SGEControllerLauncher,
1349 1350 SGEEngineSetLauncher,
1350 1351 ]
1351 1352 lsf_launchers = [
1352 1353 LSFLauncher,
1353 1354 LSFControllerLauncher,
1354 1355 LSFEngineSetLauncher,
1355 1356 ]
1356 1357 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1357 1358 + pbs_launchers + sge_launchers + lsf_launchers
1358 1359
General Comments 0
You need to be logged in to leave comments. Login now