##// END OF EJS Templates
Backport PR #2270: SSHLauncher tweaks...
MinRK -
Show More
@@ -1,1342 +1,1345 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7: subprocess has no check_output, so define one with Popen.
    from subprocess import CalledProcessError

    def check_output(*args, **kwargs):
        """Run a command and return what it wrote to stdout.

        Stand-in for ``subprocess.check_output`` on Pythons that lack it.
        Positional and keyword arguments are handed to ``Popen``; any
        ``stdout`` keyword is overridden with ``PIPE``.

        Raises
        ------
        CalledProcessError
            If the command exits with a non-zero status, as the stdlib
            ``check_output`` does.
        """
        kwargs['stdout'] = PIPE
        p = Popen(*args, **kwargs)
        out, _ = p.communicate()
        # communicate() waits for exit, so returncode is populated here.
        if p.returncode:
            failed_cmd = kwargs.get('args')
            if failed_cmd is None:
                failed_cmd = args[0]
            # pre-2.7 CalledProcessError takes no ``output`` argument
            raise CalledProcessError(p.returncode, failed_cmd)
        return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.path import get_home_dir
65 65 from IPython.utils.process import find_cmd, FindCmdError
66 66
67 67 from .win32support import forward_read_events
68 68
69 69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70 70
71 71 WINDOWS = os.name == 'nt'
72 72
73 73 #-----------------------------------------------------------------------------
74 74 # Paths to the kernel apps
75 75 #-----------------------------------------------------------------------------
76 76
# Template for the ``python -c`` one-liner that boots a parallel app; the
# ``%s`` slot takes a module name under IPython.parallel.apps.
cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"

# One argv per app, each running the current interpreter with ``-c``.
ipcluster_cmd_argv, ipengine_cmd_argv, ipcontroller_cmd_argv = (
    [sys.executable, "-c", cmd % app]
    for app in ("ipclusterapp", "ipengineapp", "ipcontrollerapp")
)
84 84
85 85 #-----------------------------------------------------------------------------
86 86 # Base launchers and errors
87 87 #-----------------------------------------------------------------------------
88 88
89 89
class LauncherError(Exception):
    """Base class for the launcher-specific errors defined in this module."""
92 92
93 93
class ProcessStateError(LauncherError):
    """Raised when an action does not fit the launcher's current state,
    e.g. starting a process that has already been started.
    """
96 96
97 97
class UnknownStatus(LauncherError):
    """A LauncherError subclass for unknown process status.

    NOTE(review): nothing in the visible part of this module raises it.
    """
