##// END OF EJS Templates
Merging in vvatsa's ssh mode for ipcluster with some changes....
Brian Granger -
Show More
@@ -400,7 +400,6 b' class EngineService(object, service.Service):'
400 400
401 401 # The IEngine methods. See the interface for documentation.
402 402
403 @profile
404 403 def execute(self, lines):
405 404 msg = {'engineid':self.id,
406 405 'method':'execute',
@@ -552,7 +552,6 b' class SynchronousMultiEngine(PendingDeferredManager):'
552 552 # Decorated pending deferred methods
553 553 #---------------------------------------------------------------------------
554 554
555 @profile
556 555 @two_phase
557 556 def execute(self, lines, targets='all'):
558 557 d = self.multiengine.execute(lines, targets)
@@ -320,83 +320,140 b' class PBSEngineSet(BatchEngineSet):'
320 320 def __init__(self, template_file, **kwargs):
321 321 BatchEngineSet.__init__(self, template_file, **kwargs)
322 322
323 class SSHEngineSet(object):
324 sshx_template="""#!/bin/sh
323
324 sshx_template="""#!/bin/sh
325 325 "$@" &> /dev/null &
326 echo $!"""
327
328 engine_killer_template="""#!/bin/sh
326 echo $!
327 """
328
329 engine_killer_template="""#!/bin/sh
330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 """
329 332
330 ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""
333 class SSHEngineSet(object):
334 sshx_template=sshx_template
335 engine_killer_template=engine_killer_template
331 336
332 337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 """Start a controller on localhost and engines using ssh.
339
340 The engine_hosts argument is a dict with hostnames as keys and
341 the number of engines (int) as values. sshx is the name of a local
342 file that will be used to run remote commands. This file is used
343 to set up the environment properly.
344 """
345
333 346 self.temp_dir = tempfile.gettempdir()
334 if sshx != None:
347 if sshx is not None:
335 348 self.sshx = sshx
336 349 else:
337 self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER'])
350 # Write the sshx.sh file locally from our template.
351 self.sshx = os.path.join(
352 self.temp_dir,
353 '%s-main-sshx.sh' % os.environ['USER']
354 )
338 355 f = open(self.sshx, 'w')
339 356 f.writelines(self.sshx_template)
340 357 f.close()
341 358 self.engine_command = ipengine
342 359 self.engine_hosts = engine_hosts
343 self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER'])
360 # Write the engine killer script file locally from our template.
361 self.engine_killer = os.path.join(
362 self.temp_dir,
363 '%s-local-engine_killer.sh' % os.environ['USER']
364 )
344 365 f = open(self.engine_killer, 'w')
345 366 f.writelines(self.engine_killer_template)
346 367 f.close()
347 368
348 369 def start(self, send_furl=False):
370 dlist = []
349 371 for host in self.engine_hosts.keys():
350 372 count = self.engine_hosts[host]
351 self._start(host, count, send_furl)
352
353 def killall(self):
354 for host in self.engine_hosts.keys():
355 self._killall(host)
373 d = self._start(host, count, send_furl)
374 dlist.append(d)
375 return gatherBoth(dlist, consumeErrors=True)
356 376
357 def _start(self, host_name, count=1, send_furl=False):
358
359 def _scp_sshx(d):
360 scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER'])
361 sshx_scp = scp_cmd.split()
362 print sshx_scp
363 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
364 d.addCallback(_exec_engine)
365
366 def _exec_engine(d):
367 exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command)
368 cmds = exec_engine.split()
369 print cmds
370 for i in range(count):
371 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
372
377 def _start(self, hostname, count=1, send_furl=False):
373 378 if send_furl:
374 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name)
375 cmd_list = scp_cmd.split()
376 cmd_list[1] = os.path.expanduser(cmd_list[1])
377 print cmd_list
378 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
379 d.addCallback(_scp_sshx)
379 d = self._scp_furl(hostname)
380 380 else:
381 _scp_sshx(d=None)
382
383 def _killall(self, host_name):
384 def _exec_err(d):
385 if d.getErrorMessage()[-18:] != "No such process\\n\'":
386 raise d
381 d = defer.succeed(None)
382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 return d
387 385
388 def _exec_kill(d):
389 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
390 kill_cmd = kill_cmd.split()
391 print kill_cmd
392 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
393 d.addErrback(_exec_err)
394 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
386 def _scp_furl(self, hostname):
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
388 cmd_list = scp_cmd.split()
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
390 log.msg('Copying furl file: %s' % scp_cmd)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
392 return d
393
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
399 print
400 log.msg("Copying sshx: %s" % scp_cmd)
401 sshx_scp = scp_cmd.split()
402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 return d
404
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
410 cmds = exec_engine.split()
411 dlist = []
412 log.msg("about to start engines...")
413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
418
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
425
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
431
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
395 439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
396 441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
397 d.addCallback(_exec_kill)
398 d.addErrback(_exec_err)
442 return d
443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
399 454
455 def _exec_err(self, r):
456 log.msg(r)
400 457
401 458 #-----------------------------------------------------------------------------
402 459 # Main functions for the different types of clusters
@@ -518,11 +575,17 b' def main_pbs(args):'
518 575 dstart.addErrback(lambda f: f.raiseException())
519 576
520 577
521 # currently the ssh launcher only launches the controller on localhost.
522 578 def main_ssh(args):
523 # the clusterfile should look like:
524 # send_furl = False # True, if you want
525 # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
579 """Start a controller on localhost and engines using ssh.
580
581 Your clusterfile should look like::
582
583 send_furl = False # True, if you want
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
526 589 clusterfile = {}
527 590 execfile(args.clusterfile, clusterfile)
528 591 if not clusterfile.has_key('send_furl'):
@@ -530,21 +593,24 b' def main_ssh(args):'
530 593
531 594 cont_args = []
532 595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
533 if args.x:
534 cont_args.append('-x')
535 if args.y:
536 cont_args.append('-y')
596
597 # Check security settings before proceeding
598 if not check_security(args, cont_args):
599 return
600
537 601 cl = ControllerLauncher(extra_args=cont_args)
538 602 dstart = cl.start()
539 603 def start_engines(cont_pid):
540 est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
541 est.start(clusterfile['send_furl'])
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
542 605 def shutdown(signum, frame):
543 est.killall()
544 cl.interrupt_then_kill(0.5)
606 d = ssh_set.kill()
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
545 609 reactor.callLater(2.0, reactor.stop)
546 610 signal.signal(signal.SIGINT,shutdown)
547
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
613
548 614 def delay_start(cont_pid):
549 615 reactor.callLater(1.0, start_engines, cont_pid)
550 616
@@ -640,8 +706,7 b' def get_args():'
640 706 '--sshx',
641 707 type=str,
642 708 dest='sshx',
643 help='sshx launcher helper',
644 default='sshx.sh',
709 help='sshx launcher helper'
645 710 )
646 711 parser_ssh.set_defaults(func=main_ssh)
647 712
@@ -27,6 +27,9 b' Release dev'
27 27 New features
28 28 ------------
29 29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 34 to Matt Foster for this patch.
32 35
@@ -59,6 +62,8 b' New features'
59 62 Bug fixes
60 63 ---------
61 64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 67 * The ipengine and ipcontroller scripts now handle missing furl files
63 68 more gracefully by giving better error messages.
64 69
@@ -202,11 +202,9 b' Two helper shell scripts are used to start and stop :command:`ipengine` on remot'
202 202 * sshx.sh
203 203 * engine_killer.sh
204 204
205 Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a
206 temp directory on the remote host and executed from there, on most Unix, Linux
207 and OS X systems this is /tmp.
205 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
208 206
209 The sshx.sh is as simple as:
207 The default sshx.sh is the following:
210 208
211 209 .. sourcecode:: bash
212 210
@@ -231,7 +229,7 b' Current limitations of the SSH mode of :command:`ipcluster` are:'
231 229 Also, we are using shell scripts to set up and execute commands on remote
232 230 hosts.
233 231 * :command:`ipcontroller` is started on localhost, with no option to start it
234 on a remote node also.
232 on a remote node.
235 233
236 234 Using the :command:`ipcontroller` and :command:`ipengine` commands
237 235 ==================================================================
General Comments 0
You need to be logged in to leave comments. Login now