##// END OF EJS Templates
Backport PR #2126: ipcluster broken with any batch (PBS/LSF/SGE)...
MinRK -
Show More
@@ -1,1341 +1,1342 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 import pipes
25 26 import stat
26 27 import sys
27 28 import time
28 29
29 30 # signal imports, handling various platforms, versions
30 31
31 32 from signal import SIGINT, SIGTERM
32 33 try:
33 34 from signal import SIGKILL
34 35 except ImportError:
35 36 # Windows
36 37 SIGKILL=SIGTERM
37 38
38 39 try:
39 40 # Windows >= 2.7, 3.2
40 41 from signal import CTRL_C_EVENT as SIGINT
41 42 except ImportError:
42 43 pass
43 44
44 45 from subprocess import Popen, PIPE, STDOUT
45 46 try:
46 47 from subprocess import check_output
47 48 except ImportError:
48 49 # pre-2.7, define check_output with Popen
49 50 def check_output(*args, **kwargs):
50 51 kwargs.update(dict(stdout=PIPE))
51 52 p = Popen(*args, **kwargs)
52 53 out,err = p.communicate()
53 54 return out
54 55
55 56 from zmq.eventloop import ioloop
56 57
57 58 from IPython.config.application import Application
58 59 from IPython.config.configurable import LoggingConfigurable
59 60 from IPython.utils.text import EvalFormatter
60 61 from IPython.utils.traitlets import (
61 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 63 )
63 64 from IPython.utils.path import get_home_dir
64 65 from IPython.utils.process import find_cmd, FindCmdError
65 66
66 67 from .win32support import forward_read_events
67 68
68 69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 70
70 71 WINDOWS = os.name == 'nt'
71 72
72 73 #-----------------------------------------------------------------------------
73 74 # Paths to the kernel apps
74 75 #-----------------------------------------------------------------------------
75 76
76 77 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
77 78
78 79 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
79 80
80 81 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
81 82
82 83 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
83 84
84 85 #-----------------------------------------------------------------------------
85 86 # Base launchers and errors
86 87 #-----------------------------------------------------------------------------
87 88
88 89
89 90 class LauncherError(Exception):
90 91 pass
91 92
92 93
93 94 class ProcessStateError(LauncherError):
94 95 pass
95 96
96 97
97 98 class UnknownStatus(LauncherError):
98 99 pass
99 100
100 101
101 102 class BaseLauncher(LoggingConfigurable):
102 103 """An asbtraction for starting, stopping and signaling a process."""
103 104
104 105 # In all of the launchers, the work_dir is where child processes will be
105 106 # run. This will usually be the profile_dir, but may not be. any work_dir
106 107 # passed into the __init__ method will override the config value.
107 108 # This should not be used to set the work_dir for the actual engine
108 109 # and controller. Instead, use their own config files or the
109 110 # controller_args, engine_args attributes of the launchers to add
110 111 # the work_dir option.
111 112 work_dir = Unicode(u'.')
112 113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
113 114
114 115 start_data = Any()
115 116 stop_data = Any()
116 117
117 118 def _loop_default(self):
118 119 return ioloop.IOLoop.instance()
119 120
120 121 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 123 self.state = 'before' # can be before, running, after
123 124 self.stop_callbacks = []
124 125 self.start_data = None
125 126 self.stop_data = None
126 127
127 128 @property
128 129 def args(self):
129 130 """A list of cmd and args that will be used to start the process.
130 131
131 132 This is what is passed to :func:`spawnProcess` and the first element
132 133 will be the process name.
133 134 """
134 135 return self.find_args()
135 136
136 137 def find_args(self):
137 138 """The ``.args`` property calls this to find the args list.
138 139
139 140 Subcommand should implement this to construct the cmd and args.
140 141 """
141 142 raise NotImplementedError('find_args must be implemented in a subclass')
142 143
143 144 @property
144 145 def arg_str(self):
145 146 """The string form of the program arguments."""
146 147 return ' '.join(self.args)
147 148
148 149 @property
149 150 def running(self):
150 151 """Am I running."""
151 152 if self.state == 'running':
152 153 return True
153 154 else:
154 155 return False
155 156
156 157 def start(self):
157 158 """Start the process."""
158 159 raise NotImplementedError('start must be implemented in a subclass')
159 160
160 161 def stop(self):
161 162 """Stop the process and notify observers of stopping.
162 163
163 164 This method will return None immediately.
164 165 To observe the actual process stopping, see :meth:`on_stop`.
165 166 """
166 167 raise NotImplementedError('stop must be implemented in a subclass')
167 168
168 169 def on_stop(self, f):
169 170 """Register a callback to be called with this Launcher's stop_data
170 171 when the process actually finishes.
171 172 """
172 173 if self.state=='after':
173 174 return f(self.stop_data)
174 175 else:
175 176 self.stop_callbacks.append(f)
176 177
177 178 def notify_start(self, data):
178 179 """Call this to trigger startup actions.
179 180
180 181 This logs the process startup and sets the state to 'running'. It is
181 182 a pass-through so it can be used as a callback.
182 183 """
183 184
184 185 self.log.debug('Process %r started: %r', self.args[0], data)
185 186 self.start_data = data
186 187 self.state = 'running'
187 188 return data
188 189
189 190 def notify_stop(self, data):
190 191 """Call this to trigger process stop actions.
191 192
192 193 This logs the process stopping and sets the state to 'after'. Call
193 194 this to trigger callbacks registered via :meth:`on_stop`."""
194 195
195 196 self.log.debug('Process %r stopped: %r', self.args[0], data)
196 197 self.stop_data = data
197 198 self.state = 'after'
198 199 for i in range(len(self.stop_callbacks)):
199 200 d = self.stop_callbacks.pop()
200 201 d(data)
201 202 return data
202 203
203 204 def signal(self, sig):
204 205 """Signal the process.
205 206
206 207 Parameters
207 208 ----------
208 209 sig : str or int
209 210 'KILL', 'INT', etc., or any signal number
210 211 """
211 212 raise NotImplementedError('signal must be implemented in a subclass')
212 213
213 214 class ClusterAppMixin(HasTraits):
214 215 """MixIn for cluster args as traits"""
215 216 profile_dir=Unicode('')
216 217 cluster_id=Unicode('')
217 218
218 219 @property
219 220 def cluster_args(self):
220 221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
221 222
222 223 class ControllerMixin(ClusterAppMixin):
223 224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
224 225 help="""Popen command to launch ipcontroller.""")
225 226 # Command line arguments to ipcontroller.
226 227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 228 help="""command-line args to pass to ipcontroller""")
228 229
229 230 class EngineMixin(ClusterAppMixin):
230 231 engine_cmd = List(ipengine_cmd_argv, config=True,
231 232 help="""command to launch the Engine.""")
232 233 # Command line arguments for ipengine.
233 234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
234 235 help="command-line arguments to pass to ipengine"
235 236 )
236 237
237 238
238 239 #-----------------------------------------------------------------------------
239 240 # Local process launchers
240 241 #-----------------------------------------------------------------------------
241 242
242 243
243 244 class LocalProcessLauncher(BaseLauncher):
244 245 """Start and stop an external process in an asynchronous manner.
245 246
246 247 This will launch the external process with a working directory of
247 248 ``self.work_dir``.
248 249 """
249 250
250 251 # This is used to to construct self.args, which is passed to
251 252 # spawnProcess.
252 253 cmd_and_args = List([])
253 254 poll_frequency = Integer(100) # in ms
254 255
255 256 def __init__(self, work_dir=u'.', config=None, **kwargs):
256 257 super(LocalProcessLauncher, self).__init__(
257 258 work_dir=work_dir, config=config, **kwargs
258 259 )
259 260 self.process = None
260 261 self.poller = None
261 262
262 263 def find_args(self):
263 264 return self.cmd_and_args
264 265
265 266 def start(self):
266 267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
267 268 if self.state == 'before':
268 269 self.process = Popen(self.args,
269 270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
270 271 env=os.environ,
271 272 cwd=self.work_dir
272 273 )
273 274 if WINDOWS:
274 275 self.stdout = forward_read_events(self.process.stdout)
275 276 self.stderr = forward_read_events(self.process.stderr)
276 277 else:
277 278 self.stdout = self.process.stdout.fileno()
278 279 self.stderr = self.process.stderr.fileno()
279 280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
280 281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
281 282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
282 283 self.poller.start()
283 284 self.notify_start(self.process.pid)
284 285 else:
285 286 s = 'The process was already started and has state: %r' % self.state
286 287 raise ProcessStateError(s)
287 288
288 289 def stop(self):
289 290 return self.interrupt_then_kill()
290 291
291 292 def signal(self, sig):
292 293 if self.state == 'running':
293 294 if WINDOWS and sig != SIGINT:
294 295 # use Windows tree-kill for better child cleanup
295 296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
296 297 else:
297 298 self.process.send_signal(sig)
298 299
299 300 def interrupt_then_kill(self, delay=2.0):
300 301 """Send INT, wait a delay and then send KILL."""
301 302 try:
302 303 self.signal(SIGINT)
303 304 except Exception:
304 305 self.log.debug("interrupt failed")
305 306 pass
306 307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
307 308 self.killer.start()
308 309
309 310 # callbacks, etc:
310 311
311 312 def handle_stdout(self, fd, events):
312 313 if WINDOWS:
313 314 line = self.stdout.recv()
314 315 else:
315 316 line = self.process.stdout.readline()
316 317 # a stopped process will be readable but return empty strings
317 318 if line:
318 319 self.log.debug(line[:-1])
319 320 else:
320 321 self.poll()
321 322
322 323 def handle_stderr(self, fd, events):
323 324 if WINDOWS:
324 325 line = self.stderr.recv()
325 326 else:
326 327 line = self.process.stderr.readline()
327 328 # a stopped process will be readable but return empty strings
328 329 if line:
329 330 self.log.debug(line[:-1])
330 331 else:
331 332 self.poll()
332 333
333 334 def poll(self):
334 335 status = self.process.poll()
335 336 if status is not None:
336 337 self.poller.stop()
337 338 self.loop.remove_handler(self.stdout)
338 339 self.loop.remove_handler(self.stderr)
339 340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
340 341 return status
341 342
342 343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
343 344 """Launch a controller as a regular external process."""
344 345
345 346 def find_args(self):
346 347 return self.controller_cmd + self.cluster_args + self.controller_args
347 348
348 349 def start(self):
349 350 """Start the controller by profile_dir."""
350 351 return super(LocalControllerLauncher, self).start()
351 352
352 353
353 354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
354 355 """Launch a single engine as a regular externall process."""
355 356
356 357 def find_args(self):
357 358 return self.engine_cmd + self.cluster_args + self.engine_args
358 359
359 360
360 361 class LocalEngineSetLauncher(LocalEngineLauncher):
361 362 """Launch a set of engines as regular external processes."""
362 363
363 364 delay = CFloat(0.1, config=True,
364 365 help="""delay (in seconds) between starting each engine after the first.
365 366 This can help force the engines to get their ids in order, or limit
366 367 process flood when starting many engines."""
367 368 )
368 369
369 370 # launcher class
370 371 launcher_class = LocalEngineLauncher
371 372
372 373 launchers = Dict()
373 374 stop_data = Dict()
374 375
375 376 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 377 super(LocalEngineSetLauncher, self).__init__(
377 378 work_dir=work_dir, config=config, **kwargs
378 379 )
379 380 self.stop_data = {}
380 381
381 382 def start(self, n):
382 383 """Start n engines by profile or profile_dir."""
383 384 dlist = []
384 385 for i in range(n):
385 386 if i > 0:
386 387 time.sleep(self.delay)
387 388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
388 389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
389 390 )
390 391
391 392 # Copy the engine args over to each engine launcher.
392 393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
393 394 el.engine_args = copy.deepcopy(self.engine_args)
394 395 el.on_stop(self._notice_engine_stopped)
395 396 d = el.start()
396 397 self.launchers[i] = el
397 398 dlist.append(d)
398 399 self.notify_start(dlist)
399 400 return dlist
400 401
401 402 def find_args(self):
402 403 return ['engine set']
403 404
404 405 def signal(self, sig):
405 406 dlist = []
406 407 for el in self.launchers.itervalues():
407 408 d = el.signal(sig)
408 409 dlist.append(d)
409 410 return dlist
410 411
411 412 def interrupt_then_kill(self, delay=1.0):
412 413 dlist = []
413 414 for el in self.launchers.itervalues():
414 415 d = el.interrupt_then_kill(delay)
415 416 dlist.append(d)
416 417 return dlist
417 418
418 419 def stop(self):
419 420 return self.interrupt_then_kill()
420 421
421 422 def _notice_engine_stopped(self, data):
422 423 pid = data['pid']
423 424 for idx,el in self.launchers.iteritems():
424 425 if el.process.pid == pid:
425 426 break
426 427 self.launchers.pop(idx)
427 428 self.stop_data[idx] = data
428 429 if not self.launchers:
429 430 self.notify_stop(self.stop_data)
430 431
431 432
432 433 #-----------------------------------------------------------------------------
433 434 # MPI launchers
434 435 #-----------------------------------------------------------------------------
435 436
436 437
437 438 class MPILauncher(LocalProcessLauncher):
438 439 """Launch an external process using mpiexec."""
439 440
440 441 mpi_cmd = List(['mpiexec'], config=True,
441 442 help="The mpiexec command to use in starting the process."
442 443 )
443 444 mpi_args = List([], config=True,
444 445 help="The command line arguments to pass to mpiexec."
445 446 )
446 447 program = List(['date'],
447 448 help="The program to start via mpiexec.")
448 449 program_args = List([],
449 450 help="The command line argument to the program."
450 451 )
451 452 n = Integer(1)
452 453
453 454 def __init__(self, *args, **kwargs):
454 455 # deprecation for old MPIExec names:
455 456 config = kwargs.get('config', {})
456 457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
457 458 deprecated = config.get(oldname)
458 459 if deprecated:
459 460 newname = oldname.replace('MPIExec', 'MPI')
460 461 config[newname].update(deprecated)
461 462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
462 463
463 464 super(MPILauncher, self).__init__(*args, **kwargs)
464 465
465 466 def find_args(self):
466 467 """Build self.args using all the fields."""
467 468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
468 469 self.program + self.program_args
469 470
470 471 def start(self, n):
471 472 """Start n instances of the program using mpiexec."""
472 473 self.n = n
473 474 return super(MPILauncher, self).start()
474 475
475 476
476 477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
477 478 """Launch a controller using mpiexec."""
478 479
479 480 # alias back to *non-configurable* program[_args] for use in find_args()
480 481 # this way all Controller/EngineSetLaunchers have the same form, rather
481 482 # than *some* having `program_args` and others `controller_args`
482 483 @property
483 484 def program(self):
484 485 return self.controller_cmd
485 486
486 487 @property
487 488 def program_args(self):
488 489 return self.cluster_args + self.controller_args
489 490
490 491 def start(self):
491 492 """Start the controller by profile_dir."""
492 493 return super(MPIControllerLauncher, self).start(1)
493 494
494 495
495 496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
496 497 """Launch engines using mpiexec"""
497 498
498 499 # alias back to *non-configurable* program[_args] for use in find_args()
499 500 # this way all Controller/EngineSetLaunchers have the same form, rather
500 501 # than *some* having `program_args` and others `controller_args`
501 502 @property
502 503 def program(self):
503 504 return self.engine_cmd
504 505
505 506 @property
506 507 def program_args(self):
507 508 return self.cluster_args + self.engine_args
508 509
509 510 def start(self, n):
510 511 """Start n engines by profile or profile_dir."""
511 512 self.n = n
512 513 return super(MPIEngineSetLauncher, self).start(n)
513 514
514 515 # deprecated MPIExec names
515 516 class DeprecatedMPILauncher(object):
516 517 def warn(self):
517 518 oldname = self.__class__.__name__
518 519 newname = oldname.replace('MPIExec', 'MPI')
519 520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
520 521
521 522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
522 523 """Deprecated, use MPILauncher"""
523 524 def __init__(self, *args, **kwargs):
524 525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
525 526 self.warn()
526 527
527 528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
528 529 """Deprecated, use MPIControllerLauncher"""
529 530 def __init__(self, *args, **kwargs):
530 531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
531 532 self.warn()
532 533
533 534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
534 535 """Deprecated, use MPIEngineSetLauncher"""
535 536 def __init__(self, *args, **kwargs):
536 537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
537 538 self.warn()
538 539
539 540
540 541 #-----------------------------------------------------------------------------
541 542 # SSH launchers
542 543 #-----------------------------------------------------------------------------
543 544
544 545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545 546
546 547 class SSHLauncher(LocalProcessLauncher):
547 548 """A minimal launcher for ssh.
548 549
549 550 To be useful this will probably have to be extended to use the ``sshx``
550 551 idea for environment variables. There could be other things this needs
551 552 as well.
552 553 """
553 554
554 555 ssh_cmd = List(['ssh'], config=True,
555 556 help="command for starting ssh")
556 557 ssh_args = List(['-tt'], config=True,
557 558 help="args to pass to ssh")
558 559 scp_cmd = List(['scp'], config=True,
559 560 help="command for sending files")
560 561 program = List(['date'],
561 562 help="Program to launch via ssh")
562 563 program_args = List([],
563 564 help="args to pass to remote program")
564 565 hostname = Unicode('', config=True,
565 566 help="hostname on which to launch the program")
566 567 user = Unicode('', config=True,
567 568 help="username for ssh")
568 569 location = Unicode('', config=True,
569 570 help="user@hostname location for ssh in one setting")
570 571 to_fetch = List([], config=True,
571 572 help="List of (remote, local) files to fetch after starting")
572 573 to_send = List([], config=True,
573 574 help="List of (local, remote) files to send before starting")
574 575
575 576 def _hostname_changed(self, name, old, new):
576 577 if self.user:
577 578 self.location = u'%s@%s' % (self.user, new)
578 579 else:
579 580 self.location = new
580 581
581 582 def _user_changed(self, name, old, new):
582 583 self.location = u'%s@%s' % (new, self.hostname)
583 584
584 585 def find_args(self):
585 586 return self.ssh_cmd + self.ssh_args + [self.location] + \
586 587 self.program + self.program_args
587 588
588 589 def _send_file(self, local, remote):
589 590 """send a single file"""
590 591 remote = "%s:%s" % (self.location, remote)
591 592 for i in range(10):
592 593 if not os.path.exists(local):
593 594 self.log.debug("waiting for %s" % local)
594 595 time.sleep(1)
595 596 else:
596 597 break
597 598 self.log.info("sending %s to %s", local, remote)
598 599 check_output(self.scp_cmd + [local, remote])
599 600
600 601 def send_files(self):
601 602 """send our files (called before start)"""
602 603 if not self.to_send:
603 604 return
604 605 for local_file, remote_file in self.to_send:
605 606 self._send_file(local_file, remote_file)
606 607
607 608 def _fetch_file(self, remote, local):
608 609 """fetch a single file"""
609 610 full_remote = "%s:%s" % (self.location, remote)
610 611 self.log.info("fetching %s from %s", local, full_remote)
611 612 for i in range(10):
612 613 # wait up to 10s for remote file to exist
613 614 check = check_output(self.ssh_cmd + self.ssh_args + \
614 615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
615 616 check = check.strip()
616 617 if check == 'no':
617 618 time.sleep(1)
618 619 elif check == 'yes':
619 620 break
620 621 check_output(self.scp_cmd + [full_remote, local])
621 622
622 623 def fetch_files(self):
623 624 """fetch remote files (called after start)"""
624 625 if not self.to_fetch:
625 626 return
626 627 for remote_file, local_file in self.to_fetch:
627 628 self._fetch_file(remote_file, local_file)
628 629
629 630 def start(self, hostname=None, user=None):
630 631 if hostname is not None:
631 632 self.hostname = hostname
632 633 if user is not None:
633 634 self.user = user
634 635
635 636 self.send_files()
636 637 super(SSHLauncher, self).start()
637 638 self.fetch_files()
638 639
639 640 def signal(self, sig):
640 641 if self.state == 'running':
641 642 # send escaped ssh connection-closer
642 643 self.process.stdin.write('~.')
643 644 self.process.stdin.flush()
644 645
645 646 class SSHClusterLauncher(SSHLauncher):
646 647
647 648 remote_profile_dir = Unicode('', config=True,
648 649 help="""The remote profile_dir to use.
649 650
650 651 If not specified, use calling profile, stripping out possible leading homedir.
651 652 """)
652 653
653 654 def _remote_profie_dir_default(self):
654 655 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
655 656 """
656 657 home = get_home_dir()
657 658 if not home.endswith('/'):
658 659 home = home+'/'
659 660
660 661 if self.profile_dir.startswith(home):
661 662 return self.profile_dir[len(home):]
662 663 else:
663 664 return self.profile_dir
664 665
665 666 def _cluster_id_changed(self, name, old, new):
666 667 if new:
667 668 raise ValueError("cluster id not supported by SSH launchers")
668 669
669 670 @property
670 671 def cluster_args(self):
671 672 return ['--profile-dir', self.remote_profile_dir]
672 673
673 674 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
674 675
675 676 # alias back to *non-configurable* program[_args] for use in find_args()
676 677 # this way all Controller/EngineSetLaunchers have the same form, rather
677 678 # than *some* having `program_args` and others `controller_args`
678 679
679 680 def _controller_cmd_default(self):
680 681 return ['ipcontroller']
681 682
682 683 @property
683 684 def program(self):
684 685 return self.controller_cmd
685 686
686 687 @property
687 688 def program_args(self):
688 689 return self.cluster_args + self.controller_args
689 690
690 691 def _to_fetch_default(self):
691 692 return [
692 693 (os.path.join(self.remote_profile_dir, 'security', cf),
693 694 os.path.join(self.profile_dir, 'security', cf),)
694 695 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
695 696 ]
696 697
697 698 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
698 699
699 700 # alias back to *non-configurable* program[_args] for use in find_args()
700 701 # this way all Controller/EngineSetLaunchers have the same form, rather
701 702 # than *some* having `program_args` and others `controller_args`
702 703
703 704 def _engine_cmd_default(self):
704 705 return ['ipengine']
705 706
706 707 @property
707 708 def program(self):
708 709 return self.engine_cmd
709 710
710 711 @property
711 712 def program_args(self):
712 713 return self.cluster_args + self.engine_args
713 714
714 715 def _to_send_default(self):
715 716 return [
716 717 (os.path.join(self.profile_dir, 'security', cf),
717 718 os.path.join(self.remote_profile_dir, 'security', cf))
718 719 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
719 720 ]
720 721
721 722
722 723 class SSHEngineSetLauncher(LocalEngineSetLauncher):
723 724 launcher_class = SSHEngineLauncher
724 725 engines = Dict(config=True,
725 726 help="""dict of engines to launch. This is a dict by hostname of ints,
726 727 corresponding to the number of engines to start on that host.""")
727 728
728 729 @property
729 730 def engine_count(self):
730 731 """determine engine count from `engines` dict"""
731 732 count = 0
732 733 for n in self.engines.itervalues():
733 734 if isinstance(n, (tuple,list)):
734 735 n,args = n
735 736 count += n
736 737 return count
737 738
738 739 def start(self, n):
739 740 """Start engines by profile or profile_dir.
740 741 `n` is ignored, and the `engines` config property is used instead.
741 742 """
742 743
743 744 dlist = []
744 745 for host, n in self.engines.iteritems():
745 746 if isinstance(n, (tuple, list)):
746 747 n, args = n
747 748 else:
748 749 args = copy.deepcopy(self.engine_args)
749 750
750 751 if '@' in host:
751 752 user,host = host.split('@',1)
752 753 else:
753 754 user=None
754 755 for i in range(n):
755 756 if i > 0:
756 757 time.sleep(self.delay)
757 758 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
758 759 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
759 760 )
760 761 if i > 0:
761 762 # only send files for the first engine on each host
762 763 el.to_send = []
763 764
764 765 # Copy the engine args over to each engine launcher.
765 766 el.engine_cmd = self.engine_cmd
766 767 el.engine_args = args
767 768 el.on_stop(self._notice_engine_stopped)
768 769 d = el.start(user=user, hostname=host)
769 770 self.launchers[ "%s/%i" % (host,i) ] = el
770 771 dlist.append(d)
771 772 self.notify_start(dlist)
772 773 return dlist
773 774
774 775
775 776 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
776 777 """Launcher for calling
777 778 `ipcluster engines` on a remote machine.
778 779
779 780 Requires that remote profile is already configured.
780 781 """
781 782
782 783 n = Integer()
783 784 ipcluster_cmd = List(['ipcluster'], config=True)
784 785
785 786 @property
786 787 def program(self):
787 788 return self.ipcluster_cmd + ['engines']
788 789
789 790 @property
790 791 def program_args(self):
791 792 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
792 793
793 794 def _to_send_default(self):
794 795 return [
795 796 (os.path.join(self.profile_dir, 'security', cf),
796 797 os.path.join(self.remote_profile_dir, 'security', cf))
797 798 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
798 799 ]
799 800
800 801 def start(self, n):
801 802 self.n = n
802 803 super(SSHProxyEngineSetLauncher, self).start()
803 804
804 805
805 806 #-----------------------------------------------------------------------------
806 807 # Windows HPC Server 2008 scheduler launchers
807 808 #-----------------------------------------------------------------------------
808 809
809 810
810 811 # This is only used on Windows.
811 812 def find_job_cmd():
812 813 if WINDOWS:
813 814 try:
814 815 return find_cmd('job')
815 816 except (FindCmdError, ImportError):
816 817 # ImportError will be raised if win32api is not installed
817 818 return 'job'
818 819 else:
819 820 return 'job'
820 821
821 822
822 823 class WindowsHPCLauncher(BaseLauncher):
823 824
824 825 job_id_regexp = CRegExp(r'\d+', config=True,
825 826 help="""A regular expression used to get the job id from the output of the
826 827 submit_command. """
827 828 )
828 829 job_file_name = Unicode(u'ipython_job.xml', config=True,
829 830 help="The filename of the instantiated job script.")
830 831 # The full path to the instantiated job script. This gets made dynamically
831 832 # by combining the work_dir with the job_file_name.
832 833 job_file = Unicode(u'')
833 834 scheduler = Unicode('', config=True,
834 835 help="The hostname of the scheduler to submit the job to.")
835 836 job_cmd = Unicode(find_job_cmd(), config=True,
836 837 help="The command for submitting jobs.")
837 838
838 839 def __init__(self, work_dir=u'.', config=None, **kwargs):
839 840 super(WindowsHPCLauncher, self).__init__(
840 841 work_dir=work_dir, config=config, **kwargs
841 842 )
842 843
843 844 @property
844 845 def job_file(self):
845 846 return os.path.join(self.work_dir, self.job_file_name)
846 847
847 848 def write_job_file(self, n):
848 849 raise NotImplementedError("Implement write_job_file in a subclass.")
849 850
850 851 def find_args(self):
851 852 return [u'job.exe']
852 853
853 854 def parse_job_id(self, output):
854 855 """Take the output of the submit command and return the job id."""
855 856 m = self.job_id_regexp.search(output)
856 857 if m is not None:
857 858 job_id = m.group()
858 859 else:
859 860 raise LauncherError("Job id couldn't be determined: %s" % output)
860 861 self.job_id = job_id
861 862 self.log.info('Job started with id: %r', job_id)
862 863 return job_id
863 864
864 865 def start(self, n):
865 866 """Start n copies of the process using the Win HPC job scheduler."""
866 867 self.write_job_file(n)
867 868 args = [
868 869 'submit',
869 870 '/jobfile:%s' % self.job_file,
870 871 '/scheduler:%s' % self.scheduler
871 872 ]
872 873 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
873 874
874 875 output = check_output([self.job_cmd]+args,
875 876 env=os.environ,
876 877 cwd=self.work_dir,
877 878 stderr=STDOUT
878 879 )
879 880 job_id = self.parse_job_id(output)
880 881 self.notify_start(job_id)
881 882 return job_id
882 883
883 884 def stop(self):
884 885 args = [
885 886 'cancel',
886 887 self.job_id,
887 888 '/scheduler:%s' % self.scheduler
888 889 ]
889 890 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
890 891 try:
891 892 output = check_output([self.job_cmd]+args,
892 893 env=os.environ,
893 894 cwd=self.work_dir,
894 895 stderr=STDOUT
895 896 )
896 897 except:
897 898 output = 'The job already appears to be stoppped: %r' % self.job_id
898 899 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
899 900 return output
900 901
901 902
902 903 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
903 904
904 905 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
905 906 help="WinHPC xml job file.")
906 907 controller_args = List([], config=False,
907 908 help="extra args to pass to ipcontroller")
908 909
909 910 def write_job_file(self, n):
910 911 job = IPControllerJob(config=self.config)
911 912
912 913 t = IPControllerTask(config=self.config)
913 914 # The tasks work directory is *not* the actual work directory of
914 915 # the controller. It is used as the base path for the stdout/stderr
915 916 # files that the scheduler redirects to.
916 917 t.work_directory = self.profile_dir
917 918 # Add the profile_dir and from self.start().
918 919 t.controller_args.extend(self.cluster_args)
919 920 t.controller_args.extend(self.controller_args)
920 921 job.add_task(t)
921 922
922 923 self.log.debug("Writing job description file: %s", self.job_file)
923 924 job.write(self.job_file)
924 925
925 926 @property
926 927 def job_file(self):
927 928 return os.path.join(self.profile_dir, self.job_file_name)
928 929
929 930 def start(self):
930 931 """Start the controller by profile_dir."""
931 932 return super(WindowsHPCControllerLauncher, self).start(1)
932 933
933 934
934 935 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
935 936
936 937 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
937 938 help="jobfile for ipengines job")
938 939 engine_args = List([], config=False,
939 940 help="extra args to pas to ipengine")
940 941
941 942 def write_job_file(self, n):
942 943 job = IPEngineSetJob(config=self.config)
943 944
944 945 for i in range(n):
945 946 t = IPEngineTask(config=self.config)
946 947 # The tasks work directory is *not* the actual work directory of
947 948 # the engine. It is used as the base path for the stdout/stderr
948 949 # files that the scheduler redirects to.
949 950 t.work_directory = self.profile_dir
950 951 # Add the profile_dir and from self.start().
951 952 t.engine_args.extend(self.cluster_args)
952 953 t.engine_args.extend(self.engine_args)
953 954 job.add_task(t)
954 955
955 956 self.log.debug("Writing job description file: %s", self.job_file)
956 957 job.write(self.job_file)
957 958
958 959 @property
959 960 def job_file(self):
960 961 return os.path.join(self.profile_dir, self.job_file_name)
961 962
962 963 def start(self, n):
963 964 """Start the controller by profile_dir."""
964 965 return super(WindowsHPCEngineSetLauncher, self).start(n)
965 966
966 967
967 968 #-----------------------------------------------------------------------------
968 969 # Batch (PBS) system launchers
969 970 #-----------------------------------------------------------------------------
970 971
971 972 class BatchClusterAppMixin(ClusterAppMixin):
972 973 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
973 974 def _profile_dir_changed(self, name, old, new):
974 975 self.context[name] = new
975 976 _cluster_id_changed = _profile_dir_changed
976 977
977 978 def _profile_dir_default(self):
978 979 self.context['profile_dir'] = ''
979 980 return ''
980 981 def _cluster_id_default(self):
981 982 self.context['cluster_id'] = ''
982 983 return ''
983 984
984 985
985 986 class BatchSystemLauncher(BaseLauncher):
986 987 """Launch an external process using a batch system.
987 988
988 989 This class is designed to work with UNIX batch systems like PBS, LSF,
989 990 GridEngine, etc. The overall model is that there are different commands
990 991 like qsub, qdel, etc. that handle the starting and stopping of the process.
991 992
992 993 This class also has the notion of a batch script. The ``batch_template``
993 994 attribute can be set to a string that is a template for the batch script.
994 995 This template is instantiated using string formatting. Thus the template can
995 996 use {n} fot the number of instances. Subclasses can add additional variables
996 997 to the template dict.
997 998 """
998 999
999 1000 # Subclasses must fill these in. See PBSEngineSet
1000 1001 submit_command = List([''], config=True,
1001 1002 help="The name of the command line program used to submit jobs.")
1002 1003 delete_command = List([''], config=True,
1003 1004 help="The name of the command line program used to delete jobs.")
1004 1005 job_id_regexp = CRegExp('', config=True,
1005 1006 help="""A regular expression used to get the job id from the output of the
1006 1007 submit_command.""")
1007 1008 batch_template = Unicode('', config=True,
1008 1009 help="The string that is the batch script template itself.")
1009 1010 batch_template_file = Unicode(u'', config=True,
1010 1011 help="The file that contains the batch template.")
1011 1012 batch_file_name = Unicode(u'batch_script', config=True,
1012 1013 help="The filename of the instantiated batch script.")
1013 1014 queue = Unicode(u'', config=True,
1014 1015 help="The PBS Queue.")
1015 1016
1016 1017 def _queue_changed(self, name, old, new):
1017 1018 self.context[name] = new
1018 1019
1019 1020 n = Integer(1)
1020 1021 _n_changed = _queue_changed
1021 1022
1022 1023 # not configurable, override in subclasses
1023 1024 # PBS Job Array regex
1024 1025 job_array_regexp = CRegExp('')
1025 1026 job_array_template = Unicode('')
1026 1027 # PBS Queue regex
1027 1028 queue_regexp = CRegExp('')
1028 1029 queue_template = Unicode('')
1029 1030 # The default batch template, override in subclasses
1030 1031 default_template = Unicode('')
1031 1032 # The full path to the instantiated batch script.
1032 1033 batch_file = Unicode(u'')
1033 1034 # the format dict used with batch_template:
1034 1035 context = Dict()
1035 1036 def _context_default(self):
1036 1037 """load the default context with the default values for the basic keys
1037 1038
1038 1039 because the _trait_changed methods only load the context if they
1039 1040 are set to something other than the default value.
1040 1041 """
1041 1042 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1042 1043
1043 1044 # the Formatter instance for rendering the templates:
1044 1045 formatter = Instance(EvalFormatter, (), {})
1045 1046
1046 1047
1047 1048 def find_args(self):
1048 1049 return self.submit_command + [self.batch_file]
1049 1050
1050 1051 def __init__(self, work_dir=u'.', config=None, **kwargs):
1051 1052 super(BatchSystemLauncher, self).__init__(
1052 1053 work_dir=work_dir, config=config, **kwargs
1053 1054 )
1054 1055 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1055 1056
1056 1057 def parse_job_id(self, output):
1057 1058 """Take the output of the submit command and return the job id."""
1058 1059 m = self.job_id_regexp.search(output)
1059 1060 if m is not None:
1060 1061 job_id = m.group()
1061 1062 else:
1062 1063 raise LauncherError("Job id couldn't be determined: %s" % output)
1063 1064 self.job_id = job_id
1064 1065 self.log.info('Job submitted with job id: %r', job_id)
1065 1066 return job_id
1066 1067
1067 1068 def write_batch_script(self, n):
1068 1069 """Instantiate and write the batch script to the work_dir."""
1069 1070 self.n = n
1070 1071 # first priority is batch_template if set
1071 1072 if self.batch_template_file and not self.batch_template:
1072 1073 # second priority is batch_template_file
1073 1074 with open(self.batch_template_file) as f:
1074 1075 self.batch_template = f.read()
1075 1076 if not self.batch_template:
1076 1077 # third (last) priority is default_template
1077 1078 self.batch_template = self.default_template
1078 1079
1079 1080 # add jobarray or queue lines to user-specified template
1080 1081 # note that this is *only* when user did not specify a template.
1081 1082 # print self.job_array_regexp.search(self.batch_template)
1082 1083 if not self.job_array_regexp.search(self.batch_template):
1083 1084 self.log.debug("adding job array settings to batch script")
1084 1085 firstline, rest = self.batch_template.split('\n',1)
1085 1086 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1086 1087
1087 1088 # print self.queue_regexp.search(self.batch_template)
1088 1089 if self.queue and not self.queue_regexp.search(self.batch_template):
1089 1090 self.log.debug("adding PBS queue settings to batch script")
1090 1091 firstline, rest = self.batch_template.split('\n',1)
1091 1092 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1092 1093
1093 1094 script_as_string = self.formatter.format(self.batch_template, **self.context)
1094 1095 self.log.debug('Writing batch script: %s', self.batch_file)
1095 1096
1096 1097 with open(self.batch_file, 'w') as f:
1097 1098 f.write(script_as_string)
1098 1099 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1099 1100
1100 1101 def start(self, n):
1101 1102 """Start n copies of the process using a batch system."""
1102 1103 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1103 1104 # Here we save profile_dir in the context so they
1104 1105 # can be used in the batch script template as {profile_dir}
1105 1106 self.write_batch_script(n)
1106 1107 output = check_output(self.args, env=os.environ)
1107 1108
1108 1109 job_id = self.parse_job_id(output)
1109 1110 self.notify_start(job_id)
1110 1111 return job_id
1111 1112
1112 1113 def stop(self):
1113 1114 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1114 1115 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1115 1116 return output
1116 1117
1117 1118
1118 1119 class PBSLauncher(BatchSystemLauncher):
1119 1120 """A BatchSystemLauncher subclass for PBS."""
1120 1121
1121 1122 submit_command = List(['qsub'], config=True,
1122 1123 help="The PBS submit command ['qsub']")
1123 1124 delete_command = List(['qdel'], config=True,
1124 1125 help="The PBS delete command ['qsub']")
1125 1126 job_id_regexp = CRegExp(r'\d+', config=True,
1126 1127 help="Regular expresion for identifying the job ID [r'\d+']")
1127 1128
1128 1129 batch_file = Unicode(u'')
1129 1130 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1130 1131 job_array_template = Unicode('#PBS -t 1-{n}')
1131 1132 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1132 1133 queue_template = Unicode('#PBS -q {queue}')
1133 1134
1134 1135
1135 1136 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1136 1137 """Launch a controller using PBS."""
1137 1138
1138 1139 batch_file_name = Unicode(u'pbs_controller', config=True,
1139 1140 help="batch file name for the controller job.")
1140 1141 default_template= Unicode("""#!/bin/sh
1141 1142 #PBS -V
1142 1143 #PBS -N ipcontroller
1143 1144 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1144 """%(' '.join(ipcontroller_cmd_argv)))
1145 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1145 1146
1146 1147
1147 1148 def start(self):
1148 1149 """Start the controller by profile or profile_dir."""
1149 1150 return super(PBSControllerLauncher, self).start(1)
1150 1151
1151 1152
1152 1153 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1153 1154 """Launch Engines using PBS"""
1154 1155 batch_file_name = Unicode(u'pbs_engines', config=True,
1155 1156 help="batch file name for the engine(s) job.")
1156 1157 default_template= Unicode(u"""#!/bin/sh
1157 1158 #PBS -V
1158 1159 #PBS -N ipengine
1159 1160 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1160 """%(' '.join(ipengine_cmd_argv)))
1161 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1161 1162
1162 1163 def start(self, n):
1163 1164 """Start n engines by profile or profile_dir."""
1164 1165 return super(PBSEngineSetLauncher, self).start(n)
1165 1166
1166 1167 #SGE is very similar to PBS
1167 1168
1168 1169 class SGELauncher(PBSLauncher):
1169 1170 """Sun GridEngine is a PBS clone with slightly different syntax"""
1170 1171 job_array_regexp = CRegExp('#\$\W+\-t')
1171 1172 job_array_template = Unicode('#$ -t 1-{n}')
1172 1173 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1173 1174 queue_template = Unicode('#$ -q {queue}')
1174 1175
1175 1176 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1176 1177 """Launch a controller using SGE."""
1177 1178
1178 1179 batch_file_name = Unicode(u'sge_controller', config=True,
1179 1180 help="batch file name for the ipontroller job.")
1180 1181 default_template= Unicode(u"""#$ -V
1181 1182 #$ -S /bin/sh
1182 1183 #$ -N ipcontroller
1183 1184 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1184 """%(' '.join(ipcontroller_cmd_argv)))
1185 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1185 1186
1186 1187 def start(self):
1187 1188 """Start the controller by profile or profile_dir."""
1188 1189 return super(SGEControllerLauncher, self).start(1)
1189 1190
1190 1191 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1191 1192 """Launch Engines with SGE"""
1192 1193 batch_file_name = Unicode(u'sge_engines', config=True,
1193 1194 help="batch file name for the engine(s) job.")
1194 1195 default_template = Unicode("""#$ -V
1195 1196 #$ -S /bin/sh
1196 1197 #$ -N ipengine
1197 1198 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1198 """%(' '.join(ipengine_cmd_argv)))
1199 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1199 1200
1200 1201 def start(self, n):
1201 1202 """Start n engines by profile or profile_dir."""
1202 1203 return super(SGEEngineSetLauncher, self).start(n)
1203 1204
1204 1205
1205 1206 # LSF launchers
1206 1207
1207 1208 class LSFLauncher(BatchSystemLauncher):
1208 1209 """A BatchSystemLauncher subclass for LSF."""
1209 1210
1210 1211 submit_command = List(['bsub'], config=True,
1211 1212 help="The PBS submit command ['bsub']")
1212 1213 delete_command = List(['bkill'], config=True,
1213 1214 help="The PBS delete command ['bkill']")
1214 1215 job_id_regexp = CRegExp(r'\d+', config=True,
1215 1216 help="Regular expresion for identifying the job ID [r'\d+']")
1216 1217
1217 1218 batch_file = Unicode(u'')
1218 1219 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1219 1220 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1220 1221 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1221 1222 queue_template = Unicode('#BSUB -q {queue}')
1222 1223
1223 1224 def start(self, n):
1224 1225 """Start n copies of the process using LSF batch system.
1225 1226 This cant inherit from the base class because bsub expects
1226 1227 to be piped a shell script in order to honor the #BSUB directives :
1227 1228 bsub < script
1228 1229 """
1229 1230 # Here we save profile_dir in the context so they
1230 1231 # can be used in the batch script template as {profile_dir}
1231 1232 self.write_batch_script(n)
1232 1233 #output = check_output(self.args, env=os.environ)
1233 1234 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1234 1235 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1235 1236 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1236 1237 output,err = p.communicate()
1237 1238 job_id = self.parse_job_id(output)
1238 1239 self.notify_start(job_id)
1239 1240 return job_id
1240 1241
1241 1242
1242 1243 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1243 1244 """Launch a controller using LSF."""
1244 1245
1245 1246 batch_file_name = Unicode(u'lsf_controller', config=True,
1246 1247 help="batch file name for the controller job.")
1247 1248 default_template= Unicode("""#!/bin/sh
1248 1249 #BSUB -J ipcontroller
1249 1250 #BSUB -oo ipcontroller.o.%%J
1250 1251 #BSUB -eo ipcontroller.e.%%J
1251 1252 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1252 """%(' '.join(ipcontroller_cmd_argv)))
1253 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1253 1254
1254 1255 def start(self):
1255 1256 """Start the controller by profile or profile_dir."""
1256 1257 return super(LSFControllerLauncher, self).start(1)
1257 1258
1258 1259
1259 1260 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1260 1261 """Launch Engines using LSF"""
1261 1262 batch_file_name = Unicode(u'lsf_engines', config=True,
1262 1263 help="batch file name for the engine(s) job.")
1263 1264 default_template= Unicode(u"""#!/bin/sh
1264 1265 #BSUB -oo ipengine.o.%%J
1265 1266 #BSUB -eo ipengine.e.%%J
1266 1267 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1267 """%(' '.join(ipengine_cmd_argv)))
1268 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1268 1269
1269 1270 def start(self, n):
1270 1271 """Start n engines by profile or profile_dir."""
1271 1272 return super(LSFEngineSetLauncher, self).start(n)
1272 1273
1273 1274
1274 1275 #-----------------------------------------------------------------------------
1275 1276 # A launcher for ipcluster itself!
1276 1277 #-----------------------------------------------------------------------------
1277 1278
1278 1279
1279 1280 class IPClusterLauncher(LocalProcessLauncher):
1280 1281 """Launch the ipcluster program in an external process."""
1281 1282
1282 1283 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1283 1284 help="Popen command for ipcluster")
1284 1285 ipcluster_args = List(
1285 1286 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1286 1287 help="Command line arguments to pass to ipcluster.")
1287 1288 ipcluster_subcommand = Unicode('start')
1288 1289 profile = Unicode('default')
1289 1290 n = Integer(2)
1290 1291
1291 1292 def find_args(self):
1292 1293 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1293 1294 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1294 1295 self.ipcluster_args
1295 1296
1296 1297 def start(self):
1297 1298 return super(IPClusterLauncher, self).start()
1298 1299
1299 1300 #-----------------------------------------------------------------------------
1300 1301 # Collections of launchers
1301 1302 #-----------------------------------------------------------------------------
1302 1303
1303 1304 local_launchers = [
1304 1305 LocalControllerLauncher,
1305 1306 LocalEngineLauncher,
1306 1307 LocalEngineSetLauncher,
1307 1308 ]
1308 1309 mpi_launchers = [
1309 1310 MPILauncher,
1310 1311 MPIControllerLauncher,
1311 1312 MPIEngineSetLauncher,
1312 1313 ]
1313 1314 ssh_launchers = [
1314 1315 SSHLauncher,
1315 1316 SSHControllerLauncher,
1316 1317 SSHEngineLauncher,
1317 1318 SSHEngineSetLauncher,
1318 1319 ]
1319 1320 winhpc_launchers = [
1320 1321 WindowsHPCLauncher,
1321 1322 WindowsHPCControllerLauncher,
1322 1323 WindowsHPCEngineSetLauncher,
1323 1324 ]
1324 1325 pbs_launchers = [
1325 1326 PBSLauncher,
1326 1327 PBSControllerLauncher,
1327 1328 PBSEngineSetLauncher,
1328 1329 ]
1329 1330 sge_launchers = [
1330 1331 SGELauncher,
1331 1332 SGEControllerLauncher,
1332 1333 SGEEngineSetLauncher,
1333 1334 ]
1334 1335 lsf_launchers = [
1335 1336 LSFLauncher,
1336 1337 LSFControllerLauncher,
1337 1338 LSFEngineSetLauncher,
1338 1339 ]
1339 1340 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1340 1341 + pbs_launchers + sge_launchers + lsf_launchers
1341 1342
General Comments 0
You need to be logged in to leave comments. Login now