##// END OF EJS Templates
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
Brian Granger -
Show More
@@ -18,6 +18,7 b' import os'
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 pjoin = os.path.join
22 pjoin = os.path.join
22
23
23 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
@@ -81,10 +82,10 b' class LauncherProcessProtocol(ProcessProtocol):'
81 )
82 )
82 else:
83 else:
83 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
84
85
85 def outReceived(self, data):
86 def outReceived(self, data):
86 log.msg(data)
87 log.msg(data)
87
88
88 def errReceived(self, data):
89 def errReceived(self, data):
89 log.err(data)
90 log.err(data)
90
91
@@ -272,7 +273,7 b' class BatchEngineSet(object):'
272 self.context = {}
273 self.context = {}
273 self.context.update(kwargs)
274 self.context.update(kwargs)
274 self.batch_file = self.template_file+'-run'
275 self.batch_file = self.template_file+'-run'
275
276
276 def parse_job_id(self, output):
277 def parse_job_id(self, output):
277 m = re.match(self.job_id_regexp, output)
278 m = re.match(self.job_id_regexp, output)
278 if m is not None:
279 if m is not None:
@@ -319,6 +320,83 b' class PBSEngineSet(BatchEngineSet):'
319 def __init__(self, template_file, **kwargs):
320 def __init__(self, template_file, **kwargs):
320 BatchEngineSet.__init__(self, template_file, **kwargs)
321 BatchEngineSet.__init__(self, template_file, **kwargs)
321
322
323 class SSHEngineSet(object):
324 sshx_template="""#!/bin/sh
325 "$@" &> /dev/null &
326 echo $!"""
327
328 engine_killer_template="""#!/bin/sh
329
330 ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""
331
332 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
333 self.temp_dir = tempfile.gettempdir()
334 if sshx != None:
335 self.sshx = sshx
336 else:
337 self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER'])
338 f = open(self.sshx, 'w')
339 f.writelines(self.sshx_template)
340 f.close()
341 self.engine_command = ipengine
342 self.engine_hosts = engine_hosts
343 self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER'])
344 f = open(self.engine_killer, 'w')
345 f.writelines(self.engine_killer_template)
346 f.close()
347
348 def start(self, send_furl=False):
349 for host in self.engine_hosts.keys():
350 count = self.engine_hosts[host]
351 self._start(host, count, send_furl)
352
353 def killall(self):
354 for host in self.engine_hosts.keys():
355 self._killall(host)
356
357 def _start(self, host_name, count=1, send_furl=False):
358
359 def _scp_sshx(d):
360 scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER'])
361 sshx_scp = scp_cmd.split()
362 print sshx_scp
363 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
364 d.addCallback(_exec_engine)
365
366 def _exec_engine(d):
367 exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command)
368 cmds = exec_engine.split()
369 print cmds
370 for i in range(count):
371 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
372
373 if send_furl:
374 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name)
375 cmd_list = scp_cmd.split()
376 cmd_list[1] = os.path.expanduser(cmd_list[1])
377 print cmd_list
378 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
379 d.addCallback(_scp_sshx)
380 else:
381 _scp_sshx(d=None)
382
383 def _killall(self, host_name):
384 def _exec_err(d):
385 if d.getErrorMessage()[-18:] != "No such process\\n\'":
386 raise d
387
388 def _exec_kill(d):
389 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
390 kill_cmd = kill_cmd.split()
391 print kill_cmd
392 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
393 d.addErrback(_exec_err)
394 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
395 cmds = scp_cmd.split()
396 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
397 d.addCallback(_exec_kill)
398 d.addErrback(_exec_err)
399
322
400
323 #-----------------------------------------------------------------------------
401 #-----------------------------------------------------------------------------
324 # Main functions for the different types of clusters
402 # Main functions for the different types of clusters
@@ -343,6 +421,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")'
343 cont_args.append('-y')
421 cont_args.append('-y')
344 return True
422 return True
345
423
424
346 def main_local(args):
425 def main_local(args):
347 cont_args = []
426 cont_args = []
348 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
427 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -376,6 +455,7 b' def main_local(args):'
376 dstart.addCallback(delay_start)
455 dstart.addCallback(delay_start)
377 dstart.addErrback(lambda f: f.raiseException())
456 dstart.addErrback(lambda f: f.raiseException())
378
457
458
379 def main_mpirun(args):
459 def main_mpirun(args):
380 cont_args = []
460 cont_args = []
381 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
461 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -413,6 +493,7 b' def main_mpirun(args):'
413 dstart.addCallback(delay_start)
493 dstart.addCallback(delay_start)
414 dstart.addErrback(lambda f: f.raiseException())
494 dstart.addErrback(lambda f: f.raiseException())
415
495
496
416 def main_pbs(args):
497 def main_pbs(args):
417 cont_args = []
498 cont_args = []
418 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
499 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -437,6 +518,40 b' def main_pbs(args):'
437 dstart.addErrback(lambda f: f.raiseException())
518 dstart.addErrback(lambda f: f.raiseException())
438
519
439
520
521 # currently the ssh launcher only launches the controller on localhost.
522 def main_ssh(args):
523 # the clusterfile should look like:
524 # send_furl = False # True, if you want
525 # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
526 clusterfile = {}
527 execfile(args.clusterfile, clusterfile)
528 if not clusterfile.has_key('send_furl'):
529 clusterfile['send_furl'] = False
530
531 cont_args = []
532 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
533 if args.x:
534 cont_args.append('-x')
535 if args.y:
536 cont_args.append('-y')
537 cl = ControllerLauncher(extra_args=cont_args)
538 dstart = cl.start()
539 def start_engines(cont_pid):
540 est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
541 est.start(clusterfile['send_furl'])
542 def shutdown(signum, frame):
543 est.killall()
544 cl.interrupt_then_kill(0.5)
545 reactor.callLater(2.0, reactor.stop)
546 signal.signal(signal.SIGINT,shutdown)
547
548 def delay_start(cont_pid):
549 reactor.callLater(1.0, start_engines, cont_pid)
550
551 dstart.addCallback(delay_start)
552 dstart.addErrback(lambda f: f.raiseException())
553
554
440 def get_args():
555 def get_args():
441 base_parser = argparse.ArgumentParser(add_help=False)
556 base_parser = argparse.ArgumentParser(add_help=False)
442 base_parser.add_argument(
557 base_parser.add_argument(
@@ -508,6 +623,28 b' def get_args():'
508 default='pbs.template'
623 default='pbs.template'
509 )
624 )
510 parser_pbs.set_defaults(func=main_pbs)
625 parser_pbs.set_defaults(func=main_pbs)
626
627 parser_ssh = subparsers.add_parser(
628 'ssh',
629 help='run a cluster using ssh, should have ssh-keys setup',
630 parents=[base_parser]
631 )
632 parser_ssh.add_argument(
633 '--clusterfile',
634 type=str,
635 dest='clusterfile',
636 help='python file describing the cluster',
637 default='clusterfile.py',
638 )
639 parser_ssh.add_argument(
640 '--sshx',
641 type=str,
642 dest='sshx',
643 help='sshx launcher helper',
644 default='sshx.sh',
645 )
646 parser_ssh.set_defaults(func=main_ssh)
647
511 args = parser.parse_args()
648 args = parser.parse_args()
512 return args
649 return args
513
650
@@ -53,6 +53,8 b' The :command:`ipcluster` command provides a simple way of starting a controller '
53 2. When engines are started using the :command:`mpirun` command that comes
53 2. When engines are started using the :command:`mpirun` command that comes
54 with most MPI [MPI]_ implementations
54 with most MPI [MPI]_ implementations
55 3. When engines are started using the PBS [PBS]_ batch system.
55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
56
58
57 .. note::
59 .. note::
58
60
@@ -66,7 +68,8 b' The :command:`ipcluster` command provides a simple way of starting a controller '
66 :file:`~/.ipython/security` directory live on a shared filesystem that is
68 :file:`~/.ipython/security` directory live on a shared filesystem that is
67 seen by both the controller and engines. If you don't have a shared file
69 seen by both the controller and engines. If you don't have a shared file
68 system you will need to use :command:`ipcontroller` and
70 system you will need to use :command:`ipcontroller` and
69 :command:`ipengine` directly.
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
70
73
71 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
72 and :command:`ipengine` to perform the steps described above.
75 and :command:`ipengine` to perform the steps described above.
@@ -159,6 +162,77 b' Additional command line options for this mode can be found by doing::'
159
162
160 $ ipcluster pbs -h
163 $ ipcluster pbs -h
161
164
165 Using :command:`ipcluster` in SSH mode
166 --------------------------------------
167
168 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 nodes and the :command:`ipcontroller` on localhost.
170
171 When using using this mode it highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172
173 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174
175 .. sourcecode:: python
176
177 send_furl = True
178 engines = { 'host1.example.com' : 2,
179 'host2.example.com' : 5,
180 'host3.example.com' : 1,
181 'host4.example.com' : 8 }
182
183 Since this is a regular python file usual python syntax applies. Things to note:
184
185 * The `engines` dict, where the keys is the host we want to run engines on and
186 the value is the number of engines to run on that host.
187 * send_furl can either be `True` or `False`, if `True` it will copy over the
188 furl needed for :command:`ipengine` to each host.
189
190 The ``--clusterfile`` command line option lets you specify the file to use for
191 the cluster definition. Once you have your cluster file and you can
192 :command:`ssh` into the remote hosts with out an password you are ready to
193 start your cluster like so:
194
195 .. sourcecode:: bash
196
197 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198
199
200 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201
202 * sshx.sh
203 * engine_killer.sh
204
205 Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a
206 temp directory on the remote host and executed from there, on most Unix, Linux
207 and OS X systems this is /tmp.
208
209 The sshx.sh is as simple as:
210
211 .. sourcecode:: bash
212
213 #!/bin/sh
214 "$@" &> /dev/null &
215 echo $!
216
217 If you want to use a custom sshx.sh script you need to use the ``--sshx``
218 option and specify the file to use. Using a custom sshx.sh file could be
219 helpful when you need to setup the environment on the remote host before
220 executing :command:`ipengine`.
221
222 For a detailed options list:
223
224 .. sourcecode:: bash
225
226 $ ipcluster ssh -h
227
228 Current limitations of the SSH mode of :command:`ipcluster` are:
229
230 * Untested on Windows. Would require a working :command:`ssh` on Windows.
231 Also, we are using shell scripts to setup and execute commands on remote
232 hosts.
233 * :command:`ipcontroller` is started on localhost, with no option to start it
234 on a remote node also.
235
162 Using the :command:`ipcontroller` and :command:`ipengine` commands
236 Using the :command:`ipcontroller` and :command:`ipengine` commands
163 ==================================================================
237 ==================================================================
164
238
@@ -249,3 +323,4 b' the log files to us will often help us to debug any problems.'
249
323
250
324
251 .. [PBS] Portable Batch System. http://www.openpbs.org/
325 .. [PBS] Portable Batch System. http://www.openpbs.org/
326 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now