Show More
@@ -400,7 +400,6 b' class EngineService(object, service.Service):' | |||
|
400 | 400 | |
|
401 | 401 | # The IEngine methods. See the interface for documentation. |
|
402 | 402 | |
|
403 | @profile | |
|
404 | 403 | def execute(self, lines): |
|
405 | 404 | msg = {'engineid':self.id, |
|
406 | 405 | 'method':'execute', |
@@ -552,7 +552,6 b' class SynchronousMultiEngine(PendingDeferredManager):' | |||
|
552 | 552 | # Decorated pending deferred methods |
|
553 | 553 | #--------------------------------------------------------------------------- |
|
554 | 554 | |
|
555 | @profile | |
|
556 | 555 | @two_phase |
|
557 | 556 | def execute(self, lines, targets='all'): |
|
558 | 557 | d = self.multiengine.execute(lines, targets) |
@@ -320,83 +320,140 b' class PBSEngineSet(BatchEngineSet):' | |||
|
320 | 320 | def __init__(self, template_file, **kwargs): |
|
321 | 321 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
322 | 322 | |
|
323 | class SSHEngineSet(object): | |
|
324 |
|
|
|
323 | ||
|
324 | sshx_template="""#!/bin/sh | |
|
325 | 325 | "$@" &> /dev/null & |
|
326 |
echo $! |
|
|
327 | ||
|
328 | engine_killer_template="""#!/bin/sh | |
|
326 | echo $! | |
|
327 | """ | |
|
328 | ||
|
329 | engine_killer_template="""#!/bin/sh | |
|
330 | ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM | |
|
331 | """ | |
|
329 | 332 | |
|
330 | ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM""" | |
|
333 | class SSHEngineSet(object): | |
|
334 | sshx_template=sshx_template | |
|
335 | engine_killer_template=engine_killer_template | |
|
331 | 336 | |
|
332 | 337 | def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): |
|
338 | """Start a controller on localhost and engines using ssh. | |
|
339 | ||
|
340 | The engine_hosts argument is a dict with hostnames as keys and | |
|
341 | the number of engines (int) as values. sshx is the name of a local | |
|
342 | file that will be used to run remote commands. This file is used | |
|
343 | to set up the environment properly. | |
|
344 | """ | |
|
345 | ||
|
333 | 346 | self.temp_dir = tempfile.gettempdir() |
|
334 |
if sshx |
|
|
347 | if sshx is not None: | |
|
335 | 348 | self.sshx = sshx |
|
336 | 349 | else: |
|
337 | self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER']) | |
|
350 | # Write the sshx.sh file locally from our template. | |
|
351 | self.sshx = os.path.join( | |
|
352 | self.temp_dir, | |
|
353 | '%s-main-sshx.sh' % os.environ['USER'] | |
|
354 | ) | |
|
338 | 355 | f = open(self.sshx, 'w') |
|
339 | 356 | f.writelines(self.sshx_template) |
|
340 | 357 | f.close() |
|
341 | 358 | self.engine_command = ipengine |
|
342 | 359 | self.engine_hosts = engine_hosts |
|
343 | self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER']) | |
|
360 | # Write the engine killer script file locally from our template. | |
|
361 | self.engine_killer = os.path.join( | |
|
362 | self.temp_dir, | |
|
363 | '%s-local-engine_killer.sh' % os.environ['USER'] | |
|
364 | ) | |
|
344 | 365 | f = open(self.engine_killer, 'w') |
|
345 | 366 | f.writelines(self.engine_killer_template) |
|
346 | 367 | f.close() |
|
347 | 368 | |
|
348 | 369 | def start(self, send_furl=False): |
|
370 | dlist = [] | |
|
349 | 371 | for host in self.engine_hosts.keys(): |
|
350 | 372 | count = self.engine_hosts[host] |
|
351 | self._start(host, count, send_furl) | |
|
352 | ||
|
353 | def killall(self): | |
|
354 | for host in self.engine_hosts.keys(): | |
|
355 | self._killall(host) | |
|
373 | d = self._start(host, count, send_furl) | |
|
374 | dlist.append(d) | |
|
375 | return gatherBoth(dlist, consumeErrors=True) | |
|
356 | 376 | |
|
357 |
def _start(self, host |
|
|
358 | ||
|
359 | def _scp_sshx(d): | |
|
360 | scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER']) | |
|
361 | sshx_scp = scp_cmd.split() | |
|
362 | print sshx_scp | |
|
363 | d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | |
|
364 | d.addCallback(_exec_engine) | |
|
365 | ||
|
366 | def _exec_engine(d): | |
|
367 | exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command) | |
|
368 | cmds = exec_engine.split() | |
|
369 | print cmds | |
|
370 | for i in range(count): | |
|
371 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
372 | ||
|
377 | def _start(self, hostname, count=1, send_furl=False): | |
|
373 | 378 | if send_furl: |
|
374 | scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name) | |
|
375 | cmd_list = scp_cmd.split() | |
|
376 | cmd_list[1] = os.path.expanduser(cmd_list[1]) | |
|
377 | print cmd_list | |
|
378 | d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | |
|
379 | d.addCallback(_scp_sshx) | |
|
379 | d = self._scp_furl(hostname) | |
|
380 | 380 | else: |
|
381 |
|
|
|
382 | ||
|
383 | def _killall(self, host_name): | |
|
384 | def _exec_err(d): | |
|
385 | if d.getErrorMessage()[-18:] != "No such process\\n\'": | |
|
386 | raise d | |
|
381 | d = defer.succeed(None) | |
|
382 | d.addCallback(lambda r: self._scp_sshx(hostname)) | |
|
383 | d.addCallback(lambda r: self._ssh_engine(hostname, count)) | |
|
384 | return d | |
|
387 | 385 | |
|
388 | def _exec_kill(d): | |
|
389 | kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER']) | |
|
390 |
|
|
|
391 | print kill_cmd | |
|
392 | d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | |
|
393 | d.addErrback(_exec_err) | |
|
394 | scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER']) | |
|
386 | def _scp_furl(self, hostname): | |
|
387 | scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) | |
|
388 | cmd_list = scp_cmd.split() | |
|
389 | cmd_list[1] = os.path.expanduser(cmd_list[1]) | |
|
390 | log.msg('Copying furl file: %s' % scp_cmd) | |
|
391 | d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | |
|
392 | return d | |
|
393 | ||
|
394 | def _scp_sshx(self, hostname): | |
|
395 | scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( | |
|
396 | self.sshx, hostname, | |
|
397 | self.temp_dir, os.environ['USER'] | |
|
398 | ) | |
|
399 | ||
|
400 | log.msg("Copying sshx: %s" % scp_cmd) | |
|
401 | sshx_scp = scp_cmd.split() | |
|
402 | d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | |
|
403 | return d | |
|
404 | ||
|
405 | def _ssh_engine(self, hostname, count): | |
|
406 | exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( | |
|
407 | hostname, self.temp_dir, | |
|
408 | os.environ['USER'], self.engine_command | |
|
409 | ) | |
|
410 | cmds = exec_engine.split() | |
|
411 | dlist = [] | |
|
412 | log.msg("about to start engines...") | |
|
413 | for i in range(count): | |
|
414 | log.msg('Starting engines: %s' % exec_engine) | |
|
415 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
416 | dlist.append(d) | |
|
417 | return gatherBoth(dlist, consumeErrors=True) | |
|
418 | ||
|
419 | def kill(self): | |
|
420 | dlist = [] | |
|
421 | for host in self.engine_hosts.keys(): | |
|
422 | d = self._killall(host) | |
|
423 | dlist.append(d) | |
|
424 | return gatherBoth(dlist, consumeErrors=True) | |
|
425 | ||
|
426 | def _killall(self, hostname): | |
|
427 | d = self._scp_engine_killer(hostname) | |
|
428 | d.addCallback(lambda r: self._ssh_kill(hostname)) | |
|
429 | # d.addErrback(self._exec_err) | |
|
430 | return d | |
|
431 | ||
|
432 | def _scp_engine_killer(self, hostname): | |
|
433 | scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( | |
|
434 | self.engine_killer, | |
|
435 | hostname, | |
|
436 | self.temp_dir, | |
|
437 | os.environ['USER'] | |
|
438 | ) | |
|
395 | 439 | cmds = scp_cmd.split() |
|
440 | log.msg('Copying engine_killer: %s' % scp_cmd) | |
|
396 | 441 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) |
|
397 | d.addCallback(_exec_kill) | |
|
398 | d.addErrback(_exec_err) | |
|
442 | return d | |
|
443 | ||
|
444 | def _ssh_kill(self, hostname): | |
|
445 | kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( | |
|
446 | hostname, | |
|
447 | self.temp_dir, | |
|
448 | os.environ['USER'] | |
|
449 | ) | |
|
450 | log.msg('Killing engine: %s' % kill_cmd) | |
|
451 | kill_cmd = kill_cmd.split() | |
|
452 | d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | |
|
453 | return d | |
|
399 | 454 | |
|
455 | def _exec_err(self, r): | |
|
456 | log.msg(r) | |
|
400 | 457 | |
|
401 | 458 | #----------------------------------------------------------------------------- |
|
402 | 459 | # Main functions for the different types of clusters |
@@ -518,11 +575,17 b' def main_pbs(args):' | |||
|
518 | 575 | dstart.addErrback(lambda f: f.raiseException()) |
|
519 | 576 | |
|
520 | 577 | |
|
521 | # currently the ssh launcher only launches the controller on localhost. | |
|
522 | 578 | def main_ssh(args): |
|
523 | # the clusterfile should look like: | |
|
524 | # send_furl = False # True, if you want | |
|
525 | # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2} | |
|
579 | """Start a controller on localhost and engines using ssh. | |
|
580 | ||
|
581 | Your clusterfile should look like:: | |
|
582 | ||
|
583 | send_furl = False # True, if you want | |
|
584 | engines = { | |
|
585 | 'engine_host1' : engine_count, | |
|
586 | 'engine_host2' : engine_count2 | |
|
587 | } | |
|
588 | """ | |
|
526 | 589 | clusterfile = {} |
|
527 | 590 | execfile(args.clusterfile, clusterfile) |
|
528 | 591 | if not clusterfile.has_key('send_furl'): |
@@ -530,21 +593,24 b' def main_ssh(args):' | |||
|
530 | 593 | |
|
531 | 594 | cont_args = [] |
|
532 | 595 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
533 | if args.x: | |
|
534 | cont_args.append('-x') | |
|
535 | if args.y: | |
|
536 | cont_args.append('-y') | |
|
596 | ||
|
597 | # Check security settings before proceeding | |
|
598 | if not check_security(args, cont_args): | |
|
599 | return | |
|
600 | ||
|
537 | 601 | cl = ControllerLauncher(extra_args=cont_args) |
|
538 | 602 | dstart = cl.start() |
|
539 | 603 | def start_engines(cont_pid): |
|
540 |
|
|
|
541 | est.start(clusterfile['send_furl']) | |
|
604 | ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) | |
|
542 | 605 | def shutdown(signum, frame): |
|
543 |
|
|
|
544 | cl.interrupt_then_kill(0.5) | |
|
606 | d = ssh_set.kill() | |
|
607 | # d.addErrback(log.err) | |
|
608 | cl.interrupt_then_kill(1.0) | |
|
545 | 609 | reactor.callLater(2.0, reactor.stop) |
|
546 | 610 | signal.signal(signal.SIGINT,shutdown) |
|
547 | ||
|
611 | d = ssh_set.start(clusterfile['send_furl']) | |
|
612 | return d | |
|
613 | ||
|
548 | 614 | def delay_start(cont_pid): |
|
549 | 615 | reactor.callLater(1.0, start_engines, cont_pid) |
|
550 | 616 | |
@@ -640,8 +706,7 b' def get_args():' | |||
|
640 | 706 | '--sshx', |
|
641 | 707 | type=str, |
|
642 | 708 | dest='sshx', |
|
643 |
help='sshx launcher helper' |
|
|
644 | default='sshx.sh', | |
|
709 | help='sshx launcher helper' | |
|
645 | 710 | ) |
|
646 | 711 | parser_ssh.set_defaults(func=main_ssh) |
|
647 | 712 |
@@ -27,6 +27,9 b' Release dev' | |||
|
27 | 27 | New features |
|
28 | 28 | ------------ |
|
29 | 29 | |
|
30 | * The new ipcluster now has a fully working ssh mode that should work on | |
|
31 | Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this! | |
|
32 | ||
|
30 | 33 | * The wonderful TextMate editor can now be used with %edit on OS X. Thanks |
|
31 | 34 | to Matt Foster for this patch. |
|
32 | 35 | |
@@ -59,6 +62,8 b' New features' | |||
|
59 | 62 | Bug fixes |
|
60 | 63 | --------- |
|
61 | 64 | |
|
65 | * Numerous bugs on Windows with the new ipcluster have been fixed. | |
|
66 | ||
|
62 | 67 | * The ipengine and ipcontroller scripts now handle missing furl files |
|
63 | 68 | more gracefully by giving better error messages. |
|
64 | 69 |
@@ -202,11 +202,9 b' Two helper shell scripts are used to start and stop :command:`ipengine` on remot' | |||
|
202 | 202 | * sshx.sh |
|
203 | 203 | * engine_killer.sh |
|
204 | 204 | |
|
205 | Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a | |
|
206 | temp directory on the remote host and executed from there, on most Unix, Linux | |
|
207 | and OS X systems this is /tmp. | |
|
205 | Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp. |
|
208 | 206 | |
|
209 | The sshx.sh is as simple as: | |
|
207 | The default sshx.sh is the following: | |
|
210 | 208 | |
|
211 | 209 | .. sourcecode:: bash |
|
212 | 210 | |
@@ -231,7 +229,7 b' Current limitations of the SSH mode of :command:`ipcluster` are:' | |||
|
231 | 229 | Also, we are using shell scripts to setup and execute commands on remote |
|
232 | 230 | hosts. |
|
233 | 231 | * :command:`ipcontroller` is started on localhost, with no option to start it |
|
234 |
on a remote node |
|
|
232 | on a remote node. | |
|
235 | 233 | |
|
236 | 234 | Using the :command:`ipcontroller` and :command:`ipengine` commands |
|
237 | 235 | ================================================================== |
General Comments 0
You need to be logged in to leave comments.
Login now