100 100
101 101
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    Subclasses provide :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`. This base class keeps the lifecycle state (``'before'``,
    ``'running'`` or ``'after'``), records the start/stop data and runs the
    callbacks registered with :meth:`on_stop`.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Default for ``loop``: whatever ``IOLoop.instance()`` returns."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the launcher in the ``'before'`` state with no callbacks."""
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        # one of: 'before', 'running', 'after'
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The command and its arguments, as a list.

        The first element is used as the process name in log messages.
        The value comes from :meth:`find_args`.
        """
        return self.find_args()

    def find_args(self):
        """Build the list returned by the ``.args`` property.

        Subclasses must override this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into one space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher's state is ``'running'``."""
        return self.state == 'running'

    def start(self):
        """Start the process (subclass responsibility)."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        Intended to return immediately; to observe the process actually
        finishing, register a callback with :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register `f` to be called with this launcher's stop_data.

        If the process has already finished (state ``'after'``), `f` is
        called right away and its result returned; otherwise it is queued
        until :meth:`notify_stop` runs.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Record that the process has started.

        Logs the startup, stores `data` as ``start_data`` and switches the
        state to ``'running'``. Returns `data` unchanged so the method can be
        used as a pass-through callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Record that the process has stopped and fire the stop callbacks.

        Logs the stop, stores `data` as ``stop_data``, switches the state to
        ``'after'`` and calls every callback registered through
        :meth:`on_stop` with `data`. Returns `data` unchanged.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Callbacks are popped from the end of the list, so the most
        # recently registered one runs first.
        for _ in range(len(self.stop_callbacks)):
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
213 213
class ClusterAppMixin(HasTraits):
    """Mixin that exposes the cluster's profile dir and id as traits."""

    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line arguments carrying ``profile_dir`` and ``cluster_id``."""
        selected = ['--profile-dir', self.profile_dir]
        selected += ['--cluster-id', self.cluster_id]
        return selected
222 222
class ControllerMixin(ClusterAppMixin):
    """Mixin holding the configurable command and arguments for ipcontroller."""

    # Command used to launch the controller.
    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
229 229
class EngineMixin(ClusterAppMixin):
    """Mixin holding the configurable command and arguments for ipengine."""

    # Command used to launch an engine.
    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
237 237
238 238
239 239 #-----------------------------------------------------------------------------
240 240 # Local process launchers
241 241 #-----------------------------------------------------------------------------
242 242
243 243
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``. The child's stdout/stderr are registered with the
    IOLoop and logged line by line at debug level, and a periodic callback
    polls for the process exiting.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100)  # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None
        # Delayed SIGKILL callback; created by interrupt_then_kill().
        self.killer = None

    def find_args(self):
        """Return the configured ``cmd_and_args`` list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output into the event loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the ``'before'`` state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # On Windows the pipes are wrapped by forward_read_events; the
            # result is what gets registered with the loop and recv()'d from.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process: interrupt it, then kill it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send `sig` to the process; does nothing unless it is running."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait `delay` seconds and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the delayed KILL below is still scheduled.
            self.log.debug("interrupt failed", exc_info=True)
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _handle_output(self, forwarded, pipe):
        """Read one line from a child output stream and log it.

        `forwarded` is the object registered with the loop (read with
        ``recv()`` on Windows); `pipe` is the Popen pipe (read with
        ``readline()`` elsewhere).
        """
        if WINDOWS:
            line = forwarded.recv()
        else:
            line = pipe.readline()
        # a stopped process will be readable but return empty strings
        if line:
            # drop the final character (the trailing newline)
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """IOLoop handler for readable events on the child's stdout."""
        self._handle_output(self.stdout, self.process.stdout)

    def handle_stderr(self, fd, events):
        """IOLoop handler for readable events on the child's stderr."""
        self._handle_output(self.stderr, self.process.stderr)

    def poll(self):
        """Check whether the process has exited; clean up if it has.

        Returns the exit status, or None while the process is still running.
        On exit the poller is stopped, both output handlers are removed and
        :meth:`notify_stop` is called with the exit code and pid.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
342 342
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Controller command, then the cluster args, then the controller args."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(LocalControllerLauncher, self)
        return parent.start()
352 352
353 353
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Engine command, then the cluster args, then the engine args."""
        argv = list(self.engine_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.engine_args)
        return argv
359 359
360 360
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine and kept in
    ``launchers``. The set reports itself stopped once every engine has
    stopped, with ``stop_data`` mapping each launcher key to that engine's
    stop data.
    """

    delay = CFloat(0.1, config=True,
        help=("delay (in seconds) between starting each engine after the first.\n"
              "This can help force the engines to get their ids in order, or limit\n"
              "process flood when starting many engines.")
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Returns the list of values returned by each engine launcher's
        ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Placeholder args; the first element names the set in log messages."""
        return ['engine set']

    def signal(self, sig):
        """Forward `sig` to every engine launcher; return their results."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Run ``interrupt_then_kill(delay)`` on every engine launcher."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        """Stop all engines: interrupt them, then kill after a delay."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for one engine: drop it and record its stop data.

        `data` is the engine launcher's stop data and must contain ``'pid'``.
        Once no launchers remain, the whole set is reported stopped.
        """
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # No launcher owns this pid. Without this guard the last launcher
            # iterated would be removed instead (or NameError on an empty dict).
            self.log.warn("Stop notification for unknown engine pid %r ignored", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
431 431
432 432
433 433 #-----------------------------------------------------------------------------
434 434 # MPI launchers
435 435 #-----------------------------------------------------------------------------
436 436
437 437
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Fold config given under the old MPIExec* names into the MPI* names.

        A deprecation warning is logged for each old name found in the
        ``config`` keyword argument.
        """
        # deprecation for old MPIExec names:
        # ``or {}`` also covers an explicit config=None, which the plain
        # ``kwargs.get('config', {})`` default did not.
        config = kwargs.get('config') or {}
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields.

        Order: mpi_cmd, ``-n <n>``, mpi_args, program, program_args.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
475 475
476 476
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # ``program`` and ``program_args`` are read-only aliases onto the
    # controller traits, so find_args() has the same form for every
    # Controller/EngineSet launcher instead of some using `program_args`
    # and others `controller_args`.
    @property
    def program(self):
        """The controller command."""
        return self.controller_cmd

    @property
    def program_args(self):
        """The cluster args followed by the controller args."""
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir (always a single instance)."""
        parent = super(MPIControllerLauncher, self)
        return parent.start(1)
494 494
495 495
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # ``program`` and ``program_args`` are read-only aliases onto the engine
    # traits, so find_args() has the same form for every
    # Controller/EngineSet launcher instead of some using `program_args`
    # and others `engine_args`.
    @property
    def program(self):
        """The engine command."""
        return self.engine_cmd

    @property
    def program_args(self):
        """The cluster args followed by the engine args."""
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        parent = super(MPIEngineSetLauncher, self)
        return parent.start(n)
514 514
515 515 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin for the old MPIExec* launcher names.

    Expects the concrete class to provide ``self.log``.
    """

    def warn(self):
        """Log that this class's MPIExec* name is deprecated in favour of MPI*."""
        deprecated_name = type(self).__name__
        replacement = deprecated_name.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", deprecated_name, replacement)
521 521
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPILauncher, then log the deprecation warning."""
        parent = super(MPIExecLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
527 527
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPIControllerLauncher, then log the deprecation warning."""
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
533 533
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise as an MPIEngineSetLauncher, then log the deprecation warning."""
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*args, **kwargs)
        self.warn()
539 539
540 540
541 541 #-----------------------------------------------------------------------------
542 542 # SSH launchers
543 543 #-----------------------------------------------------------------------------
544 544
545 545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
546 546
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.

    Files listed in ``to_send`` are copied with scp before the remote program
    is started; files listed in ``to_fetch`` are copied back afterwards.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Keep ``location`` in sync when ``hostname`` changes."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Keep ``location`` in sync when ``user`` changes."""
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """ssh command line: ssh_cmd, ssh_args, location, then the program.

        The program and its arguments are passed through ``pipes.quote``
        before being appended.
        """
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file"""
        remote = "%s:%s" % (self.location, remote)
        # wait up to 10s for the local file to exist; scp is attempted
        # afterwards either way
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s", local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file"""
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.strip()
            if check == 'no':
                time.sleep(1)
            elif check == 'yes':
                break
        # scp is attempted even if the file never showed up
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        `hostname` and `user`, when given, replace the corresponding traits
        before anything is launched.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        """Close the ssh connection; the `sig` value itself is not used."""
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
645 645
class SSHClusterLauncher(SSHLauncher):
    """SSH launcher that works from a profile directory on the remote side."""

    remote_profile_dir = Unicode('', config=True,
        help=("The remote profile_dir to use.\n"
              "\n"
              "If not specified, use calling profile, stripping out possible leading homedir.\n")
    )

    def _remote_profile_dir_default(self):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
        """
        home = get_home_dir()
        prefix = home if home.endswith('/') else home + '/'

        local_dir = self.profile_dir
        if not local_dir.startswith(prefix):
            return local_dir
        return local_dir[len(prefix):]

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id."""
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Only ``--profile-dir`` with the remote profile dir; no cluster id."""
        return ['--profile-dir', self.remote_profile_dir]
673 673
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # ``program`` and ``program_args`` are read-only aliases onto the
    # controller traits, so find_args() has the same form for every
    # Controller/EngineSet launcher instead of some using `program_args`
    # and others `controller_args`.

    def _controller_cmd_default(self):
        """Default remote command is the bare ``ipcontroller`` name."""
        return ['ipcontroller']

    @property
    def program(self):
        """The controller command."""
        return self.controller_cmd

    @property
    def program_args(self):
        """The cluster args followed by the controller args."""
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        """(remote, local) path pairs for the two connection files.

        Each file sits under ``security`` in the remote profile dir and is
        fetched into ``security`` in the local profile dir.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            local_path = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote_path, local_path))
        return pairs
697 697
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # ``program`` and ``program_args`` are read-only aliases onto the engine
    # traits, so find_args() has the same form for every
    # Controller/EngineSet launcher instead of some using `program_args`
    # and others `engine_args`.

    def _engine_cmd_default(self):
        """Default remote command is the bare ``ipengine`` name."""
        return ['ipengine']

    @property
    def program(self):
        """The engine command."""
        return self.engine_cmd

    @property
    def program_args(self):
        """The cluster args followed by the engine args."""
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        """(local, remote) path pairs for the two connection files.

        Each file sits under ``security`` in the local profile dir and is
        sent to ``security`` in the remote profile dir.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs
721 721
722 722
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines over ssh, as described by ``engines``."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help=("dict of engines to launch. This is a dict by hostname of ints,\n"
              "corresponding to the number of engines to start on that host."))

    def _engine_cmd_default(self):
        """Default remote command is the bare ``ipengine`` name."""
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        total = 0
        for spec in self.engines.itervalues():
            # a value may be a plain count or a (count, args) pair
            if isinstance(spec, (tuple, list)):
                spec, _engine_args = spec
            total += spec
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """
        dlist = []
        for host, spec in self.engines.iteritems():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            # keys may be written as ``user@host``
            user = None
            if '@' in host:
                user, host = host.split('@', 1)

            for i in range(count):
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
774 777
775 778
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """The ipcluster command followed by the ``engines`` subcommand."""
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        """``-n <n>`` plus ``--profile-dir`` with the remote profile dir."""
        engine_total = str(self.n)
        return ['-n', engine_total, '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        """(local, remote) path pairs for the two connection files.

        Each file sits under ``security`` in the local profile dir and is
        sent to ``security`` in the remote profile dir.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs

    def start(self, n):
        """Record the engine count `n`, then start the ssh process."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
804 807
805 808
806 809 #-----------------------------------------------------------------------------
807 810 # Windows HPC Server 2008 scheduler launchers
808 811 #-----------------------------------------------------------------------------
809 812
810 813
811 814 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to submit jobs.

    On Windows the result of ``find_cmd('job')`` is used when the lookup
    works; in every other case the bare name ``'job'`` is returned.
    """
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
821 824
822 825
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through ``job_cmd``.

    Subclasses implement :meth:`write_job_file` to produce the job
    description that :meth:`start` submits.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help=("A regular expression used to get the job id from the output of the\n"
              "submit_command. ")
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script is the ``job_file``
    # property below, made dynamically by combining the work_dir with the
    # job_file_name. (A ``job_file`` trait used to be declared here too; the
    # property of the same name shadowed it, so it has been dropped.)
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``work_dir``."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for `n` processes (subclass responsibility)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Placeholder args; the first element names the process in log messages."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.

        Raises
        ------
        LauncherError
            If ``job_id_regexp`` finds no match in `output`.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Returns the job id parsed from the submit command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and notify observers.

        Returns the cancel command's output, or a note that the job already
        appears stopped when the command fails.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Treated as "already stopped". Unlike the former bare except,
            # KeyboardInterrupt and SystemExit now propagate.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
901 904
902 905
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch the controller as a Win HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    @property
    def job_file(self):
        """Path of the job file, placed inside ``self.profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write a job description containing one controller task.

        ``n`` is accepted to match the base-class signature but is not
        used here.
        """
        controller_job = IPControllerJob(config=self.config)
        task = IPControllerTask(config=self.config)

        # The task's work directory is *not* the actual work directory of
        # the controller.  It is the base path for the stdout/stderr files
        # that the scheduler redirects to.
        task.work_directory = self.profile_dir

        # Cluster-wide arguments first, then the controller-specific extras.
        for extra in (self.cluster_args, self.controller_args):
            task.controller_args.extend(extra)
        controller_job.add_task(task)

        target = self.job_file
        self.log.debug("Writing job description file: %s", target)
        controller_job.write(target)

    def start(self):
        """Start the controller by profile_dir."""
        single_instance = 1
        return super(WindowsHPCControllerLauncher, self).start(single_instance)
933 936
934 937
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as one Win HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pas to ipengine")

    @property
    def job_file(self):
        """Path of the job file, placed inside ``self.profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write a job description containing ``n`` engine tasks."""
        engine_job = IPEngineSetJob(config=self.config)

        remaining = n
        while remaining > 0:
            task = IPEngineTask(config=self.config)
            # The task's work directory is *not* the actual work directory
            # of the engine.  It is the base path for the stdout/stderr
            # files that the scheduler redirects to.
            task.work_directory = self.profile_dir
            # Cluster-wide arguments first, then the engine-specific extras.
            for extra in (self.cluster_args, self.engine_args):
                task.engine_args.extend(extra)
            engine_job.add_task(task)
            remaining -= 1

        target = self.job_file
        self.log.debug("Writing job description file: %s", target)
        engine_job.write(target)

    def start(self, n):
        """Start n engines through the Win HPC scheduler."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
966 969
967 970
968 971 #-----------------------------------------------------------------------------
969 972 # Batch (PBS) system launchers
970 973 #-----------------------------------------------------------------------------
971 974
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    # Default-value hooks: record the empty default in the template context
    # as a side effect, then return it.
    def _profile_dir_default(self):
        empty = ''
        self.context['profile_dir'] = empty
        return empty

    def _cluster_id_default(self):
        empty = ''
        self.context['cluster_id'] = empty
        return empty

    # Change hooks: mirror the new value into the template context under the
    # trait's own name.  Both traits share one handler.
    def _profile_dir_changed(self, name, old, new):
        self.context[name] = new

    _cluster_id_changed = _profile_dir_changed
984 987
985 988
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # keep the template context in sync with the trait value
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # The instantiated script always lives in the work directory.
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.  Raises LauncherError when
        ``self.job_id_regexp`` does not match ``output``.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def _insert_after_first_line(self, directive):
        """Insert ``directive`` as the second line of ``self.batch_template``.

        ``partition`` is used instead of ``split('\\n', 1)`` so a template
        consisting of a single line (no newline at all) gets the directive
        appended after it rather than failing on tuple unpacking.
        """
        firstline, _sep, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, directive, rest])

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir.

        The template is chosen by priority: ``batch_template`` if set, else
        the contents of ``batch_template_file``, else ``default_template``.
        The rendered script is written to ``self.batch_file`` and made
        readable, writable and executable by the owner only.
        """
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

            # add jobarray or queue lines to the template
            # note that this is *only* when user did not specify a template.
            if not self.job_array_regexp.search(self.batch_template):
                self.log.debug("adding job array settings to batch script")
                self._insert_after_first_line(self.job_array_template)

            if self.queue and not self.queue_regexp.search(self.batch_template):
                self.log.debug("adding PBS queue settings to batch script")
                self._insert_after_first_line(self.queue_template)

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs ``self.args`` and returns the job id
        parsed from the command's output.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command for ``self.job_id`` and return its output.

        Errors raised by the delete command are not caught here.
        """
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        # Pass the output of the kill cmd
        self.notify_stop(dict(job_id=self.job_id, output=output))
        return output
1117 1120
1118 1121
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # The help text names the actual default, qdel.
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Raw strings keep the regex escapes literal; the pattern values are
    # unchanged.
    # Recognises an existing "#PBS -t ..." job-array directive.
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # Recognises an existing "#PBS -q ..." queue directive.
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1134 1137
1135 1138
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # The shell-quoted ipcontroller command line is substituted for %s when
    # the class is defined; {profile_dir} and {cluster_id} are left for the
    # batch-template formatter.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipcontroller_cmd_argv]))

    def start(self):
        """Start the controller by profile or profile_dir."""
        single_instance = 1
        return super(PBSControllerLauncher, self).start(single_instance)
1151 1154
1152 1155
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The shell-quoted ipengine command line is substituted for %s when the
    # class is defined; {profile_dir} and {cluster_id} are left for the
    # batch-template formatter.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipengine_cmd_argv]))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        engine_count = n
        return super(PBSEngineSetLauncher, self).start(engine_count)
1166 1169
1167 1170 #SGE is very similar to PBS
1168 1171
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""

    # SGE directives start with "#$" instead of "#PBS".
    # Queue handling:
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
    # Job-array handling:
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
1175 1178
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipontroller job.")
    # The shell-quoted ipcontroller command line is substituted for %s when
    # the class is defined; {profile_dir} and {cluster_id} are left for the
    # batch-template formatter.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipcontroller_cmd_argv]))

    def start(self):
        """Start the controller by profile or profile_dir."""
        single_instance = 1
        return super(SGEControllerLauncher, self).start(single_instance)
1190 1193
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""

    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # The shell-quoted ipengine command line is substituted for %s when the
    # class is defined; {profile_dir} and {cluster_id} are left for the
    # batch-template formatter.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipengine_cmd_argv]))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        engine_count = n
        return super(SGEEngineSetLauncher, self).start(engine_count)
1204 1207
1205 1208
1206 1209 # LSF launchers
1207 1210
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expresion for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Recognises an existing job-array directive such as
    # "#BSUB -J name[1-4]".  Whitespace between "-J" and the name is
    # optional, so the compact "-Jname[1-4]" spelling matches as well.
    job_array_regexp = CRegExp(r'#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This cant inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives :
        bsub < script

        Returns the job id parsed from the submit command's output.
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)

        # self.args is the submit command followed by the script path.  The
        # script is fed to the command on stdin, so its path never has to be
        # quoted for the shell and every element of the submit command is
        # kept, not just the first one.
        submit_parts, script = self.args[:-1], self.args[-1]
        submit_cmd = ' '.join(submit_parts)
        self.log.debug("Starting %s: %s < %s", self.__class__.__name__,
                       submit_cmd, script)
        with open(script) as script_file:
            p = Popen(submit_cmd, shell=True, env=os.environ,
                      stdin=script_file, stdout=PIPE)
            output = p.communicate()[0]
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1241 1244
1242 1245
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # %%J collapses to a literal %J during the %-substitution below, which
    # fills %s with the shell-quoted ipcontroller command line.
    # {profile_dir} and {cluster_id} are left for the batch-template
    # formatter.
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipcontroller_cmd_argv]))

    def start(self):
        """Start the controller by profile or profile_dir."""
        single_instance = 1
        return super(LSFControllerLauncher, self).start(single_instance)
1258 1261
1259 1262
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""

    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %%J collapses to a literal %J during the %-substitution below, which
    # fills %s with the shell-quoted ipengine command line.  {profile_dir}
    # and {cluster_id} are left for the batch-template formatter.
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join([pipes.quote(arg) for arg in ipengine_cmd_argv]))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        engine_count = n
        return super(LSFEngineSetLauncher, self).start(engine_count)
1273 1276
1274 1277
1275 1278 #-----------------------------------------------------------------------------
1276 1279 # A launcher for ipcluster itself!
1277 1280 #-----------------------------------------------------------------------------
1278 1281
1279 1282
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Build the ipcluster command line.

        Order: the ipcluster command, the subcommand, ``--n`` and
        ``--profile``, then the extra ``ipcluster_args``.
        """
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['--n=%i' % self.n, '--profile=%s' % self.profile])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Start ipcluster; simply defers to the parent class's ``start``."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1299 1302
1300 1303 #-----------------------------------------------------------------------------
1301 1304 # Collections of launchers
1302 1305 #-----------------------------------------------------------------------------
1303 1306
# One list of launcher classes per backend.
local_launchers = [LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher]
mpi_launchers = [MPILauncher, MPIControllerLauncher, MPIEngineSetLauncher]
ssh_launchers = [
    SSHLauncher, SSHControllerLauncher, SSHEngineLauncher, SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher, WindowsHPCControllerLauncher, WindowsHPCEngineSetLauncher,
]
pbs_launchers = [PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher]
sge_launchers = [SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher]
lsf_launchers = [LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher]

# Every launcher class, concatenated in the same backend order as above.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
)
1342 1345
General Comments 0
You need to be logged in to leave comments. Login now