##// END OF EJS Templates
Backport PR #2511: trigger default remote_profile_dir when profile_dir is set...
MinRK -
Show More
@@ -1,1345 +1,1354 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import pipes
26 26 import stat
27 27 import sys
28 28 import time
29 29
30 30 # signal imports, handling various platforms, versions
31 31
32 32 from signal import SIGINT, SIGTERM
33 33 try:
34 34 from signal import SIGKILL
35 35 except ImportError:
36 36 # Windows
37 37 SIGKILL=SIGTERM
38 38
39 39 try:
40 40 # Windows >= 2.7, 3.2
41 41 from signal import CTRL_C_EVENT as SIGINT
42 42 except ImportError:
43 43 pass
44 44
45 45 from subprocess import Popen, PIPE, STDOUT
46 46 try:
47 47 from subprocess import check_output
48 48 except ImportError:
49 49 # pre-2.7, define check_output with Popen
50 50 def check_output(*args, **kwargs):
51 51 kwargs.update(dict(stdout=PIPE))
52 52 p = Popen(*args, **kwargs)
53 53 out,err = p.communicate()
54 54 return out
55 55
56 56 from zmq.eventloop import ioloop
57 57
58 58 from IPython.config.application import Application
59 59 from IPython.config.configurable import LoggingConfigurable
60 60 from IPython.utils.text import EvalFormatter
61 61 from IPython.utils.traitlets import (
62 62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 63 )
64 64 from IPython.utils.path import get_home_dir
65 65 from IPython.utils.process import find_cmd, FindCmdError
66 66
67 67 from .win32support import forward_read_events
68 68
69 69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70 70
71 71 WINDOWS = os.name == 'nt'
72 72
73 73 #-----------------------------------------------------------------------------
74 74 # Paths to the kernel apps
75 75 #-----------------------------------------------------------------------------
76 76
77 77 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
78 78
79 79 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
80 80
81 81 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
82 82
83 83 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
84 84
85 85 #-----------------------------------------------------------------------------
86 86 # Base launchers and errors
87 87 #-----------------------------------------------------------------------------
88 88
89 89
90 90 class LauncherError(Exception):
91 91 pass
92 92
93 93
94 94 class ProcessStateError(LauncherError):
95 95 pass
96 96
97 97
98 98 class UnknownStatus(LauncherError):
99 99 pass
100 100
101 101
102 102 class BaseLauncher(LoggingConfigurable):
103 103 """An asbtraction for starting, stopping and signaling a process."""
104 104
105 105 # In all of the launchers, the work_dir is where child processes will be
106 106 # run. This will usually be the profile_dir, but may not be. any work_dir
107 107 # passed into the __init__ method will override the config value.
108 108 # This should not be used to set the work_dir for the actual engine
109 109 # and controller. Instead, use their own config files or the
110 110 # controller_args, engine_args attributes of the launchers to add
111 111 # the work_dir option.
112 112 work_dir = Unicode(u'.')
113 113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
114 114
115 115 start_data = Any()
116 116 stop_data = Any()
117 117
118 118 def _loop_default(self):
119 119 return ioloop.IOLoop.instance()
120 120
121 121 def __init__(self, work_dir=u'.', config=None, **kwargs):
122 122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
123 123 self.state = 'before' # can be before, running, after
124 124 self.stop_callbacks = []
125 125 self.start_data = None
126 126 self.stop_data = None
127 127
128 128 @property
129 129 def args(self):
130 130 """A list of cmd and args that will be used to start the process.
131 131
132 132 This is what is passed to :func:`spawnProcess` and the first element
133 133 will be the process name.
134 134 """
135 135 return self.find_args()
136 136
137 137 def find_args(self):
138 138 """The ``.args`` property calls this to find the args list.
139 139
140 140 Subcommand should implement this to construct the cmd and args.
141 141 """
142 142 raise NotImplementedError('find_args must be implemented in a subclass')
143 143
144 144 @property
145 145 def arg_str(self):
146 146 """The string form of the program arguments."""
147 147 return ' '.join(self.args)
148 148
149 149 @property
150 150 def running(self):
151 151 """Am I running."""
152 152 if self.state == 'running':
153 153 return True
154 154 else:
155 155 return False
156 156
157 157 def start(self):
158 158 """Start the process."""
159 159 raise NotImplementedError('start must be implemented in a subclass')
160 160
161 161 def stop(self):
162 162 """Stop the process and notify observers of stopping.
163 163
164 164 This method will return None immediately.
165 165 To observe the actual process stopping, see :meth:`on_stop`.
166 166 """
167 167 raise NotImplementedError('stop must be implemented in a subclass')
168 168
169 169 def on_stop(self, f):
170 170 """Register a callback to be called with this Launcher's stop_data
171 171 when the process actually finishes.
172 172 """
173 173 if self.state=='after':
174 174 return f(self.stop_data)
175 175 else:
176 176 self.stop_callbacks.append(f)
177 177
178 178 def notify_start(self, data):
179 179 """Call this to trigger startup actions.
180 180
181 181 This logs the process startup and sets the state to 'running'. It is
182 182 a pass-through so it can be used as a callback.
183 183 """
184 184
185 185 self.log.debug('Process %r started: %r', self.args[0], data)
186 186 self.start_data = data
187 187 self.state = 'running'
188 188 return data
189 189
190 190 def notify_stop(self, data):
191 191 """Call this to trigger process stop actions.
192 192
193 193 This logs the process stopping and sets the state to 'after'. Call
194 194 this to trigger callbacks registered via :meth:`on_stop`."""
195 195
196 196 self.log.debug('Process %r stopped: %r', self.args[0], data)
197 197 self.stop_data = data
198 198 self.state = 'after'
199 199 for i in range(len(self.stop_callbacks)):
200 200 d = self.stop_callbacks.pop()
201 201 d(data)
202 202 return data
203 203
204 204 def signal(self, sig):
205 205 """Signal the process.
206 206
207 207 Parameters
208 208 ----------
209 209 sig : str or int
210 210 'KILL', 'INT', etc., or any signal number
211 211 """
212 212 raise NotImplementedError('signal must be implemented in a subclass')
213 213
214 214 class ClusterAppMixin(HasTraits):
215 215 """MixIn for cluster args as traits"""
216 216 profile_dir=Unicode('')
217 217 cluster_id=Unicode('')
218 218
219 219 @property
220 220 def cluster_args(self):
221 221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
222 222
223 223 class ControllerMixin(ClusterAppMixin):
224 224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
225 225 help="""Popen command to launch ipcontroller.""")
226 226 # Command line arguments to ipcontroller.
227 227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
228 228 help="""command-line args to pass to ipcontroller""")
229 229
230 230 class EngineMixin(ClusterAppMixin):
231 231 engine_cmd = List(ipengine_cmd_argv, config=True,
232 232 help="""command to launch the Engine.""")
233 233 # Command line arguments for ipengine.
234 234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
235 235 help="command-line arguments to pass to ipengine"
236 236 )
237 237
238 238
239 239 #-----------------------------------------------------------------------------
240 240 # Local process launchers
241 241 #-----------------------------------------------------------------------------
242 242
243 243
244 244 class LocalProcessLauncher(BaseLauncher):
245 245 """Start and stop an external process in an asynchronous manner.
246 246
247 247 This will launch the external process with a working directory of
248 248 ``self.work_dir``.
249 249 """
250 250
251 251 # This is used to to construct self.args, which is passed to
252 252 # spawnProcess.
253 253 cmd_and_args = List([])
254 254 poll_frequency = Integer(100) # in ms
255 255
256 256 def __init__(self, work_dir=u'.', config=None, **kwargs):
257 257 super(LocalProcessLauncher, self).__init__(
258 258 work_dir=work_dir, config=config, **kwargs
259 259 )
260 260 self.process = None
261 261 self.poller = None
262 262
263 263 def find_args(self):
264 264 return self.cmd_and_args
265 265
266 266 def start(self):
267 267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
268 268 if self.state == 'before':
269 269 self.process = Popen(self.args,
270 270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
271 271 env=os.environ,
272 272 cwd=self.work_dir
273 273 )
274 274 if WINDOWS:
275 275 self.stdout = forward_read_events(self.process.stdout)
276 276 self.stderr = forward_read_events(self.process.stderr)
277 277 else:
278 278 self.stdout = self.process.stdout.fileno()
279 279 self.stderr = self.process.stderr.fileno()
280 280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
281 281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
282 282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
283 283 self.poller.start()
284 284 self.notify_start(self.process.pid)
285 285 else:
286 286 s = 'The process was already started and has state: %r' % self.state
287 287 raise ProcessStateError(s)
288 288
289 289 def stop(self):
290 290 return self.interrupt_then_kill()
291 291
292 292 def signal(self, sig):
293 293 if self.state == 'running':
294 294 if WINDOWS and sig != SIGINT:
295 295 # use Windows tree-kill for better child cleanup
296 296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
297 297 else:
298 298 self.process.send_signal(sig)
299 299
300 300 def interrupt_then_kill(self, delay=2.0):
301 301 """Send INT, wait a delay and then send KILL."""
302 302 try:
303 303 self.signal(SIGINT)
304 304 except Exception:
305 305 self.log.debug("interrupt failed")
306 306 pass
307 307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
308 308 self.killer.start()
309 309
310 310 # callbacks, etc:
311 311
312 312 def handle_stdout(self, fd, events):
313 313 if WINDOWS:
314 314 line = self.stdout.recv()
315 315 else:
316 316 line = self.process.stdout.readline()
317 317 # a stopped process will be readable but return empty strings
318 318 if line:
319 319 self.log.debug(line[:-1])
320 320 else:
321 321 self.poll()
322 322
323 323 def handle_stderr(self, fd, events):
324 324 if WINDOWS:
325 325 line = self.stderr.recv()
326 326 else:
327 327 line = self.process.stderr.readline()
328 328 # a stopped process will be readable but return empty strings
329 329 if line:
330 330 self.log.debug(line[:-1])
331 331 else:
332 332 self.poll()
333 333
334 334 def poll(self):
335 335 status = self.process.poll()
336 336 if status is not None:
337 337 self.poller.stop()
338 338 self.loop.remove_handler(self.stdout)
339 339 self.loop.remove_handler(self.stderr)
340 340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
341 341 return status
342 342
343 343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
344 344 """Launch a controller as a regular external process."""
345 345
346 346 def find_args(self):
347 347 return self.controller_cmd + self.cluster_args + self.controller_args
348 348
349 349 def start(self):
350 350 """Start the controller by profile_dir."""
351 351 return super(LocalControllerLauncher, self).start()
352 352
353 353
354 354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
355 355 """Launch a single engine as a regular externall process."""
356 356
357 357 def find_args(self):
358 358 return self.engine_cmd + self.cluster_args + self.engine_args
359 359
360 360
361 361 class LocalEngineSetLauncher(LocalEngineLauncher):
362 362 """Launch a set of engines as regular external processes."""
363 363
364 364 delay = CFloat(0.1, config=True,
365 365 help="""delay (in seconds) between starting each engine after the first.
366 366 This can help force the engines to get their ids in order, or limit
367 367 process flood when starting many engines."""
368 368 )
369 369
370 370 # launcher class
371 371 launcher_class = LocalEngineLauncher
372 372
373 373 launchers = Dict()
374 374 stop_data = Dict()
375 375
376 376 def __init__(self, work_dir=u'.', config=None, **kwargs):
377 377 super(LocalEngineSetLauncher, self).__init__(
378 378 work_dir=work_dir, config=config, **kwargs
379 379 )
380 380 self.stop_data = {}
381 381
382 382 def start(self, n):
383 383 """Start n engines by profile or profile_dir."""
384 384 dlist = []
385 385 for i in range(n):
386 386 if i > 0:
387 387 time.sleep(self.delay)
388 388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
389 389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
390 390 )
391 391
392 392 # Copy the engine args over to each engine launcher.
393 393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
394 394 el.engine_args = copy.deepcopy(self.engine_args)
395 395 el.on_stop(self._notice_engine_stopped)
396 396 d = el.start()
397 397 self.launchers[i] = el
398 398 dlist.append(d)
399 399 self.notify_start(dlist)
400 400 return dlist
401 401
402 402 def find_args(self):
403 403 return ['engine set']
404 404
405 405 def signal(self, sig):
406 406 dlist = []
407 407 for el in self.launchers.itervalues():
408 408 d = el.signal(sig)
409 409 dlist.append(d)
410 410 return dlist
411 411
412 412 def interrupt_then_kill(self, delay=1.0):
413 413 dlist = []
414 414 for el in self.launchers.itervalues():
415 415 d = el.interrupt_then_kill(delay)
416 416 dlist.append(d)
417 417 return dlist
418 418
419 419 def stop(self):
420 420 return self.interrupt_then_kill()
421 421
422 422 def _notice_engine_stopped(self, data):
423 423 pid = data['pid']
424 424 for idx,el in self.launchers.iteritems():
425 425 if el.process.pid == pid:
426 426 break
427 427 self.launchers.pop(idx)
428 428 self.stop_data[idx] = data
429 429 if not self.launchers:
430 430 self.notify_stop(self.stop_data)
431 431
432 432
433 433 #-----------------------------------------------------------------------------
434 434 # MPI launchers
435 435 #-----------------------------------------------------------------------------
436 436
437 437
438 438 class MPILauncher(LocalProcessLauncher):
439 439 """Launch an external process using mpiexec."""
440 440
441 441 mpi_cmd = List(['mpiexec'], config=True,
442 442 help="The mpiexec command to use in starting the process."
443 443 )
444 444 mpi_args = List([], config=True,
445 445 help="The command line arguments to pass to mpiexec."
446 446 )
447 447 program = List(['date'],
448 448 help="The program to start via mpiexec.")
449 449 program_args = List([],
450 450 help="The command line argument to the program."
451 451 )
452 452 n = Integer(1)
453 453
454 454 def __init__(self, *args, **kwargs):
455 455 # deprecation for old MPIExec names:
456 456 config = kwargs.get('config', {})
457 457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
458 458 deprecated = config.get(oldname)
459 459 if deprecated:
460 460 newname = oldname.replace('MPIExec', 'MPI')
461 461 config[newname].update(deprecated)
462 462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
463 463
464 464 super(MPILauncher, self).__init__(*args, **kwargs)
465 465
466 466 def find_args(self):
467 467 """Build self.args using all the fields."""
468 468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
469 469 self.program + self.program_args
470 470
471 471 def start(self, n):
472 472 """Start n instances of the program using mpiexec."""
473 473 self.n = n
474 474 return super(MPILauncher, self).start()
475 475
476 476
477 477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
478 478 """Launch a controller using mpiexec."""
479 479
480 480 # alias back to *non-configurable* program[_args] for use in find_args()
481 481 # this way all Controller/EngineSetLaunchers have the same form, rather
482 482 # than *some* having `program_args` and others `controller_args`
483 483 @property
484 484 def program(self):
485 485 return self.controller_cmd
486 486
487 487 @property
488 488 def program_args(self):
489 489 return self.cluster_args + self.controller_args
490 490
491 491 def start(self):
492 492 """Start the controller by profile_dir."""
493 493 return super(MPIControllerLauncher, self).start(1)
494 494
495 495
496 496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
497 497 """Launch engines using mpiexec"""
498 498
499 499 # alias back to *non-configurable* program[_args] for use in find_args()
500 500 # this way all Controller/EngineSetLaunchers have the same form, rather
501 501 # than *some* having `program_args` and others `controller_args`
502 502 @property
503 503 def program(self):
504 504 return self.engine_cmd
505 505
506 506 @property
507 507 def program_args(self):
508 508 return self.cluster_args + self.engine_args
509 509
510 510 def start(self, n):
511 511 """Start n engines by profile or profile_dir."""
512 512 self.n = n
513 513 return super(MPIEngineSetLauncher, self).start(n)
514 514
515 515 # deprecated MPIExec names
516 516 class DeprecatedMPILauncher(object):
517 517 def warn(self):
518 518 oldname = self.__class__.__name__
519 519 newname = oldname.replace('MPIExec', 'MPI')
520 520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
521 521
522 522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
523 523 """Deprecated, use MPILauncher"""
524 524 def __init__(self, *args, **kwargs):
525 525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
526 526 self.warn()
527 527
528 528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
529 529 """Deprecated, use MPIControllerLauncher"""
530 530 def __init__(self, *args, **kwargs):
531 531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
532 532 self.warn()
533 533
534 534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
535 535 """Deprecated, use MPIEngineSetLauncher"""
536 536 def __init__(self, *args, **kwargs):
537 537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
538 538 self.warn()
539 539
540 540
541 541 #-----------------------------------------------------------------------------
542 542 # SSH launchers
543 543 #-----------------------------------------------------------------------------
544 544
545 545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
546 546
547 547 class SSHLauncher(LocalProcessLauncher):
548 548 """A minimal launcher for ssh.
549 549
550 550 To be useful this will probably have to be extended to use the ``sshx``
551 551 idea for environment variables. There could be other things this needs
552 552 as well.
553 553 """
554 554
555 555 ssh_cmd = List(['ssh'], config=True,
556 556 help="command for starting ssh")
557 557 ssh_args = List(['-tt'], config=True,
558 558 help="args to pass to ssh")
559 559 scp_cmd = List(['scp'], config=True,
560 560 help="command for sending files")
561 561 program = List(['date'],
562 562 help="Program to launch via ssh")
563 563 program_args = List([],
564 564 help="args to pass to remote program")
565 565 hostname = Unicode('', config=True,
566 566 help="hostname on which to launch the program")
567 567 user = Unicode('', config=True,
568 568 help="username for ssh")
569 569 location = Unicode('', config=True,
570 570 help="user@hostname location for ssh in one setting")
571 571 to_fetch = List([], config=True,
572 572 help="List of (remote, local) files to fetch after starting")
573 573 to_send = List([], config=True,
574 574 help="List of (local, remote) files to send before starting")
575 575
576 576 def _hostname_changed(self, name, old, new):
577 577 if self.user:
578 578 self.location = u'%s@%s' % (self.user, new)
579 579 else:
580 580 self.location = new
581 581
582 582 def _user_changed(self, name, old, new):
583 583 self.location = u'%s@%s' % (new, self.hostname)
584 584
585 585 def find_args(self):
586 586 return self.ssh_cmd + self.ssh_args + [self.location] + \
587 587 list(map(pipes.quote, self.program + self.program_args))
588 588
589 589 def _send_file(self, local, remote):
590 590 """send a single file"""
591 591 remote = "%s:%s" % (self.location, remote)
592 592 for i in range(10):
593 593 if not os.path.exists(local):
594 594 self.log.debug("waiting for %s" % local)
595 595 time.sleep(1)
596 596 else:
597 597 break
598 598 self.log.info("sending %s to %s", local, remote)
599 599 check_output(self.scp_cmd + [local, remote])
600 600
601 601 def send_files(self):
602 602 """send our files (called before start)"""
603 603 if not self.to_send:
604 604 return
605 605 for local_file, remote_file in self.to_send:
606 606 self._send_file(local_file, remote_file)
607 607
608 608 def _fetch_file(self, remote, local):
609 609 """fetch a single file"""
610 610 full_remote = "%s:%s" % (self.location, remote)
611 611 self.log.info("fetching %s from %s", local, full_remote)
612 612 for i in range(10):
613 613 # wait up to 10s for remote file to exist
614 614 check = check_output(self.ssh_cmd + self.ssh_args + \
615 615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
616 616 check = check.strip()
617 617 if check == 'no':
618 618 time.sleep(1)
619 619 elif check == 'yes':
620 620 break
621 621 check_output(self.scp_cmd + [full_remote, local])
622 622
623 623 def fetch_files(self):
624 624 """fetch remote files (called after start)"""
625 625 if not self.to_fetch:
626 626 return
627 627 for remote_file, local_file in self.to_fetch:
628 628 self._fetch_file(remote_file, local_file)
629 629
630 630 def start(self, hostname=None, user=None):
631 631 if hostname is not None:
632 632 self.hostname = hostname
633 633 if user is not None:
634 634 self.user = user
635 635
636 636 self.send_files()
637 637 super(SSHLauncher, self).start()
638 638 self.fetch_files()
639 639
640 640 def signal(self, sig):
641 641 if self.state == 'running':
642 642 # send escaped ssh connection-closer
643 643 self.process.stdin.write('~.')
644 644 self.process.stdin.flush()
645 645
646 646 class SSHClusterLauncher(SSHLauncher):
647 647
648 648 remote_profile_dir = Unicode('', config=True,
649 649 help="""The remote profile_dir to use.
650 650
651 651 If not specified, use calling profile, stripping out possible leading homedir.
652 652 """)
653 653
654 def _remote_profile_dir_default(self):
655 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
656 """
654 def _profile_dir_changed(self, name, old, new):
655 if not self.remote_profile_dir:
656 # trigger remote_profile_dir_default logic again,
657 # in case it was already triggered before profile_dir was set
658 self.remote_profile_dir = self._strip_home(new)
659
660 @staticmethod
661 def _strip_home(path):
662 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
657 663 home = get_home_dir()
658 664 if not home.endswith('/'):
659 665 home = home+'/'
660 666
661 if self.profile_dir.startswith(home):
662 return self.profile_dir[len(home):]
667 if path.startswith(home):
668 return path[len(home):]
663 669 else:
664 return self.profile_dir
670 return path
671
672 def _remote_profile_dir_default(self):
673 return self._strip_home(self.profile_dir)
665 674
666 675 def _cluster_id_changed(self, name, old, new):
667 676 if new:
668 677 raise ValueError("cluster id not supported by SSH launchers")
669 678
670 679 @property
671 680 def cluster_args(self):
672 681 return ['--profile-dir', self.remote_profile_dir]
673 682
674 683 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
675 684
676 685 # alias back to *non-configurable* program[_args] for use in find_args()
677 686 # this way all Controller/EngineSetLaunchers have the same form, rather
678 687 # than *some* having `program_args` and others `controller_args`
679 688
680 689 def _controller_cmd_default(self):
681 690 return ['ipcontroller']
682 691
683 692 @property
684 693 def program(self):
685 694 return self.controller_cmd
686 695
687 696 @property
688 697 def program_args(self):
689 698 return self.cluster_args + self.controller_args
690 699
691 700 def _to_fetch_default(self):
692 701 return [
693 702 (os.path.join(self.remote_profile_dir, 'security', cf),
694 703 os.path.join(self.profile_dir, 'security', cf),)
695 704 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
696 705 ]
697 706
698 707 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
699 708
700 709 # alias back to *non-configurable* program[_args] for use in find_args()
701 710 # this way all Controller/EngineSetLaunchers have the same form, rather
702 711 # than *some* having `program_args` and others `controller_args`
703 712
704 713 def _engine_cmd_default(self):
705 714 return ['ipengine']
706 715
707 716 @property
708 717 def program(self):
709 718 return self.engine_cmd
710 719
711 720 @property
712 721 def program_args(self):
713 722 return self.cluster_args + self.engine_args
714 723
715 724 def _to_send_default(self):
716 725 return [
717 726 (os.path.join(self.profile_dir, 'security', cf),
718 727 os.path.join(self.remote_profile_dir, 'security', cf))
719 728 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
720 729 ]
721 730
722 731
723 732 class SSHEngineSetLauncher(LocalEngineSetLauncher):
724 733 launcher_class = SSHEngineLauncher
725 734 engines = Dict(config=True,
726 735 help="""dict of engines to launch. This is a dict by hostname of ints,
727 736 corresponding to the number of engines to start on that host.""")
728 737
729 738 def _engine_cmd_default(self):
730 739 return ['ipengine']
731 740
732 741 @property
733 742 def engine_count(self):
734 743 """determine engine count from `engines` dict"""
735 744 count = 0
736 745 for n in self.engines.itervalues():
737 746 if isinstance(n, (tuple,list)):
738 747 n,args = n
739 748 count += n
740 749 return count
741 750
742 751 def start(self, n):
743 752 """Start engines by profile or profile_dir.
744 753 `n` is ignored, and the `engines` config property is used instead.
745 754 """
746 755
747 756 dlist = []
748 757 for host, n in self.engines.iteritems():
749 758 if isinstance(n, (tuple, list)):
750 759 n, args = n
751 760 else:
752 761 args = copy.deepcopy(self.engine_args)
753 762
754 763 if '@' in host:
755 764 user,host = host.split('@',1)
756 765 else:
757 766 user=None
758 767 for i in range(n):
759 768 if i > 0:
760 769 time.sleep(self.delay)
761 770 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
762 771 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
763 772 )
764 773 if i > 0:
765 774 # only send files for the first engine on each host
766 775 el.to_send = []
767 776
768 777 # Copy the engine args over to each engine launcher.
769 778 el.engine_cmd = self.engine_cmd
770 779 el.engine_args = args
771 780 el.on_stop(self._notice_engine_stopped)
772 781 d = el.start(user=user, hostname=host)
773 782 self.launchers[ "%s/%i" % (host,i) ] = el
774 783 dlist.append(d)
775 784 self.notify_start(dlist)
776 785 return dlist
777 786
778 787
779 788 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
780 789 """Launcher for calling
781 790 `ipcluster engines` on a remote machine.
782 791
783 792 Requires that remote profile is already configured.
784 793 """
785 794
786 795 n = Integer()
787 796 ipcluster_cmd = List(['ipcluster'], config=True)
788 797
789 798 @property
790 799 def program(self):
791 800 return self.ipcluster_cmd + ['engines']
792 801
793 802 @property
794 803 def program_args(self):
795 804 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
796 805
797 806 def _to_send_default(self):
798 807 return [
799 808 (os.path.join(self.profile_dir, 'security', cf),
800 809 os.path.join(self.remote_profile_dir, 'security', cf))
801 810 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
802 811 ]
803 812
804 813 def start(self, n):
805 814 self.n = n
806 815 super(SSHProxyEngineSetLauncher, self).start()
807 816
808 817
809 818 #-----------------------------------------------------------------------------
810 819 # Windows HPC Server 2008 scheduler launchers
811 820 #-----------------------------------------------------------------------------
812 821
813 822
814 823 # This is only used on Windows.
815 824 def find_job_cmd():
816 825 if WINDOWS:
817 826 try:
818 827 return find_cmd('job')
819 828 except (FindCmdError, ImportError):
820 829 # ImportError will be raised if win32api is not installed
821 830 return 'job'
822 831 else:
823 832 return 'job'
824 833
825 834
826 835 class WindowsHPCLauncher(BaseLauncher):
827 836
828 837 job_id_regexp = CRegExp(r'\d+', config=True,
829 838 help="""A regular expression used to get the job id from the output of the
830 839 submit_command. """
831 840 )
832 841 job_file_name = Unicode(u'ipython_job.xml', config=True,
833 842 help="The filename of the instantiated job script.")
834 843 # The full path to the instantiated job script. This gets made dynamically
835 844 # by combining the work_dir with the job_file_name.
836 845 job_file = Unicode(u'')
837 846 scheduler = Unicode('', config=True,
838 847 help="The hostname of the scheduler to submit the job to.")
839 848 job_cmd = Unicode(find_job_cmd(), config=True,
840 849 help="The command for submitting jobs.")
841 850
842 851 def __init__(self, work_dir=u'.', config=None, **kwargs):
843 852 super(WindowsHPCLauncher, self).__init__(
844 853 work_dir=work_dir, config=config, **kwargs
845 854 )
846 855
847 856 @property
848 857 def job_file(self):
849 858 return os.path.join(self.work_dir, self.job_file_name)
850 859
851 860 def write_job_file(self, n):
852 861 raise NotImplementedError("Implement write_job_file in a subclass.")
853 862
854 863 def find_args(self):
855 864 return [u'job.exe']
856 865
857 866 def parse_job_id(self, output):
858 867 """Take the output of the submit command and return the job id."""
859 868 m = self.job_id_regexp.search(output)
860 869 if m is not None:
861 870 job_id = m.group()
862 871 else:
863 872 raise LauncherError("Job id couldn't be determined: %s" % output)
864 873 self.job_id = job_id
865 874 self.log.info('Job started with id: %r', job_id)
866 875 return job_id
867 876
868 877 def start(self, n):
869 878 """Start n copies of the process using the Win HPC job scheduler."""
870 879 self.write_job_file(n)
871 880 args = [
872 881 'submit',
873 882 '/jobfile:%s' % self.job_file,
874 883 '/scheduler:%s' % self.scheduler
875 884 ]
876 885 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
877 886
878 887 output = check_output([self.job_cmd]+args,
879 888 env=os.environ,
880 889 cwd=self.work_dir,
881 890 stderr=STDOUT
882 891 )
883 892 job_id = self.parse_job_id(output)
884 893 self.notify_start(job_id)
885 894 return job_id
886 895
887 896 def stop(self):
888 897 args = [
889 898 'cancel',
890 899 self.job_id,
891 900 '/scheduler:%s' % self.scheduler
892 901 ]
893 902 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
894 903 try:
895 904 output = check_output([self.job_cmd]+args,
896 905 env=os.environ,
897 906 cwd=self.work_dir,
898 907 stderr=STDOUT
899 908 )
900 909 except:
901 910 output = 'The job already appears to be stoppped: %r' % self.job_id
902 911 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
903 912 return output
904 913
905 914
906 915 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
907 916
908 917 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
909 918 help="WinHPC xml job file.")
910 919 controller_args = List([], config=False,
911 920 help="extra args to pass to ipcontroller")
912 921
913 922 def write_job_file(self, n):
914 923 job = IPControllerJob(config=self.config)
915 924
916 925 t = IPControllerTask(config=self.config)
917 926 # The tasks work directory is *not* the actual work directory of
918 927 # the controller. It is used as the base path for the stdout/stderr
919 928 # files that the scheduler redirects to.
920 929 t.work_directory = self.profile_dir
921 930 # Add the profile_dir and from self.start().
922 931 t.controller_args.extend(self.cluster_args)
923 932 t.controller_args.extend(self.controller_args)
924 933 job.add_task(t)
925 934
926 935 self.log.debug("Writing job description file: %s", self.job_file)
927 936 job.write(self.job_file)
928 937
929 938 @property
930 939 def job_file(self):
931 940 return os.path.join(self.profile_dir, self.job_file_name)
932 941
933 942 def start(self):
934 943 """Start the controller by profile_dir."""
935 944 return super(WindowsHPCControllerLauncher, self).start(1)
936 945
937 946
938 947 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
939 948
940 949 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
941 950 help="jobfile for ipengines job")
942 951 engine_args = List([], config=False,
943 952 help="extra args to pas to ipengine")
944 953
945 954 def write_job_file(self, n):
946 955 job = IPEngineSetJob(config=self.config)
947 956
948 957 for i in range(n):
949 958 t = IPEngineTask(config=self.config)
950 959 # The tasks work directory is *not* the actual work directory of
951 960 # the engine. It is used as the base path for the stdout/stderr
952 961 # files that the scheduler redirects to.
953 962 t.work_directory = self.profile_dir
954 963 # Add the profile_dir and from self.start().
955 964 t.engine_args.extend(self.cluster_args)
956 965 t.engine_args.extend(self.engine_args)
957 966 job.add_task(t)
958 967
959 968 self.log.debug("Writing job description file: %s", self.job_file)
960 969 job.write(self.job_file)
961 970
962 971 @property
963 972 def job_file(self):
964 973 return os.path.join(self.profile_dir, self.job_file_name)
965 974
966 975 def start(self, n):
967 976 """Start the controller by profile_dir."""
968 977 return super(WindowsHPCEngineSetLauncher, self).start(n)
969 978
970 979
971 980 #-----------------------------------------------------------------------------
972 981 # Batch (PBS) system launchers
973 982 #-----------------------------------------------------------------------------
974 983
975 984 class BatchClusterAppMixin(ClusterAppMixin):
976 985 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
977 986 def _profile_dir_changed(self, name, old, new):
978 987 self.context[name] = new
979 988 _cluster_id_changed = _profile_dir_changed
980 989
981 990 def _profile_dir_default(self):
982 991 self.context['profile_dir'] = ''
983 992 return ''
984 993 def _cluster_id_default(self):
985 994 self.context['cluster_id'] = ''
986 995 return ''
987 996
988 997
989 998 class BatchSystemLauncher(BaseLauncher):
990 999 """Launch an external process using a batch system.
991 1000
992 1001 This class is designed to work with UNIX batch systems like PBS, LSF,
993 1002 GridEngine, etc. The overall model is that there are different commands
994 1003 like qsub, qdel, etc. that handle the starting and stopping of the process.
995 1004
996 1005 This class also has the notion of a batch script. The ``batch_template``
997 1006 attribute can be set to a string that is a template for the batch script.
998 1007 This template is instantiated using string formatting. Thus the template can
999 1008 use {n} fot the number of instances. Subclasses can add additional variables
1000 1009 to the template dict.
1001 1010 """
1002 1011
1003 1012 # Subclasses must fill these in. See PBSEngineSet
1004 1013 submit_command = List([''], config=True,
1005 1014 help="The name of the command line program used to submit jobs.")
1006 1015 delete_command = List([''], config=True,
1007 1016 help="The name of the command line program used to delete jobs.")
1008 1017 job_id_regexp = CRegExp('', config=True,
1009 1018 help="""A regular expression used to get the job id from the output of the
1010 1019 submit_command.""")
1011 1020 batch_template = Unicode('', config=True,
1012 1021 help="The string that is the batch script template itself.")
1013 1022 batch_template_file = Unicode(u'', config=True,
1014 1023 help="The file that contains the batch template.")
1015 1024 batch_file_name = Unicode(u'batch_script', config=True,
1016 1025 help="The filename of the instantiated batch script.")
1017 1026 queue = Unicode(u'', config=True,
1018 1027 help="The PBS Queue.")
1019 1028
1020 1029 def _queue_changed(self, name, old, new):
1021 1030 self.context[name] = new
1022 1031
1023 1032 n = Integer(1)
1024 1033 _n_changed = _queue_changed
1025 1034
1026 1035 # not configurable, override in subclasses
1027 1036 # PBS Job Array regex
1028 1037 job_array_regexp = CRegExp('')
1029 1038 job_array_template = Unicode('')
1030 1039 # PBS Queue regex
1031 1040 queue_regexp = CRegExp('')
1032 1041 queue_template = Unicode('')
1033 1042 # The default batch template, override in subclasses
1034 1043 default_template = Unicode('')
1035 1044 # The full path to the instantiated batch script.
1036 1045 batch_file = Unicode(u'')
1037 1046 # the format dict used with batch_template:
1038 1047 context = Dict()
1039 1048 def _context_default(self):
1040 1049 """load the default context with the default values for the basic keys
1041 1050
1042 1051 because the _trait_changed methods only load the context if they
1043 1052 are set to something other than the default value.
1044 1053 """
1045 1054 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1046 1055
1047 1056 # the Formatter instance for rendering the templates:
1048 1057 formatter = Instance(EvalFormatter, (), {})
1049 1058
1050 1059
1051 1060 def find_args(self):
1052 1061 return self.submit_command + [self.batch_file]
1053 1062
1054 1063 def __init__(self, work_dir=u'.', config=None, **kwargs):
1055 1064 super(BatchSystemLauncher, self).__init__(
1056 1065 work_dir=work_dir, config=config, **kwargs
1057 1066 )
1058 1067 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1059 1068
1060 1069 def parse_job_id(self, output):
1061 1070 """Take the output of the submit command and return the job id."""
1062 1071 m = self.job_id_regexp.search(output)
1063 1072 if m is not None:
1064 1073 job_id = m.group()
1065 1074 else:
1066 1075 raise LauncherError("Job id couldn't be determined: %s" % output)
1067 1076 self.job_id = job_id
1068 1077 self.log.info('Job submitted with job id: %r', job_id)
1069 1078 return job_id
1070 1079
1071 1080 def write_batch_script(self, n):
1072 1081 """Instantiate and write the batch script to the work_dir."""
1073 1082 self.n = n
1074 1083 # first priority is batch_template if set
1075 1084 if self.batch_template_file and not self.batch_template:
1076 1085 # second priority is batch_template_file
1077 1086 with open(self.batch_template_file) as f:
1078 1087 self.batch_template = f.read()
1079 1088 if not self.batch_template:
1080 1089 # third (last) priority is default_template
1081 1090 self.batch_template = self.default_template
1082 1091
1083 1092 # add jobarray or queue lines to user-specified template
1084 1093 # note that this is *only* when user did not specify a template.
1085 1094 # print self.job_array_regexp.search(self.batch_template)
1086 1095 if not self.job_array_regexp.search(self.batch_template):
1087 1096 self.log.debug("adding job array settings to batch script")
1088 1097 firstline, rest = self.batch_template.split('\n',1)
1089 1098 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1090 1099
1091 1100 # print self.queue_regexp.search(self.batch_template)
1092 1101 if self.queue and not self.queue_regexp.search(self.batch_template):
1093 1102 self.log.debug("adding PBS queue settings to batch script")
1094 1103 firstline, rest = self.batch_template.split('\n',1)
1095 1104 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1096 1105
1097 1106 script_as_string = self.formatter.format(self.batch_template, **self.context)
1098 1107 self.log.debug('Writing batch script: %s', self.batch_file)
1099 1108
1100 1109 with open(self.batch_file, 'w') as f:
1101 1110 f.write(script_as_string)
1102 1111 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1103 1112
1104 1113 def start(self, n):
1105 1114 """Start n copies of the process using a batch system."""
1106 1115 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1107 1116 # Here we save profile_dir in the context so they
1108 1117 # can be used in the batch script template as {profile_dir}
1109 1118 self.write_batch_script(n)
1110 1119 output = check_output(self.args, env=os.environ)
1111 1120
1112 1121 job_id = self.parse_job_id(output)
1113 1122 self.notify_start(job_id)
1114 1123 return job_id
1115 1124
1116 1125 def stop(self):
1117 1126 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1118 1127 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1119 1128 return output
1120 1129
1121 1130
1122 1131 class PBSLauncher(BatchSystemLauncher):
1123 1132 """A BatchSystemLauncher subclass for PBS."""
1124 1133
1125 1134 submit_command = List(['qsub'], config=True,
1126 1135 help="The PBS submit command ['qsub']")
1127 1136 delete_command = List(['qdel'], config=True,
1128 1137 help="The PBS delete command ['qsub']")
1129 1138 job_id_regexp = CRegExp(r'\d+', config=True,
1130 1139 help="Regular expresion for identifying the job ID [r'\d+']")
1131 1140
1132 1141 batch_file = Unicode(u'')
1133 1142 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1134 1143 job_array_template = Unicode('#PBS -t 1-{n}')
1135 1144 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1136 1145 queue_template = Unicode('#PBS -q {queue}')
1137 1146
1138 1147
1139 1148 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1140 1149 """Launch a controller using PBS."""
1141 1150
1142 1151 batch_file_name = Unicode(u'pbs_controller', config=True,
1143 1152 help="batch file name for the controller job.")
1144 1153 default_template= Unicode("""#!/bin/sh
1145 1154 #PBS -V
1146 1155 #PBS -N ipcontroller
1147 1156 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1148 1157 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1149 1158
1150 1159
1151 1160 def start(self):
1152 1161 """Start the controller by profile or profile_dir."""
1153 1162 return super(PBSControllerLauncher, self).start(1)
1154 1163
1155 1164
1156 1165 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1157 1166 """Launch Engines using PBS"""
1158 1167 batch_file_name = Unicode(u'pbs_engines', config=True,
1159 1168 help="batch file name for the engine(s) job.")
1160 1169 default_template= Unicode(u"""#!/bin/sh
1161 1170 #PBS -V
1162 1171 #PBS -N ipengine
1163 1172 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1164 1173 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1165 1174
1166 1175 def start(self, n):
1167 1176 """Start n engines by profile or profile_dir."""
1168 1177 return super(PBSEngineSetLauncher, self).start(n)
1169 1178
1170 1179 #SGE is very similar to PBS
1171 1180
1172 1181 class SGELauncher(PBSLauncher):
1173 1182 """Sun GridEngine is a PBS clone with slightly different syntax"""
1174 1183 job_array_regexp = CRegExp('#\$\W+\-t')
1175 1184 job_array_template = Unicode('#$ -t 1-{n}')
1176 1185 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1177 1186 queue_template = Unicode('#$ -q {queue}')
1178 1187
1179 1188 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1180 1189 """Launch a controller using SGE."""
1181 1190
1182 1191 batch_file_name = Unicode(u'sge_controller', config=True,
1183 1192 help="batch file name for the ipontroller job.")
1184 1193 default_template= Unicode(u"""#$ -V
1185 1194 #$ -S /bin/sh
1186 1195 #$ -N ipcontroller
1187 1196 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1188 1197 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1189 1198
1190 1199 def start(self):
1191 1200 """Start the controller by profile or profile_dir."""
1192 1201 return super(SGEControllerLauncher, self).start(1)
1193 1202
1194 1203 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1195 1204 """Launch Engines with SGE"""
1196 1205 batch_file_name = Unicode(u'sge_engines', config=True,
1197 1206 help="batch file name for the engine(s) job.")
1198 1207 default_template = Unicode("""#$ -V
1199 1208 #$ -S /bin/sh
1200 1209 #$ -N ipengine
1201 1210 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1202 1211 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1203 1212
1204 1213 def start(self, n):
1205 1214 """Start n engines by profile or profile_dir."""
1206 1215 return super(SGEEngineSetLauncher, self).start(n)
1207 1216
1208 1217
1209 1218 # LSF launchers
1210 1219
1211 1220 class LSFLauncher(BatchSystemLauncher):
1212 1221 """A BatchSystemLauncher subclass for LSF."""
1213 1222
1214 1223 submit_command = List(['bsub'], config=True,
1215 1224 help="The PBS submit command ['bsub']")
1216 1225 delete_command = List(['bkill'], config=True,
1217 1226 help="The PBS delete command ['bkill']")
1218 1227 job_id_regexp = CRegExp(r'\d+', config=True,
1219 1228 help="Regular expresion for identifying the job ID [r'\d+']")
1220 1229
1221 1230 batch_file = Unicode(u'')
1222 1231 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1223 1232 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1224 1233 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1225 1234 queue_template = Unicode('#BSUB -q {queue}')
1226 1235
1227 1236 def start(self, n):
1228 1237 """Start n copies of the process using LSF batch system.
1229 1238 This cant inherit from the base class because bsub expects
1230 1239 to be piped a shell script in order to honor the #BSUB directives :
1231 1240 bsub < script
1232 1241 """
1233 1242 # Here we save profile_dir in the context so they
1234 1243 # can be used in the batch script template as {profile_dir}
1235 1244 self.write_batch_script(n)
1236 1245 #output = check_output(self.args, env=os.environ)
1237 1246 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1238 1247 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1239 1248 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1240 1249 output,err = p.communicate()
1241 1250 job_id = self.parse_job_id(output)
1242 1251 self.notify_start(job_id)
1243 1252 return job_id
1244 1253
1245 1254
1246 1255 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1247 1256 """Launch a controller using LSF."""
1248 1257
1249 1258 batch_file_name = Unicode(u'lsf_controller', config=True,
1250 1259 help="batch file name for the controller job.")
1251 1260 default_template= Unicode("""#!/bin/sh
1252 1261 #BSUB -J ipcontroller
1253 1262 #BSUB -oo ipcontroller.o.%%J
1254 1263 #BSUB -eo ipcontroller.e.%%J
1255 1264 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1256 1265 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1257 1266
1258 1267 def start(self):
1259 1268 """Start the controller by profile or profile_dir."""
1260 1269 return super(LSFControllerLauncher, self).start(1)
1261 1270
1262 1271
1263 1272 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1264 1273 """Launch Engines using LSF"""
1265 1274 batch_file_name = Unicode(u'lsf_engines', config=True,
1266 1275 help="batch file name for the engine(s) job.")
1267 1276 default_template= Unicode(u"""#!/bin/sh
1268 1277 #BSUB -oo ipengine.o.%%J
1269 1278 #BSUB -eo ipengine.e.%%J
1270 1279 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1271 1280 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1272 1281
1273 1282 def start(self, n):
1274 1283 """Start n engines by profile or profile_dir."""
1275 1284 return super(LSFEngineSetLauncher, self).start(n)
1276 1285
1277 1286
1278 1287 #-----------------------------------------------------------------------------
1279 1288 # A launcher for ipcluster itself!
1280 1289 #-----------------------------------------------------------------------------
1281 1290
1282 1291
1283 1292 class IPClusterLauncher(LocalProcessLauncher):
1284 1293 """Launch the ipcluster program in an external process."""
1285 1294
1286 1295 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1287 1296 help="Popen command for ipcluster")
1288 1297 ipcluster_args = List(
1289 1298 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1290 1299 help="Command line arguments to pass to ipcluster.")
1291 1300 ipcluster_subcommand = Unicode('start')
1292 1301 profile = Unicode('default')
1293 1302 n = Integer(2)
1294 1303
1295 1304 def find_args(self):
1296 1305 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1297 1306 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1298 1307 self.ipcluster_args
1299 1308
1300 1309 def start(self):
1301 1310 return super(IPClusterLauncher, self).start()
1302 1311
1303 1312 #-----------------------------------------------------------------------------
1304 1313 # Collections of launchers
1305 1314 #-----------------------------------------------------------------------------
1306 1315
1307 1316 local_launchers = [
1308 1317 LocalControllerLauncher,
1309 1318 LocalEngineLauncher,
1310 1319 LocalEngineSetLauncher,
1311 1320 ]
1312 1321 mpi_launchers = [
1313 1322 MPILauncher,
1314 1323 MPIControllerLauncher,
1315 1324 MPIEngineSetLauncher,
1316 1325 ]
1317 1326 ssh_launchers = [
1318 1327 SSHLauncher,
1319 1328 SSHControllerLauncher,
1320 1329 SSHEngineLauncher,
1321 1330 SSHEngineSetLauncher,
1322 1331 ]
1323 1332 winhpc_launchers = [
1324 1333 WindowsHPCLauncher,
1325 1334 WindowsHPCControllerLauncher,
1326 1335 WindowsHPCEngineSetLauncher,
1327 1336 ]
1328 1337 pbs_launchers = [
1329 1338 PBSLauncher,
1330 1339 PBSControllerLauncher,
1331 1340 PBSEngineSetLauncher,
1332 1341 ]
1333 1342 sge_launchers = [
1334 1343 SGELauncher,
1335 1344 SGEControllerLauncher,
1336 1345 SGEEngineSetLauncher,
1337 1346 ]
1338 1347 lsf_launchers = [
1339 1348 LSFLauncher,
1340 1349 LSFControllerLauncher,
1341 1350 LSFEngineSetLauncher,
1342 1351 ]
1343 1352 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1344 1353 + pbs_launchers + sge_launchers + lsf_launchers
1345 1354
General Comments 0
You need to be logged in to leave comments. Login now