##// END OF EJS Templates
Merging in vvatsa's ssh mode for ipcluster with some changes....
Brian Granger -
Show More
@@ -400,7 +400,6 b' class EngineService(object, service.Service):'
400
400
401 # The IEngine methods. See the interface for documentation.
401 # The IEngine methods. See the interface for documentation.
402
402
403 @profile
404 def execute(self, lines):
403 def execute(self, lines):
405 msg = {'engineid':self.id,
404 msg = {'engineid':self.id,
406 'method':'execute',
405 'method':'execute',
@@ -552,7 +552,6 b' class SynchronousMultiEngine(PendingDeferredManager):'
552 # Decorated pending deferred methods
552 # Decorated pending deferred methods
553 #---------------------------------------------------------------------------
553 #---------------------------------------------------------------------------
554
554
555 @profile
556 @two_phase
555 @two_phase
557 def execute(self, lines, targets='all'):
556 def execute(self, lines, targets='all'):
558 d = self.multiengine.execute(lines, targets)
557 d = self.multiengine.execute(lines, targets)
@@ -320,83 +320,140 b' class PBSEngineSet(BatchEngineSet):'
320 def __init__(self, template_file, **kwargs):
320 def __init__(self, template_file, **kwargs):
321 BatchEngineSet.__init__(self, template_file, **kwargs)
321 BatchEngineSet.__init__(self, template_file, **kwargs)
322
322
323 class SSHEngineSet(object):
323
324 sshx_template="""#!/bin/sh
324 sshx_template="""#!/bin/sh
325 "$@" &> /dev/null &
325 "$@" &> /dev/null &
326 echo $!"""
326 echo $!
327
327 """
328 engine_killer_template="""#!/bin/sh
328
329 engine_killer_template="""#!/bin/sh
330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 """
329
332
330 ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""
333 class SSHEngineSet(object):
334 sshx_template=sshx_template
335 engine_killer_template=engine_killer_template
331
336
332 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 """Start a controller on localhost and engines using ssh.
339
340 The engine_hosts argument is a dict with hostnames as keys and
341 the number of engine (int) as values. sshx is the name of a local
342 file that will be used to run remote commands. This file is used
343 to setup the environment properly.
344 """
345
333 self.temp_dir = tempfile.gettempdir()
346 self.temp_dir = tempfile.gettempdir()
334 if sshx != None:
347 if sshx is not None:
335 self.sshx = sshx
348 self.sshx = sshx
336 else:
349 else:
337 self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER'])
350 # Write the sshx.sh file locally from our template.
351 self.sshx = os.path.join(
352 self.temp_dir,
353 '%s-main-sshx.sh' % os.environ['USER']
354 )
338 f = open(self.sshx, 'w')
355 f = open(self.sshx, 'w')
339 f.writelines(self.sshx_template)
356 f.writelines(self.sshx_template)
340 f.close()
357 f.close()
341 self.engine_command = ipengine
358 self.engine_command = ipengine
342 self.engine_hosts = engine_hosts
359 self.engine_hosts = engine_hosts
343 self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER'])
360 # Write the engine killer script file locally from our template.
361 self.engine_killer = os.path.join(
362 self.temp_dir,
363 '%s-local-engine_killer.sh' % os.environ['USER']
364 )
344 f = open(self.engine_killer, 'w')
365 f = open(self.engine_killer, 'w')
345 f.writelines(self.engine_killer_template)
366 f.writelines(self.engine_killer_template)
346 f.close()
367 f.close()
347
368
348 def start(self, send_furl=False):
369 def start(self, send_furl=False):
370 dlist = []
349 for host in self.engine_hosts.keys():
371 for host in self.engine_hosts.keys():
350 count = self.engine_hosts[host]
372 count = self.engine_hosts[host]
351 self._start(host, count, send_furl)
373 d = self._start(host, count, send_furl)
352
374 dlist.append(d)
353 def killall(self):
375 return gatherBoth(dlist, consumeErrors=True)
354 for host in self.engine_hosts.keys():
355 self._killall(host)
356
376
357 def _start(self, host_name, count=1, send_furl=False):
377 def _start(self, hostname, count=1, send_furl=False):
358
359 def _scp_sshx(d):
360 scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER'])
361 sshx_scp = scp_cmd.split()
362 print sshx_scp
363 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
364 d.addCallback(_exec_engine)
365
366 def _exec_engine(d):
367 exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command)
368 cmds = exec_engine.split()
369 print cmds
370 for i in range(count):
371 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
372
373 if send_furl:
378 if send_furl:
374 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name)
379 d = self._scp_furl(hostname)
375 cmd_list = scp_cmd.split()
376 cmd_list[1] = os.path.expanduser(cmd_list[1])
377 print cmd_list
378 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
379 d.addCallback(_scp_sshx)
380 else:
380 else:
381 _scp_sshx(d=None)
381 d = defer.succeed(None)
382
382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 def _killall(self, host_name):
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 def _exec_err(d):
384 return d
385 if d.getErrorMessage()[-18:] != "No such process\\n\'":
386 raise d
387
385
388 def _exec_kill(d):
386 def _scp_furl(self, hostname):
389 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
390 kill_cmd = kill_cmd.split()
388 cmd_list = scp_cmd.split()
391 print kill_cmd
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
392 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
390 log.msg('Copying furl file: %s' % scp_cmd)
393 d.addErrback(_exec_err)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
394 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
392 return d
393
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
399 print
400 log.msg("Copying sshx: %s" % scp_cmd)
401 sshx_scp = scp_cmd.split()
402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 return d
404
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
410 cmds = exec_engine.split()
411 dlist = []
412 log.msg("about to start engines...")
413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
418
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
425
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
431
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
395 cmds = scp_cmd.split()
439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
396 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
397 d.addCallback(_exec_kill)
442 return d
398 d.addErrback(_exec_err)
443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
399
454
455 def _exec_err(self, r):
456 log.msg(r)
400
457
401 #-----------------------------------------------------------------------------
458 #-----------------------------------------------------------------------------
402 # Main functions for the different types of clusters
459 # Main functions for the different types of clusters
@@ -518,11 +575,17 b' def main_pbs(args):'
518 dstart.addErrback(lambda f: f.raiseException())
575 dstart.addErrback(lambda f: f.raiseException())
519
576
520
577
521 # currently the ssh launcher only launches the controller on localhost.
522 def main_ssh(args):
578 def main_ssh(args):
523 # the clusterfile should look like:
579 """Start a controller on localhost and engines using ssh.
524 # send_furl = False # True, if you want
580
525 # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
581 Your clusterfile should look like::
582
583 send_furl = False # True, if you want
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
526 clusterfile = {}
589 clusterfile = {}
527 execfile(args.clusterfile, clusterfile)
590 execfile(args.clusterfile, clusterfile)
528 if not clusterfile.has_key('send_furl'):
591 if not clusterfile.has_key('send_furl'):
@@ -530,21 +593,24 b' def main_ssh(args):'
530
593
531 cont_args = []
594 cont_args = []
532 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
533 if args.x:
596
534 cont_args.append('-x')
597 # Check security settings before proceeding
535 if args.y:
598 if not check_security(args, cont_args):
536 cont_args.append('-y')
599 return
600
537 cl = ControllerLauncher(extra_args=cont_args)
601 cl = ControllerLauncher(extra_args=cont_args)
538 dstart = cl.start()
602 dstart = cl.start()
539 def start_engines(cont_pid):
603 def start_engines(cont_pid):
540 est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
541 est.start(clusterfile['send_furl'])
542 def shutdown(signum, frame):
605 def shutdown(signum, frame):
543 est.killall()
606 d = ssh_set.kill()
544 cl.interrupt_then_kill(0.5)
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
545 reactor.callLater(2.0, reactor.stop)
609 reactor.callLater(2.0, reactor.stop)
546 signal.signal(signal.SIGINT,shutdown)
610 signal.signal(signal.SIGINT,shutdown)
547
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
613
548 def delay_start(cont_pid):
614 def delay_start(cont_pid):
549 reactor.callLater(1.0, start_engines, cont_pid)
615 reactor.callLater(1.0, start_engines, cont_pid)
550
616
@@ -640,8 +706,7 b' def get_args():'
640 '--sshx',
706 '--sshx',
641 type=str,
707 type=str,
642 dest='sshx',
708 dest='sshx',
643 help='sshx launcher helper',
709 help='sshx launcher helper'
644 default='sshx.sh',
645 )
710 )
646 parser_ssh.set_defaults(func=main_ssh)
711 parser_ssh.set_defaults(func=main_ssh)
647
712
@@ -27,6 +27,9 b' Release dev'
27 New features
27 New features
28 ------------
28 ------------
29
29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 to Matt Foster for this patch.
34 to Matt Foster for this patch.
32
35
@@ -59,6 +62,8 b' New features'
59 Bug fixes
62 Bug fixes
60 ---------
63 ---------
61
64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 * The ipengine and ipcontroller scripts now handle missing furl files
67 * The ipengine and ipcontroller scripts now handle missing furl files
63 more gracefully by giving better error messages.
68 more gracefully by giving better error messages.
64
69
@@ -202,11 +202,9 b' Two helper shell scripts are used to start and stop :command:`ipengine` on remot'
202 * sshx.sh
202 * sshx.sh
203 * engine_killer.sh
203 * engine_killer.sh
204
204
205 Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a
205 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a tmep directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
206 temp directory on the remote host and executed from there, on most Unix, Linux
207 and OS X systems this is /tmp.
208
206
209 The sshx.sh is as simple as:
207 The default sshx.sh is the following:
210
208
211 .. sourcecode:: bash
209 .. sourcecode:: bash
212
210
@@ -231,7 +229,7 b' Current limitations of the SSH mode of :command:`ipcluster` are:'
231 Also, we are using shell scripts to setup and execute commands on remote
229 Also, we are using shell scripts to setup and execute commands on remote
232 hosts.
230 hosts.
233 * :command:`ipcontroller` is started on localhost, with no option to start it
231 * :command:`ipcontroller` is started on localhost, with no option to start it
234 on a remote node also.
232 on a remote node.
235
233
236 Using the :command:`ipcontroller` and :command:`ipengine` commands
234 Using the :command:`ipcontroller` and :command:`ipengine` commands
237 ==================================================================
235 ==================================================================
General Comments 0
You need to be logged in to leave comments. Login now