##// END OF EJS Templates
incremental improvements to SSH launchers...
MinRK -
Show More
@@ -1,1223 +1,1348 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 27 import time
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import (
61 61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 62 )
63 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path, get_home_dir
64 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65 65
66 66 from .win32support import forward_read_events
67 67
68 68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 69
70 70 WINDOWS = os.name == 'nt'
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Paths to the kernel apps
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 78 'IPython.parallel.apps.ipclusterapp'
79 79 ))
80 80
81 81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 82 'IPython.parallel.apps.ipengineapp'
83 83 ))
84 84
85 85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 86 'IPython.parallel.apps.ipcontrollerapp'
87 87 ))
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # Base launchers and errors
91 91 #-----------------------------------------------------------------------------
92 92
93 93
94 94 class LauncherError(Exception):
95 95 pass
96 96
97 97
98 98 class ProcessStateError(LauncherError):
99 99 pass
100 100
101 101
102 102 class UnknownStatus(LauncherError):
103 103 pass
104 104
105 105
106 106 class BaseLauncher(LoggingConfigurable):
107 107 """An asbtraction for starting, stopping and signaling a process."""
108 108
109 109 # In all of the launchers, the work_dir is where child processes will be
110 110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 111 # passed into the __init__ method will override the config value.
112 112 # This should not be used to set the work_dir for the actual engine
113 113 # and controller. Instead, use their own config files or the
114 114 # controller_args, engine_args attributes of the launchers to add
115 115 # the work_dir option.
116 116 work_dir = Unicode(u'.')
117 117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 118
119 119 start_data = Any()
120 120 stop_data = Any()
121 121
122 122 def _loop_default(self):
123 123 return ioloop.IOLoop.instance()
124 124
125 125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 127 self.state = 'before' # can be before, running, after
128 128 self.stop_callbacks = []
129 129 self.start_data = None
130 130 self.stop_data = None
131 131
132 132 @property
133 133 def args(self):
134 134 """A list of cmd and args that will be used to start the process.
135 135
136 136 This is what is passed to :func:`spawnProcess` and the first element
137 137 will be the process name.
138 138 """
139 139 return self.find_args()
140 140
141 141 def find_args(self):
142 142 """The ``.args`` property calls this to find the args list.
143 143
144 144 Subcommand should implement this to construct the cmd and args.
145 145 """
146 146 raise NotImplementedError('find_args must be implemented in a subclass')
147 147
148 148 @property
149 149 def arg_str(self):
150 150 """The string form of the program arguments."""
151 151 return ' '.join(self.args)
152 152
153 153 @property
154 154 def running(self):
155 155 """Am I running."""
156 156 if self.state == 'running':
157 157 return True
158 158 else:
159 159 return False
160 160
161 161 def start(self):
162 162 """Start the process."""
163 163 raise NotImplementedError('start must be implemented in a subclass')
164 164
165 165 def stop(self):
166 166 """Stop the process and notify observers of stopping.
167 167
168 168 This method will return None immediately.
169 169 To observe the actual process stopping, see :meth:`on_stop`.
170 170 """
171 171 raise NotImplementedError('stop must be implemented in a subclass')
172 172
173 173 def on_stop(self, f):
174 174 """Register a callback to be called with this Launcher's stop_data
175 175 when the process actually finishes.
176 176 """
177 177 if self.state=='after':
178 178 return f(self.stop_data)
179 179 else:
180 180 self.stop_callbacks.append(f)
181 181
182 182 def notify_start(self, data):
183 183 """Call this to trigger startup actions.
184 184
185 185 This logs the process startup and sets the state to 'running'. It is
186 186 a pass-through so it can be used as a callback.
187 187 """
188 188
189 189 self.log.debug('Process %r started: %r', self.args[0], data)
190 190 self.start_data = data
191 191 self.state = 'running'
192 192 return data
193 193
194 194 def notify_stop(self, data):
195 195 """Call this to trigger process stop actions.
196 196
197 197 This logs the process stopping and sets the state to 'after'. Call
198 198 this to trigger callbacks registered via :meth:`on_stop`."""
199 199
200 200 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 201 self.stop_data = data
202 202 self.state = 'after'
203 203 for i in range(len(self.stop_callbacks)):
204 204 d = self.stop_callbacks.pop()
205 205 d(data)
206 206 return data
207 207
208 208 def signal(self, sig):
209 209 """Signal the process.
210 210
211 211 Parameters
212 212 ----------
213 213 sig : str or int
214 214 'KILL', 'INT', etc., or any signal number
215 215 """
216 216 raise NotImplementedError('signal must be implemented in a subclass')
217 217
218 218 class ClusterAppMixin(HasTraits):
219 219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 220 profile_dir=Unicode('')
222 221 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
222
223 @property
224 def cluster_args(self):
225 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
230 226
231 227 class ControllerMixin(ClusterAppMixin):
232 228 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 229 help="""Popen command to launch ipcontroller.""")
234 230 # Command line arguments to ipcontroller.
235 231 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 232 help="""command-line args to pass to ipcontroller""")
237 233
238 234 class EngineMixin(ClusterAppMixin):
239 235 engine_cmd = List(ipengine_cmd_argv, config=True,
240 236 help="""command to launch the Engine.""")
241 237 # Command line arguments for ipengine.
242 238 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 239 help="command-line arguments to pass to ipengine"
244 240 )
245 241
242
246 243 #-----------------------------------------------------------------------------
247 244 # Local process launchers
248 245 #-----------------------------------------------------------------------------
249 246
250 247
251 248 class LocalProcessLauncher(BaseLauncher):
252 249 """Start and stop an external process in an asynchronous manner.
253 250
254 251 This will launch the external process with a working directory of
255 252 ``self.work_dir``.
256 253 """
257 254
258 255 # This is used to to construct self.args, which is passed to
259 256 # spawnProcess.
260 257 cmd_and_args = List([])
261 258 poll_frequency = Integer(100) # in ms
262 259
263 260 def __init__(self, work_dir=u'.', config=None, **kwargs):
264 261 super(LocalProcessLauncher, self).__init__(
265 262 work_dir=work_dir, config=config, **kwargs
266 263 )
267 264 self.process = None
268 265 self.poller = None
269 266
270 267 def find_args(self):
271 268 return self.cmd_and_args
272 269
273 270 def start(self):
274 271 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
275 272 if self.state == 'before':
276 273 self.process = Popen(self.args,
277 274 stdout=PIPE,stderr=PIPE,stdin=PIPE,
278 275 env=os.environ,
279 276 cwd=self.work_dir
280 277 )
281 278 if WINDOWS:
282 279 self.stdout = forward_read_events(self.process.stdout)
283 280 self.stderr = forward_read_events(self.process.stderr)
284 281 else:
285 282 self.stdout = self.process.stdout.fileno()
286 283 self.stderr = self.process.stderr.fileno()
287 284 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
288 285 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
289 286 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
290 287 self.poller.start()
291 288 self.notify_start(self.process.pid)
292 289 else:
293 290 s = 'The process was already started and has state: %r' % self.state
294 291 raise ProcessStateError(s)
295 292
296 293 def stop(self):
297 294 return self.interrupt_then_kill()
298 295
299 296 def signal(self, sig):
300 297 if self.state == 'running':
301 298 if WINDOWS and sig != SIGINT:
302 299 # use Windows tree-kill for better child cleanup
303 300 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
304 301 else:
305 302 self.process.send_signal(sig)
306 303
307 304 def interrupt_then_kill(self, delay=2.0):
308 305 """Send INT, wait a delay and then send KILL."""
309 306 try:
310 307 self.signal(SIGINT)
311 308 except Exception:
312 309 self.log.debug("interrupt failed")
313 310 pass
314 311 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
315 312 self.killer.start()
316 313
317 314 # callbacks, etc:
318 315
319 316 def handle_stdout(self, fd, events):
320 317 if WINDOWS:
321 318 line = self.stdout.recv()
322 319 else:
323 320 line = self.process.stdout.readline()
324 321 # a stopped process will be readable but return empty strings
325 322 if line:
326 323 self.log.debug(line[:-1])
327 324 else:
328 325 self.poll()
329 326
330 327 def handle_stderr(self, fd, events):
331 328 if WINDOWS:
332 329 line = self.stderr.recv()
333 330 else:
334 331 line = self.process.stderr.readline()
335 332 # a stopped process will be readable but return empty strings
336 333 if line:
337 334 self.log.debug(line[:-1])
338 335 else:
339 336 self.poll()
340 337
341 338 def poll(self):
342 339 status = self.process.poll()
343 340 if status is not None:
344 341 self.poller.stop()
345 342 self.loop.remove_handler(self.stdout)
346 343 self.loop.remove_handler(self.stderr)
347 344 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
348 345 return status
349 346
350 347 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
351 348 """Launch a controller as a regular external process."""
352 349
353 350 def find_args(self):
354 351 return self.controller_cmd + self.cluster_args + self.controller_args
355 352
356 353 def start(self):
357 354 """Start the controller by profile_dir."""
358 355 return super(LocalControllerLauncher, self).start()
359 356
360 357
361 358 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
362 359 """Launch a single engine as a regular externall process."""
363 360
364 361 def find_args(self):
365 362 return self.engine_cmd + self.cluster_args + self.engine_args
366 363
367 364
368 365 class LocalEngineSetLauncher(LocalEngineLauncher):
369 366 """Launch a set of engines as regular external processes."""
370 367
371 368 delay = CFloat(0.1, config=True,
372 369 help="""delay (in seconds) between starting each engine after the first.
373 370 This can help force the engines to get their ids in order, or limit
374 371 process flood when starting many engines."""
375 372 )
376 373
377 374 # launcher class
378 375 launcher_class = LocalEngineLauncher
379 376
380 377 launchers = Dict()
381 378 stop_data = Dict()
382 379
383 380 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 381 super(LocalEngineSetLauncher, self).__init__(
385 382 work_dir=work_dir, config=config, **kwargs
386 383 )
387 384 self.stop_data = {}
388 385
389 386 def start(self, n):
390 387 """Start n engines by profile or profile_dir."""
391 388 dlist = []
392 389 for i in range(n):
393 390 if i > 0:
394 391 time.sleep(self.delay)
395 392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 393 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 394 )
398 395
399 396 # Copy the engine args over to each engine launcher.
400 397 el.engine_cmd = copy.deepcopy(self.engine_cmd)
401 398 el.engine_args = copy.deepcopy(self.engine_args)
402 399 el.on_stop(self._notice_engine_stopped)
403 400 d = el.start()
404 401 self.launchers[i] = el
405 402 dlist.append(d)
406 403 self.notify_start(dlist)
407 404 return dlist
408 405
409 406 def find_args(self):
410 407 return ['engine set']
411 408
412 409 def signal(self, sig):
413 410 dlist = []
414 411 for el in self.launchers.itervalues():
415 412 d = el.signal(sig)
416 413 dlist.append(d)
417 414 return dlist
418 415
419 416 def interrupt_then_kill(self, delay=1.0):
420 417 dlist = []
421 418 for el in self.launchers.itervalues():
422 419 d = el.interrupt_then_kill(delay)
423 420 dlist.append(d)
424 421 return dlist
425 422
426 423 def stop(self):
427 424 return self.interrupt_then_kill()
428 425
429 426 def _notice_engine_stopped(self, data):
430 427 pid = data['pid']
431 428 for idx,el in self.launchers.iteritems():
432 429 if el.process.pid == pid:
433 430 break
434 431 self.launchers.pop(idx)
435 432 self.stop_data[idx] = data
436 433 if not self.launchers:
437 434 self.notify_stop(self.stop_data)
438 435
439 436
440 437 #-----------------------------------------------------------------------------
441 438 # MPI launchers
442 439 #-----------------------------------------------------------------------------
443 440
444 441
445 442 class MPILauncher(LocalProcessLauncher):
446 443 """Launch an external process using mpiexec."""
447 444
448 445 mpi_cmd = List(['mpiexec'], config=True,
449 446 help="The mpiexec command to use in starting the process."
450 447 )
451 448 mpi_args = List([], config=True,
452 449 help="The command line arguments to pass to mpiexec."
453 450 )
454 451 program = List(['date'],
455 452 help="The program to start via mpiexec.")
456 453 program_args = List([],
457 454 help="The command line argument to the program."
458 455 )
459 456 n = Integer(1)
460 457
461 458 def __init__(self, *args, **kwargs):
462 459 # deprecation for old MPIExec names:
463 460 config = kwargs.get('config', {})
464 461 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
465 462 deprecated = config.get(oldname)
466 463 if deprecated:
467 464 newname = oldname.replace('MPIExec', 'MPI')
468 465 config[newname].update(deprecated)
469 466 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
470 467
471 468 super(MPILauncher, self).__init__(*args, **kwargs)
472 469
473 470 def find_args(self):
474 471 """Build self.args using all the fields."""
475 472 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
476 473 self.program + self.program_args
477 474
478 475 def start(self, n):
479 476 """Start n instances of the program using mpiexec."""
480 477 self.n = n
481 478 return super(MPILauncher, self).start()
482 479
483 480
484 481 class MPIControllerLauncher(MPILauncher, ControllerMixin):
485 482 """Launch a controller using mpiexec."""
486 483
487 484 # alias back to *non-configurable* program[_args] for use in find_args()
488 485 # this way all Controller/EngineSetLaunchers have the same form, rather
489 486 # than *some* having `program_args` and others `controller_args`
490 487 @property
491 488 def program(self):
492 489 return self.controller_cmd
493 490
494 491 @property
495 492 def program_args(self):
496 493 return self.cluster_args + self.controller_args
497 494
498 495 def start(self):
499 496 """Start the controller by profile_dir."""
500 497 return super(MPIControllerLauncher, self).start(1)
501 498
502 499
503 500 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
504 501 """Launch engines using mpiexec"""
505 502
506 503 # alias back to *non-configurable* program[_args] for use in find_args()
507 504 # this way all Controller/EngineSetLaunchers have the same form, rather
508 505 # than *some* having `program_args` and others `controller_args`
509 506 @property
510 507 def program(self):
511 508 return self.engine_cmd
512 509
513 510 @property
514 511 def program_args(self):
515 512 return self.cluster_args + self.engine_args
516 513
517 514 def start(self, n):
518 515 """Start n engines by profile or profile_dir."""
519 516 self.n = n
520 517 return super(MPIEngineSetLauncher, self).start(n)
521 518
522 519 # deprecated MPIExec names
523 520 class DeprecatedMPILauncher(object):
524 521 def warn(self):
525 522 oldname = self.__class__.__name__
526 523 newname = oldname.replace('MPIExec', 'MPI')
527 524 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
528 525
529 526 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
530 527 """Deprecated, use MPILauncher"""
531 528 def __init__(self, *args, **kwargs):
532 529 super(MPIExecLauncher, self).__init__(*args, **kwargs)
533 530 self.warn()
534 531
535 532 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
536 533 """Deprecated, use MPIControllerLauncher"""
537 534 def __init__(self, *args, **kwargs):
538 535 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
539 536 self.warn()
540 537
541 538 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
542 539 """Deprecated, use MPIEngineSetLauncher"""
543 540 def __init__(self, *args, **kwargs):
544 541 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
545 542 self.warn()
546 543
547 544
548 545 #-----------------------------------------------------------------------------
549 546 # SSH launchers
550 547 #-----------------------------------------------------------------------------
551 548
552 549 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
553 550
554 551 class SSHLauncher(LocalProcessLauncher):
555 552 """A minimal launcher for ssh.
556 553
557 554 To be useful this will probably have to be extended to use the ``sshx``
558 555 idea for environment variables. There could be other things this needs
559 556 as well.
560 557 """
561 558
562 559 ssh_cmd = List(['ssh'], config=True,
563 560 help="command for starting ssh")
564 561 ssh_args = List(['-tt'], config=True,
565 562 help="args to pass to ssh")
563 scp_cmd = List(['scp'], config=True,
564 help="command for sending files")
566 565 program = List(['date'],
567 566 help="Program to launch via ssh")
568 567 program_args = List([],
569 568 help="args to pass to remote program")
570 569 hostname = Unicode('', config=True,
571 570 help="hostname on which to launch the program")
572 571 user = Unicode('', config=True,
573 572 help="username for ssh")
574 573 location = Unicode('', config=True,
575 574 help="user@hostname location for ssh in one setting")
575 to_fetch = List([], config=True,
576 help="List of (remote, local) files to fetch after starting")
577 to_send = List([], config=True,
578 help="List of (local, remote) files to send before starting")
576 579
577 580 def _hostname_changed(self, name, old, new):
578 581 if self.user:
579 582 self.location = u'%s@%s' % (self.user, new)
580 583 else:
581 584 self.location = new
582 585
583 586 def _user_changed(self, name, old, new):
584 587 self.location = u'%s@%s' % (new, self.hostname)
585 588
586 589 def find_args(self):
587 590 return self.ssh_cmd + self.ssh_args + [self.location] + \
588 591 self.program + self.program_args
592
593 def _send_file(self, local, remote):
594 """send a single file"""
595 remote = "%s:%s" % (self.location, remote)
596 for i in range(10):
597 if not os.path.exists(local):
598 self.log.debug("waiting for %s" % local)
599 time.sleep(1)
600 else:
601 break
602 self.log.info("sending %s to %s", local, remote)
603 check_output(self.scp_cmd + [local, remote])
604
605 def send_files(self):
606 """send our files"""
607 if not self.send_files:
608 return
609 for local_file, remote_file in self.to_send:
610 self._send_file(local_file, remote_file)
611
612 def _fetch_file(self, remote, local):
613 """send a single file"""
614 full_remote = "%s:%s" % (self.location, remote)
615 self.log.info("fetching %s from %s", local, full_remote)
616 for i in range(10):
617 # wait up to 10s for remote file to exist
618 check = check_output(self.ssh_cmd + self.ssh_args + \
619 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
620 check = check.strip()
621 if check.strip() == 'no':
622 time.sleep(1)
623 elif check.strip() == 'yes':
624 break
625 check_output(self.scp_cmd + [full_remote, local])
626
627 def fetch_files(self):
628 """override in subclass"""
629 if not self.fetch_files:
630 return
631 for remote_file, local_file in self.to_fetch:
632 self._fetch_file(remote_file, local_file)
589 633
590 634 def start(self, hostname=None, user=None):
591 635 if hostname is not None:
592 636 self.hostname = hostname
593 637 if user is not None:
594 638 self.user = user
595
596 return super(SSHLauncher, self).start()
639
640 self.send_files()
641 super(SSHLauncher, self).start()
642 self.fetch_files()
597 643
598 644 def signal(self, sig):
599 645 if self.state == 'running':
600 646 # send escaped ssh connection-closer
601 647 self.process.stdin.write('~.')
602 648 self.process.stdin.flush()
603 649
650 class SSHClusterLauncher(SSHLauncher):
651
652 remote_profile_dir = Unicode('', config=True,
653 help="""The remote profile_dir to use.
654
655 If not specified, use calling profile, stripping out possible leading homedir.
656 """)
657
658 def _remote_profie_dir_default(self):
659 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
660 """
661 home = get_home_dir()
662 if not home.endswith('/'):
663 home = home+'/'
664
665 if self.profile_dir.startswith(home):
666 return self.profile_dir[len(home):]
667 else:
668 return self.profile_dir
669
670 def _cluster_id_changed(self, name, old, new):
671 if new:
672 raise ValueError("cluster id not supported by SSH launchers")
673
674 @property
675 def cluster_args(self):
676 return ['--profile-dir', self.remote_profile_dir]
604 677
605
606 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
678 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
607 679
608 680 # alias back to *non-configurable* program[_args] for use in find_args()
609 681 # this way all Controller/EngineSetLaunchers have the same form, rather
610 682 # than *some* having `program_args` and others `controller_args`
683
684 def _controller_cmd_default(self):
685 return ['ipcontroller']
686
611 687 @property
612 688 def program(self):
613 689 return self.controller_cmd
614
690
615 691 @property
616 692 def program_args(self):
617 693 return self.cluster_args + self.controller_args
618 694
695 def _to_fetch_default(self):
696 return [
697 (os.path.join(self.remote_profile_dir, 'security', cf),
698 os.path.join(self.profile_dir, 'security', cf),)
699 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
700 ]
619 701
620 class SSHEngineLauncher(SSHLauncher, EngineMixin):
702 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
621 703
622 704 # alias back to *non-configurable* program[_args] for use in find_args()
623 705 # this way all Controller/EngineSetLaunchers have the same form, rather
624 706 # than *some* having `program_args` and others `controller_args`
707
708 def _engine_cmd_default(self):
709 return ['ipengine']
710
625 711 @property
626 712 def program(self):
627 713 return self.engine_cmd
628 714
629 715 @property
630 716 def program_args(self):
631 717 return self.cluster_args + self.engine_args
718
719 def _to_send_default(self):
720 return [
721 (os.path.join(self.profile_dir, 'security', cf),
722 os.path.join(self.remote_profile_dir, 'security', cf))
723 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
724 ]
632 725
633 726
634 727 class SSHEngineSetLauncher(LocalEngineSetLauncher):
635 728 launcher_class = SSHEngineLauncher
636 729 engines = Dict(config=True,
637 730 help="""dict of engines to launch. This is a dict by hostname of ints,
638 731 corresponding to the number of engines to start on that host.""")
639 732
640 733 @property
641 734 def engine_count(self):
642 735 """determine engine count from `engines` dict"""
643 736 count = 0
644 737 for n in self.engines.itervalues():
645 738 if isinstance(n, (tuple,list)):
646 739 n,args = n
647 740 count += n
648 741 return count
649 742
650 743 def start(self, n):
651 744 """Start engines by profile or profile_dir.
652 745 `n` is ignored, and the `engines` config property is used instead.
653 746 """
654 747
655 748 dlist = []
656 749 for host, n in self.engines.iteritems():
657 750 if isinstance(n, (tuple, list)):
658 751 n, args = n
659 752 else:
660 753 args = copy.deepcopy(self.engine_args)
661 754
662 755 if '@' in host:
663 756 user,host = host.split('@',1)
664 757 else:
665 758 user=None
666 759 for i in range(n):
667 760 if i > 0:
668 761 time.sleep(self.delay)
669 762 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
670 763 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
671 764 )
765 if i > 0:
766 # only send files for the first engine on each host
767 el.to_send = []
672 768
673 769 # Copy the engine args over to each engine launcher.
674 770 el.engine_cmd = self.engine_cmd
675 771 el.engine_args = args
676 772 el.on_stop(self._notice_engine_stopped)
677 773 d = el.start(user=user, hostname=host)
678 774 self.launchers[ "%s/%i" % (host,i) ] = el
679 775 dlist.append(d)
680 776 self.notify_start(dlist)
681 777 return dlist
682 778
683 779
780 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
781 """Launcher for calling
782 `ipcluster engines` on a remote machine.
783
784 Requires that remote profile is already configured.
785 """
786
787 n = Integer()
788 ipcluster_cmd = List(['ipcluster'], config=True)
789
790 @property
791 def program(self):
792 return self.ipcluster_cmd + ['engines']
793
794 @property
795 def program_args(self):
796 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
797
798 def _to_send_default(self):
799 return [
800 (os.path.join(self.profile_dir, 'security', cf),
801 os.path.join(self.remote_profile_dir, 'security', cf))
802 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
803 ]
804
805 def start(self, n):
806 self.n = n
807 super(SSHProxyEngineSetLauncher, self).start()
808
684 809
685 810 #-----------------------------------------------------------------------------
686 811 # Windows HPC Server 2008 scheduler launchers
687 812 #-----------------------------------------------------------------------------
688 813
689 814
690 815 # This is only used on Windows.
691 816 def find_job_cmd():
692 817 if WINDOWS:
693 818 try:
694 819 return find_cmd('job')
695 820 except (FindCmdError, ImportError):
696 821 # ImportError will be raised if win32api is not installed
697 822 return 'job'
698 823 else:
699 824 return 'job'
700 825
701 826
702 827 class WindowsHPCLauncher(BaseLauncher):
703 828
704 829 job_id_regexp = Unicode(r'\d+', config=True,
705 830 help="""A regular expression used to get the job id from the output of the
706 831 submit_command. """
707 832 )
708 833 job_file_name = Unicode(u'ipython_job.xml', config=True,
709 834 help="The filename of the instantiated job script.")
710 835 # The full path to the instantiated job script. This gets made dynamically
711 836 # by combining the work_dir with the job_file_name.
712 837 job_file = Unicode(u'')
713 838 scheduler = Unicode('', config=True,
714 839 help="The hostname of the scheduler to submit the job to.")
715 840 job_cmd = Unicode(find_job_cmd(), config=True,
716 841 help="The command for submitting jobs.")
717 842
718 843 def __init__(self, work_dir=u'.', config=None, **kwargs):
719 844 super(WindowsHPCLauncher, self).__init__(
720 845 work_dir=work_dir, config=config, **kwargs
721 846 )
722 847
723 848 @property
724 849 def job_file(self):
725 850 return os.path.join(self.work_dir, self.job_file_name)
726 851
727 852 def write_job_file(self, n):
728 853 raise NotImplementedError("Implement write_job_file in a subclass.")
729 854
730 855 def find_args(self):
731 856 return [u'job.exe']
732 857
733 858 def parse_job_id(self, output):
734 859 """Take the output of the submit command and return the job id."""
735 860 m = re.search(self.job_id_regexp, output)
736 861 if m is not None:
737 862 job_id = m.group()
738 863 else:
739 864 raise LauncherError("Job id couldn't be determined: %s" % output)
740 865 self.job_id = job_id
741 866 self.log.info('Job started with id: %r', job_id)
742 867 return job_id
743 868
744 869 def start(self, n):
745 870 """Start n copies of the process using the Win HPC job scheduler."""
746 871 self.write_job_file(n)
747 872 args = [
748 873 'submit',
749 874 '/jobfile:%s' % self.job_file,
750 875 '/scheduler:%s' % self.scheduler
751 876 ]
752 877 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
753 878
754 879 output = check_output([self.job_cmd]+args,
755 880 env=os.environ,
756 881 cwd=self.work_dir,
757 882 stderr=STDOUT
758 883 )
759 884 job_id = self.parse_job_id(output)
760 885 self.notify_start(job_id)
761 886 return job_id
762 887
763 888 def stop(self):
764 889 args = [
765 890 'cancel',
766 891 self.job_id,
767 892 '/scheduler:%s' % self.scheduler
768 893 ]
769 894 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
770 895 try:
771 896 output = check_output([self.job_cmd]+args,
772 897 env=os.environ,
773 898 cwd=self.work_dir,
774 899 stderr=STDOUT
775 900 )
776 901 except:
777 902 output = 'The job already appears to be stoppped: %r' % self.job_id
778 903 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
779 904 return output
780 905
781 906
782 907 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
783 908
784 909 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
785 910 help="WinHPC xml job file.")
786 911 controller_args = List([], config=False,
787 912 help="extra args to pass to ipcontroller")
788 913
789 914 def write_job_file(self, n):
790 915 job = IPControllerJob(config=self.config)
791 916
792 917 t = IPControllerTask(config=self.config)
793 918 # The tasks work directory is *not* the actual work directory of
794 919 # the controller. It is used as the base path for the stdout/stderr
795 920 # files that the scheduler redirects to.
796 921 t.work_directory = self.profile_dir
797 922 # Add the profile_dir and from self.start().
798 923 t.controller_args.extend(self.cluster_args)
799 924 t.controller_args.extend(self.controller_args)
800 925 job.add_task(t)
801 926
802 927 self.log.debug("Writing job description file: %s", self.job_file)
803 928 job.write(self.job_file)
804 929
805 930 @property
806 931 def job_file(self):
807 932 return os.path.join(self.profile_dir, self.job_file_name)
808 933
809 934 def start(self):
810 935 """Start the controller by profile_dir."""
811 936 return super(WindowsHPCControllerLauncher, self).start(1)
812 937
813 938
814 939 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
815 940
816 941 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
817 942 help="jobfile for ipengines job")
818 943 engine_args = List([], config=False,
819 944 help="extra args to pas to ipengine")
820 945
821 946 def write_job_file(self, n):
822 947 job = IPEngineSetJob(config=self.config)
823 948
824 949 for i in range(n):
825 950 t = IPEngineTask(config=self.config)
826 951 # The tasks work directory is *not* the actual work directory of
827 952 # the engine. It is used as the base path for the stdout/stderr
828 953 # files that the scheduler redirects to.
829 954 t.work_directory = self.profile_dir
830 955 # Add the profile_dir and from self.start().
831 956 t.engine_args.extend(self.cluster_args)
832 957 t.engine_args.extend(self.engine_args)
833 958 job.add_task(t)
834 959
835 960 self.log.debug("Writing job description file: %s", self.job_file)
836 961 job.write(self.job_file)
837 962
838 963 @property
839 964 def job_file(self):
840 965 return os.path.join(self.profile_dir, self.job_file_name)
841 966
842 967 def start(self, n):
843 968 """Start the controller by profile_dir."""
844 969 return super(WindowsHPCEngineSetLauncher, self).start(n)
845 970
846 971
847 972 #-----------------------------------------------------------------------------
848 973 # Batch (PBS) system launchers
849 974 #-----------------------------------------------------------------------------
850 975
851 976 class BatchClusterAppMixin(ClusterAppMixin):
852 977 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
853 978 def _profile_dir_changed(self, name, old, new):
854 979 self.context[name] = new
855 980 _cluster_id_changed = _profile_dir_changed
856 981
857 982 def _profile_dir_default(self):
858 983 self.context['profile_dir'] = ''
859 984 return ''
860 985 def _cluster_id_default(self):
861 986 self.context['cluster_id'] = ''
862 987 return ''
863 988
864 989
865 990 class BatchSystemLauncher(BaseLauncher):
866 991 """Launch an external process using a batch system.
867 992
868 993 This class is designed to work with UNIX batch systems like PBS, LSF,
869 994 GridEngine, etc. The overall model is that there are different commands
870 995 like qsub, qdel, etc. that handle the starting and stopping of the process.
871 996
872 997 This class also has the notion of a batch script. The ``batch_template``
873 998 attribute can be set to a string that is a template for the batch script.
874 999 This template is instantiated using string formatting. Thus the template can
875 1000 use {n} fot the number of instances. Subclasses can add additional variables
876 1001 to the template dict.
877 1002 """
878 1003
879 1004 # Subclasses must fill these in. See PBSEngineSet
880 1005 submit_command = List([''], config=True,
881 1006 help="The name of the command line program used to submit jobs.")
882 1007 delete_command = List([''], config=True,
883 1008 help="The name of the command line program used to delete jobs.")
884 1009 job_id_regexp = Unicode('', config=True,
885 1010 help="""A regular expression used to get the job id from the output of the
886 1011 submit_command.""")
887 1012 batch_template = Unicode('', config=True,
888 1013 help="The string that is the batch script template itself.")
889 1014 batch_template_file = Unicode(u'', config=True,
890 1015 help="The file that contains the batch template.")
891 1016 batch_file_name = Unicode(u'batch_script', config=True,
892 1017 help="The filename of the instantiated batch script.")
893 1018 queue = Unicode(u'', config=True,
894 1019 help="The PBS Queue.")
895 1020
896 1021 def _queue_changed(self, name, old, new):
897 1022 self.context[name] = new
898 1023
899 1024 n = Integer(1)
900 1025 _n_changed = _queue_changed
901 1026
902 1027 # not configurable, override in subclasses
903 1028 # PBS Job Array regex
904 1029 job_array_regexp = Unicode('')
905 1030 job_array_template = Unicode('')
906 1031 # PBS Queue regex
907 1032 queue_regexp = Unicode('')
908 1033 queue_template = Unicode('')
909 1034 # The default batch template, override in subclasses
910 1035 default_template = Unicode('')
911 1036 # The full path to the instantiated batch script.
912 1037 batch_file = Unicode(u'')
913 1038 # the format dict used with batch_template:
914 1039 context = Dict()
915 1040 def _context_default(self):
916 1041 """load the default context with the default values for the basic keys
917 1042
918 1043 because the _trait_changed methods only load the context if they
919 1044 are set to something other than the default value.
920 1045 """
921 1046 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
922 1047
923 1048 # the Formatter instance for rendering the templates:
924 1049 formatter = Instance(EvalFormatter, (), {})
925 1050
926 1051
927 1052 def find_args(self):
928 1053 return self.submit_command + [self.batch_file]
929 1054
930 1055 def __init__(self, work_dir=u'.', config=None, **kwargs):
931 1056 super(BatchSystemLauncher, self).__init__(
932 1057 work_dir=work_dir, config=config, **kwargs
933 1058 )
934 1059 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
935 1060
936 1061 def parse_job_id(self, output):
937 1062 """Take the output of the submit command and return the job id."""
938 1063 m = re.search(self.job_id_regexp, output)
939 1064 if m is not None:
940 1065 job_id = m.group()
941 1066 else:
942 1067 raise LauncherError("Job id couldn't be determined: %s" % output)
943 1068 self.job_id = job_id
944 1069 self.log.info('Job submitted with job id: %r', job_id)
945 1070 return job_id
946 1071
947 1072 def write_batch_script(self, n):
948 1073 """Instantiate and write the batch script to the work_dir."""
949 1074 self.n = n
950 1075 # first priority is batch_template if set
951 1076 if self.batch_template_file and not self.batch_template:
952 1077 # second priority is batch_template_file
953 1078 with open(self.batch_template_file) as f:
954 1079 self.batch_template = f.read()
955 1080 if not self.batch_template:
956 1081 # third (last) priority is default_template
957 1082 self.batch_template = self.default_template
958 1083
959 1084 # add jobarray or queue lines to user-specified template
960 1085 # note that this is *only* when user did not specify a template.
961 1086 regex = re.compile(self.job_array_regexp)
962 1087 # print regex.search(self.batch_template)
963 1088 if not regex.search(self.batch_template):
964 1089 self.log.debug("adding job array settings to batch script")
965 1090 firstline, rest = self.batch_template.split('\n',1)
966 1091 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
967 1092
968 1093 regex = re.compile(self.queue_regexp)
969 1094 # print regex.search(self.batch_template)
970 1095 if self.queue and not regex.search(self.batch_template):
971 1096 self.log.debug("adding PBS queue settings to batch script")
972 1097 firstline, rest = self.batch_template.split('\n',1)
973 1098 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
974 1099
975 1100 script_as_string = self.formatter.format(self.batch_template, **self.context)
976 1101 self.log.debug('Writing batch script: %s', self.batch_file)
977 1102
978 1103 with open(self.batch_file, 'w') as f:
979 1104 f.write(script_as_string)
980 1105 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
981 1106
982 1107 def start(self, n):
983 1108 """Start n copies of the process using a batch system."""
984 1109 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
985 1110 # Here we save profile_dir in the context so they
986 1111 # can be used in the batch script template as {profile_dir}
987 1112 self.write_batch_script(n)
988 1113 output = check_output(self.args, env=os.environ)
989 1114
990 1115 job_id = self.parse_job_id(output)
991 1116 self.notify_start(job_id)
992 1117 return job_id
993 1118
994 1119 def stop(self):
995 1120 output = check_output(self.delete_command+[self.job_id], env=os.environ)
996 1121 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
997 1122 return output
998 1123
999 1124
1000 1125 class PBSLauncher(BatchSystemLauncher):
1001 1126 """A BatchSystemLauncher subclass for PBS."""
1002 1127
1003 1128 submit_command = List(['qsub'], config=True,
1004 1129 help="The PBS submit command ['qsub']")
1005 1130 delete_command = List(['qdel'], config=True,
1006 1131 help="The PBS delete command ['qsub']")
1007 1132 job_id_regexp = Unicode(r'\d+', config=True,
1008 1133 help="Regular expresion for identifying the job ID [r'\d+']")
1009 1134
1010 1135 batch_file = Unicode(u'')
1011 1136 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
1012 1137 job_array_template = Unicode('#PBS -t 1-{n}')
1013 1138 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
1014 1139 queue_template = Unicode('#PBS -q {queue}')
1015 1140
1016 1141
1017 1142 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1018 1143 """Launch a controller using PBS."""
1019 1144
1020 1145 batch_file_name = Unicode(u'pbs_controller', config=True,
1021 1146 help="batch file name for the controller job.")
1022 1147 default_template= Unicode("""#!/bin/sh
1023 1148 #PBS -V
1024 1149 #PBS -N ipcontroller
1025 1150 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1026 1151 """%(' '.join(ipcontroller_cmd_argv)))
1027 1152
1028 1153
1029 1154 def start(self):
1030 1155 """Start the controller by profile or profile_dir."""
1031 1156 return super(PBSControllerLauncher, self).start(1)
1032 1157
1033 1158
1034 1159 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1035 1160 """Launch Engines using PBS"""
1036 1161 batch_file_name = Unicode(u'pbs_engines', config=True,
1037 1162 help="batch file name for the engine(s) job.")
1038 1163 default_template= Unicode(u"""#!/bin/sh
1039 1164 #PBS -V
1040 1165 #PBS -N ipengine
1041 1166 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1042 1167 """%(' '.join(ipengine_cmd_argv)))
1043 1168
1044 1169 def start(self, n):
1045 1170 """Start n engines by profile or profile_dir."""
1046 1171 return super(PBSEngineSetLauncher, self).start(n)
1047 1172
1048 1173 #SGE is very similar to PBS
1049 1174
1050 1175 class SGELauncher(PBSLauncher):
1051 1176 """Sun GridEngine is a PBS clone with slightly different syntax"""
1052 1177 job_array_regexp = Unicode('#\$\W+\-t')
1053 1178 job_array_template = Unicode('#$ -t 1-{n}')
1054 1179 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1055 1180 queue_template = Unicode('#$ -q {queue}')
1056 1181
1057 1182 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1058 1183 """Launch a controller using SGE."""
1059 1184
1060 1185 batch_file_name = Unicode(u'sge_controller', config=True,
1061 1186 help="batch file name for the ipontroller job.")
1062 1187 default_template= Unicode(u"""#$ -V
1063 1188 #$ -S /bin/sh
1064 1189 #$ -N ipcontroller
1065 1190 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1066 1191 """%(' '.join(ipcontroller_cmd_argv)))
1067 1192
1068 1193 def start(self):
1069 1194 """Start the controller by profile or profile_dir."""
1070 1195 return super(SGEControllerLauncher, self).start(1)
1071 1196
1072 1197 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1073 1198 """Launch Engines with SGE"""
1074 1199 batch_file_name = Unicode(u'sge_engines', config=True,
1075 1200 help="batch file name for the engine(s) job.")
1076 1201 default_template = Unicode("""#$ -V
1077 1202 #$ -S /bin/sh
1078 1203 #$ -N ipengine
1079 1204 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1080 1205 """%(' '.join(ipengine_cmd_argv)))
1081 1206
1082 1207 def start(self, n):
1083 1208 """Start n engines by profile or profile_dir."""
1084 1209 return super(SGEEngineSetLauncher, self).start(n)
1085 1210
1086 1211
1087 1212 # LSF launchers
1088 1213
1089 1214 class LSFLauncher(BatchSystemLauncher):
1090 1215 """A BatchSystemLauncher subclass for LSF."""
1091 1216
1092 1217 submit_command = List(['bsub'], config=True,
1093 1218 help="The PBS submit command ['bsub']")
1094 1219 delete_command = List(['bkill'], config=True,
1095 1220 help="The PBS delete command ['bkill']")
1096 1221 job_id_regexp = Unicode(r'\d+', config=True,
1097 1222 help="Regular expresion for identifying the job ID [r'\d+']")
1098 1223
1099 1224 batch_file = Unicode(u'')
1100 1225 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1101 1226 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1102 1227 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1103 1228 queue_template = Unicode('#BSUB -q {queue}')
1104 1229
1105 1230 def start(self, n):
1106 1231 """Start n copies of the process using LSF batch system.
1107 1232 This cant inherit from the base class because bsub expects
1108 1233 to be piped a shell script in order to honor the #BSUB directives :
1109 1234 bsub < script
1110 1235 """
1111 1236 # Here we save profile_dir in the context so they
1112 1237 # can be used in the batch script template as {profile_dir}
1113 1238 self.write_batch_script(n)
1114 1239 #output = check_output(self.args, env=os.environ)
1115 1240 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1116 1241 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1117 1242 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1118 1243 output,err = p.communicate()
1119 1244 job_id = self.parse_job_id(output)
1120 1245 self.notify_start(job_id)
1121 1246 return job_id
1122 1247
1123 1248
1124 1249 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1125 1250 """Launch a controller using LSF."""
1126 1251
1127 1252 batch_file_name = Unicode(u'lsf_controller', config=True,
1128 1253 help="batch file name for the controller job.")
1129 1254 default_template= Unicode("""#!/bin/sh
1130 1255 #BSUB -J ipcontroller
1131 1256 #BSUB -oo ipcontroller.o.%%J
1132 1257 #BSUB -eo ipcontroller.e.%%J
1133 1258 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1134 1259 """%(' '.join(ipcontroller_cmd_argv)))
1135 1260
1136 1261 def start(self):
1137 1262 """Start the controller by profile or profile_dir."""
1138 1263 return super(LSFControllerLauncher, self).start(1)
1139 1264
1140 1265
1141 1266 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1142 1267 """Launch Engines using LSF"""
1143 1268 batch_file_name = Unicode(u'lsf_engines', config=True,
1144 1269 help="batch file name for the engine(s) job.")
1145 1270 default_template= Unicode(u"""#!/bin/sh
1146 1271 #BSUB -oo ipengine.o.%%J
1147 1272 #BSUB -eo ipengine.e.%%J
1148 1273 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1149 1274 """%(' '.join(ipengine_cmd_argv)))
1150 1275
1151 1276 def start(self, n):
1152 1277 """Start n engines by profile or profile_dir."""
1153 1278 return super(LSFEngineSetLauncher, self).start(n)
1154 1279
1155 1280
1156 1281 #-----------------------------------------------------------------------------
1157 1282 # A launcher for ipcluster itself!
1158 1283 #-----------------------------------------------------------------------------
1159 1284
1160 1285
1161 1286 class IPClusterLauncher(LocalProcessLauncher):
1162 1287 """Launch the ipcluster program in an external process."""
1163 1288
1164 1289 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1165 1290 help="Popen command for ipcluster")
1166 1291 ipcluster_args = List(
1167 1292 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1168 1293 help="Command line arguments to pass to ipcluster.")
1169 1294 ipcluster_subcommand = Unicode('start')
1170 1295 profile = Unicode('default')
1171 1296 n = Integer(2)
1172 1297
1173 1298 def find_args(self):
1174 1299 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1175 1300 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1176 1301 self.ipcluster_args
1177 1302
1178 1303 def start(self):
1179 1304 return super(IPClusterLauncher, self).start()
1180 1305
1181 1306 #-----------------------------------------------------------------------------
1182 1307 # Collections of launchers
1183 1308 #-----------------------------------------------------------------------------
1184 1309
1185 1310 local_launchers = [
1186 1311 LocalControllerLauncher,
1187 1312 LocalEngineLauncher,
1188 1313 LocalEngineSetLauncher,
1189 1314 ]
1190 1315 mpi_launchers = [
1191 1316 MPILauncher,
1192 1317 MPIControllerLauncher,
1193 1318 MPIEngineSetLauncher,
1194 1319 ]
1195 1320 ssh_launchers = [
1196 1321 SSHLauncher,
1197 1322 SSHControllerLauncher,
1198 1323 SSHEngineLauncher,
1199 1324 SSHEngineSetLauncher,
1200 1325 ]
1201 1326 winhpc_launchers = [
1202 1327 WindowsHPCLauncher,
1203 1328 WindowsHPCControllerLauncher,
1204 1329 WindowsHPCEngineSetLauncher,
1205 1330 ]
1206 1331 pbs_launchers = [
1207 1332 PBSLauncher,
1208 1333 PBSControllerLauncher,
1209 1334 PBSEngineSetLauncher,
1210 1335 ]
1211 1336 sge_launchers = [
1212 1337 SGELauncher,
1213 1338 SGEControllerLauncher,
1214 1339 SGEEngineSetLauncher,
1215 1340 ]
1216 1341 lsf_launchers = [
1217 1342 LSFLauncher,
1218 1343 LSFControllerLauncher,
1219 1344 LSFEngineSetLauncher,
1220 1345 ]
1221 1346 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1222 1347 + pbs_launchers + sge_launchers + lsf_launchers
1223 1348
General Comments 0
You need to be logged in to leave comments. Login now