##// END OF EJS Templates
Merging vvatsa's ipcluster-dev branch....
Brian Granger -
r1833:e4b173fe merge
parent child Browse files
Show More
@@ -693,7 +693,7 b' class QueuedEngine(object):'
693 @queue
693 @queue
694 def execute(self, lines):
694 def execute(self, lines):
695 pass
695 pass
696
696
697 @queue
697 @queue
698 def push(self, namespace):
698 def push(self, namespace):
699 pass
699 pass
@@ -131,7 +131,7 b' class FCSynchronousMultiEngineFromMultiEngine(Referenceable):'
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 return did
133 return did
134
134
135 #---------------------------------------------------------------------------
135 #---------------------------------------------------------------------------
136 # IEngineMultiplexer related methods
136 # IEngineMultiplexer related methods
137 #---------------------------------------------------------------------------
137 #---------------------------------------------------------------------------
@@ -346,7 +346,7 b' class FCFullSynchronousMultiEngineClient(object):'
346 #---------------------------------------------------------------------------
346 #---------------------------------------------------------------------------
347 # IEngineMultiplexer related methods
347 # IEngineMultiplexer related methods
348 #---------------------------------------------------------------------------
348 #---------------------------------------------------------------------------
349
349
350 def execute(self, lines, targets='all', block=True):
350 def execute(self, lines, targets='all', block=True):
351 d = self.remote_reference.callRemote('execute', lines, targets, block)
351 d = self.remote_reference.callRemote('execute', lines, targets, block)
352 d.addCallback(self.unpackage)
352 d.addCallback(self.unpackage)
@@ -18,6 +18,7 b' import os'
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 pjoin = os.path.join
22 pjoin = os.path.join
22
23
23 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
@@ -81,10 +82,10 b' class LauncherProcessProtocol(ProcessProtocol):'
81 )
82 )
82 else:
83 else:
83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84
85
85 def outReceived(self, data):
86 def outReceived(self, data):
86 log.msg(data)
87 log.msg(data)
87
88
88 def errReceived(self, data):
89 def errReceived(self, data):
89 log.err(data)
90 log.err(data)
90
91
@@ -272,7 +273,7 b' class BatchEngineSet(object):'
272 self.context = {}
273 self.context = {}
273 self.context.update(kwargs)
274 self.context.update(kwargs)
274 self.batch_file = self.template_file+'-run'
275 self.batch_file = self.template_file+'-run'
275
276
276 def parse_job_id(self, output):
277 def parse_job_id(self, output):
277 m = re.match(self.job_id_regexp, output)
278 m = re.match(self.job_id_regexp, output)
278 if m is not None:
279 if m is not None:
@@ -320,6 +321,140 b' class PBSEngineSet(BatchEngineSet):'
320 BatchEngineSet.__init__(self, template_file, **kwargs)
321 BatchEngineSet.__init__(self, template_file, **kwargs)
321
322
322
323
324 sshx_template="""#!/bin/sh
325 "$@" &> /dev/null &
326 echo $!
327 """
328
329 engine_killer_template="""#!/bin/sh
330 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
331 """
332
333 class SSHEngineSet(object):
334 sshx_template=sshx_template
335 engine_killer_template=engine_killer_template
336
337 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
338 """Start a controller on localhost and engines using ssh.
339
340 The engine_hosts argument is a dict with hostnames as keys and
341 the number of engine (int) as values. sshx is the name of a local
342 file that will be used to run remote commands. This file is used
343 to setup the environment properly.
344 """
345
346 self.temp_dir = tempfile.gettempdir()
347 if sshx is not None:
348 self.sshx = sshx
349 else:
350 # Write the sshx.sh file locally from our template.
351 self.sshx = os.path.join(
352 self.temp_dir,
353 '%s-main-sshx.sh' % os.environ['USER']
354 )
355 f = open(self.sshx, 'w')
356 f.writelines(self.sshx_template)
357 f.close()
358 self.engine_command = ipengine
359 self.engine_hosts = engine_hosts
360 # Write the engine killer script file locally from our template.
361 self.engine_killer = os.path.join(
362 self.temp_dir,
363 '%s-local-engine_killer.sh' % os.environ['USER']
364 )
365 f = open(self.engine_killer, 'w')
366 f.writelines(self.engine_killer_template)
367 f.close()
368
369 def start(self, send_furl=False):
370 dlist = []
371 for host in self.engine_hosts.keys():
372 count = self.engine_hosts[host]
373 d = self._start(host, count, send_furl)
374 dlist.append(d)
375 return gatherBoth(dlist, consumeErrors=True)
376
377 def _start(self, hostname, count=1, send_furl=False):
378 if send_furl:
379 d = self._scp_furl(hostname)
380 else:
381 d = defer.succeed(None)
382 d.addCallback(lambda r: self._scp_sshx(hostname))
383 d.addCallback(lambda r: self._ssh_engine(hostname, count))
384 return d
385
386 def _scp_furl(self, hostname):
387 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
388 cmd_list = scp_cmd.split()
389 cmd_list[1] = os.path.expanduser(cmd_list[1])
390 log.msg('Copying furl file: %s' % scp_cmd)
391 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
392 return d
393
394 def _scp_sshx(self, hostname):
395 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
396 self.sshx, hostname,
397 self.temp_dir, os.environ['USER']
398 )
399 print
400 log.msg("Copying sshx: %s" % scp_cmd)
401 sshx_scp = scp_cmd.split()
402 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
403 return d
404
405 def _ssh_engine(self, hostname, count):
406 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
407 hostname, self.temp_dir,
408 os.environ['USER'], self.engine_command
409 )
410 cmds = exec_engine.split()
411 dlist = []
412 log.msg("about to start engines...")
413 for i in range(count):
414 log.msg('Starting engines: %s' % exec_engine)
415 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
416 dlist.append(d)
417 return gatherBoth(dlist, consumeErrors=True)
418
419 def kill(self):
420 dlist = []
421 for host in self.engine_hosts.keys():
422 d = self._killall(host)
423 dlist.append(d)
424 return gatherBoth(dlist, consumeErrors=True)
425
426 def _killall(self, hostname):
427 d = self._scp_engine_killer(hostname)
428 d.addCallback(lambda r: self._ssh_kill(hostname))
429 # d.addErrback(self._exec_err)
430 return d
431
432 def _scp_engine_killer(self, hostname):
433 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
434 self.engine_killer,
435 hostname,
436 self.temp_dir,
437 os.environ['USER']
438 )
439 cmds = scp_cmd.split()
440 log.msg('Copying engine_killer: %s' % scp_cmd)
441 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
442 return d
443
444 def _ssh_kill(self, hostname):
445 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
446 hostname,
447 self.temp_dir,
448 os.environ['USER']
449 )
450 log.msg('Killing engine: %s' % kill_cmd)
451 kill_cmd = kill_cmd.split()
452 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
453 return d
454
455 def _exec_err(self, r):
456 log.msg(r)
457
323 #-----------------------------------------------------------------------------
458 #-----------------------------------------------------------------------------
324 # Main functions for the different types of clusters
459 # Main functions for the different types of clusters
325 #-----------------------------------------------------------------------------
460 #-----------------------------------------------------------------------------
@@ -343,6 +478,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")'
343 cont_args.append('-y')
478 cont_args.append('-y')
344 return True
479 return True
345
480
481
346 def main_local(args):
482 def main_local(args):
347 cont_args = []
483 cont_args = []
348 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
484 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -376,6 +512,7 b' def main_local(args):'
376 dstart.addCallback(delay_start)
512 dstart.addCallback(delay_start)
377 dstart.addErrback(lambda f: f.raiseException())
513 dstart.addErrback(lambda f: f.raiseException())
378
514
515
379 def main_mpirun(args):
516 def main_mpirun(args):
380 cont_args = []
517 cont_args = []
381 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
518 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -413,6 +550,7 b' def main_mpirun(args):'
413 dstart.addCallback(delay_start)
550 dstart.addCallback(delay_start)
414 dstart.addErrback(lambda f: f.raiseException())
551 dstart.addErrback(lambda f: f.raiseException())
415
552
553
416 def main_pbs(args):
554 def main_pbs(args):
417 cont_args = []
555 cont_args = []
418 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
556 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -437,6 +575,49 b' def main_pbs(args):'
437 dstart.addErrback(lambda f: f.raiseException())
575 dstart.addErrback(lambda f: f.raiseException())
438
576
439
577
578 def main_ssh(args):
579 """Start a controller on localhost and engines using ssh.
580
581 Your clusterfile should look like::
582
583 send_furl = False # True, if you want
584 engines = {
585 'engine_host1' : engine_count,
586 'engine_host2' : engine_count2
587 }
588 """
589 clusterfile = {}
590 execfile(args.clusterfile, clusterfile)
591 if not clusterfile.has_key('send_furl'):
592 clusterfile['send_furl'] = False
593
594 cont_args = []
595 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
596
597 # Check security settings before proceeding
598 if not check_security(args, cont_args):
599 return
600
601 cl = ControllerLauncher(extra_args=cont_args)
602 dstart = cl.start()
603 def start_engines(cont_pid):
604 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
605 def shutdown(signum, frame):
606 d = ssh_set.kill()
607 # d.addErrback(log.err)
608 cl.interrupt_then_kill(1.0)
609 reactor.callLater(2.0, reactor.stop)
610 signal.signal(signal.SIGINT,shutdown)
611 d = ssh_set.start(clusterfile['send_furl'])
612 return d
613
614 def delay_start(cont_pid):
615 reactor.callLater(1.0, start_engines, cont_pid)
616
617 dstart.addCallback(delay_start)
618 dstart.addErrback(lambda f: f.raiseException())
619
620
440 def get_args():
621 def get_args():
441 base_parser = argparse.ArgumentParser(add_help=False)
622 base_parser = argparse.ArgumentParser(add_help=False)
442 base_parser.add_argument(
623 base_parser.add_argument(
@@ -508,6 +689,27 b' def get_args():'
508 default='pbs.template'
689 default='pbs.template'
509 )
690 )
510 parser_pbs.set_defaults(func=main_pbs)
691 parser_pbs.set_defaults(func=main_pbs)
692
693 parser_ssh = subparsers.add_parser(
694 'ssh',
695 help='run a cluster using ssh, should have ssh-keys setup',
696 parents=[base_parser]
697 )
698 parser_ssh.add_argument(
699 '--clusterfile',
700 type=str,
701 dest='clusterfile',
702 help='python file describing the cluster',
703 default='clusterfile.py',
704 )
705 parser_ssh.add_argument(
706 '--sshx',
707 type=str,
708 dest='sshx',
709 help='sshx launcher helper'
710 )
711 parser_ssh.set_defaults(func=main_ssh)
712
511 args = parser.parse_args()
713 args = parser.parse_args()
512 return args
714 return args
513
715
@@ -27,6 +27,9 b' Release dev'
27 New features
27 New features
28 ------------
28 ------------
29
29
30 * The new ipcluster now has a fully working ssh mode that should work on
31 Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this!
32
30 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
33 * The wonderful TextMate editor can now be used with %edit on OS X. Thanks
31 to Matt Foster for this patch.
34 to Matt Foster for this patch.
32
35
@@ -59,6 +62,8 b' New features'
59 Bug fixes
62 Bug fixes
60 ---------
63 ---------
61
64
65 * Numerous bugs on Windows with the new ipcluster have been fixed.
66
62 * The ipengine and ipcontroller scripts now handle missing furl files
67 * The ipengine and ipcontroller scripts now handle missing furl files
63 more gracefully by giving better error messages.
68 more gracefully by giving better error messages.
64
69
@@ -53,6 +53,8 b' The :command:`ipcluster` command provides a simple way of starting a controller '
53 2. When engines are started using the :command:`mpirun` command that comes
53 2. When engines are started using the :command:`mpirun` command that comes
54 with most MPI [MPI]_ implementations
54 with most MPI [MPI]_ implementations
55 3. When engines are started using the PBS [PBS]_ batch system.
55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
56
58
57 .. note::
59 .. note::
58
60
@@ -66,7 +68,8 b' The :command:`ipcluster` command provides a simple way of starting a controller '
66 :file:`~/.ipython/security` directory live on a shared filesystem that is
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
67 seen by both the controller and engines. If you don't have a shared file
69 seen by both the controller and engines. If you don't have a shared file
68 system you will need to use :command:`ipcontroller` and
70 system you will need to use :command:`ipcontroller` and
69 :command:`ipengine` directly.
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
70
73
71 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
72 and :command:`ipengine` to perform the steps described above.
75 and :command:`ipengine` to perform the steps described above.
@@ -159,6 +162,75 b' Additional command line options for this mode can be found by doing::'
159
162
160 $ ipcluster pbs -h
163 $ ipcluster pbs -h
161
164
165 Using :command:`ipcluster` in SSH mode
166 --------------------------------------
167
168 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 nodes and the :command:`ipcontroller` on localhost.
170
171 When using using this mode it highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172
173 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174
175 .. sourcecode:: python
176
177 send_furl = True
178 engines = { 'host1.example.com' : 2,
179 'host2.example.com' : 5,
180 'host3.example.com' : 1,
181 'host4.example.com' : 8 }
182
183 Since this is a regular python file usual python syntax applies. Things to note:
184
185 * The `engines` dict, where the keys is the host we want to run engines on and
186 the value is the number of engines to run on that host.
187 * send_furl can either be `True` or `False`, if `True` it will copy over the
188 furl needed for :command:`ipengine` to each host.
189
190 The ``--clusterfile`` command line option lets you specify the file to use for
191 the cluster definition. Once you have your cluster file and you can
192 :command:`ssh` into the remote hosts with out an password you are ready to
193 start your cluster like so:
194
195 .. sourcecode:: bash
196
197 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198
199
200 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201
202 * sshx.sh
203 * engine_killer.sh
204
205 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a tmep directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
206
207 The default sshx.sh is the following:
208
209 .. sourcecode:: bash
210
211 #!/bin/sh
212 "$@" &> /dev/null &
213 echo $!
214
215 If you want to use a custom sshx.sh script you need to use the ``--sshx``
216 option and specify the file to use. Using a custom sshx.sh file could be
217 helpful when you need to setup the environment on the remote host before
218 executing :command:`ipengine`.
219
220 For a detailed options list:
221
222 .. sourcecode:: bash
223
224 $ ipcluster ssh -h
225
226 Current limitations of the SSH mode of :command:`ipcluster` are:
227
228 * Untested on Windows. Would require a working :command:`ssh` on Windows.
229 Also, we are using shell scripts to setup and execute commands on remote
230 hosts.
231 * :command:`ipcontroller` is started on localhost, with no option to start it
232 on a remote node.
233
162 Using the :command:`ipcontroller` and :command:`ipengine` commands
234 Using the :command:`ipcontroller` and :command:`ipengine` commands
163 ==================================================================
235 ==================================================================
164
236
@@ -249,3 +321,4 b' the log files to us will often help us to debug any problems.'
249
321
250
322
251 .. [PBS] Portable Batch System. http://www.openpbs.org/
323 .. [PBS] Portable Batch System. http://www.openpbs.org/
324 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now