Show More
@@ -693,7 +693,7 b' class QueuedEngine(object):' | |||
|
693 | 693 | @queue |
|
694 | 694 | def execute(self, lines): |
|
695 | 695 | pass |
|
696 | ||
|
696 | ||
|
697 | 697 | @queue |
|
698 | 698 | def push(self, namespace): |
|
699 | 699 | pass |
@@ -131,7 +131,7 b' class FCSynchronousMultiEngineFromMultiEngine(Referenceable):' | |||
|
131 | 131 | def _addDeferredIDCallback(self, did, callback, *args, **kwargs): |
|
132 | 132 | self._deferredIDCallbacks[did] = (callback, args, kwargs) |
|
133 | 133 | return did |
|
134 |
|
|
|
134 | ||
|
135 | 135 | #--------------------------------------------------------------------------- |
|
136 | 136 | # IEngineMultiplexer related methods |
|
137 | 137 | #--------------------------------------------------------------------------- |
@@ -346,7 +346,7 b' class FCFullSynchronousMultiEngineClient(object):' | |||
|
346 | 346 | #--------------------------------------------------------------------------- |
|
347 | 347 | # IEngineMultiplexer related methods |
|
348 | 348 | #--------------------------------------------------------------------------- |
|
349 |
|
|
|
349 | ||
|
350 | 350 | def execute(self, lines, targets='all', block=True): |
|
351 | 351 | d = self.remote_reference.callRemote('execute', lines, targets, block) |
|
352 | 352 | d.addCallback(self.unpackage) |
@@ -18,6 +18,7 b' import os' | |||
|
18 | 18 | import re |
|
19 | 19 | import sys |
|
20 | 20 | import signal |
|
21 | import tempfile | |
|
21 | 22 | pjoin = os.path.join |
|
22 | 23 | |
|
23 | 24 | from twisted.internet import reactor, defer |
@@ -81,10 +82,10 b' class LauncherProcessProtocol(ProcessProtocol):' | |||
|
81 | 82 | ) |
|
82 | 83 | else: |
|
83 | 84 | raise UnknownStatus("unknown exit status, this is probably a bug in Twisted") |
|
84 | ||
|
85 | ||
|
85 | 86 | def outReceived(self, data): |
|
86 | 87 | log.msg(data) |
|
87 | ||
|
88 | ||
|
88 | 89 | def errReceived(self, data): |
|
89 | 90 | log.err(data) |
|
90 | 91 | |
@@ -272,7 +273,7 b' class BatchEngineSet(object):' | |||
|
272 | 273 | self.context = {} |
|
273 | 274 | self.context.update(kwargs) |
|
274 | 275 | self.batch_file = self.template_file+'-run' |
|
275 | ||
|
276 | ||
|
276 | 277 | def parse_job_id(self, output): |
|
277 | 278 | m = re.match(self.job_id_regexp, output) |
|
278 | 279 | if m is not None: |
@@ -320,6 +321,140 b' class PBSEngineSet(BatchEngineSet):' | |||
|
320 | 321 | BatchEngineSet.__init__(self, template_file, **kwargs) |
|
321 | 322 | |
|
322 | 323 | |
|
324 | sshx_template="""#!/bin/sh | |
|
325 | "$@" &> /dev/null & | |
|
326 | echo $! | |
|
327 | """ | |
|
328 | ||
|
329 | engine_killer_template="""#!/bin/sh | |
|
330 | ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM | |
|
331 | """ | |
|
332 | ||
|
333 | class SSHEngineSet(object): | |
|
334 | sshx_template=sshx_template | |
|
335 | engine_killer_template=engine_killer_template | |
|
336 | ||
|
337 | def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"): | |
|
338 | """Start a controller on localhost and engines using ssh. | |
|
339 | ||
|
340 | The engine_hosts argument is a dict with hostnames as keys and | |
|
341 | the number of engine (int) as values. sshx is the name of a local | |
|
342 | file that will be used to run remote commands. This file is used | |
|
343 | to setup the environment properly. | |
|
344 | """ | |
|
345 | ||
|
346 | self.temp_dir = tempfile.gettempdir() | |
|
347 | if sshx is not None: | |
|
348 | self.sshx = sshx | |
|
349 | else: | |
|
350 | # Write the sshx.sh file locally from our template. | |
|
351 | self.sshx = os.path.join( | |
|
352 | self.temp_dir, | |
|
353 | '%s-main-sshx.sh' % os.environ['USER'] | |
|
354 | ) | |
|
355 | f = open(self.sshx, 'w') | |
|
356 | f.writelines(self.sshx_template) | |
|
357 | f.close() | |
|
358 | self.engine_command = ipengine | |
|
359 | self.engine_hosts = engine_hosts | |
|
360 | # Write the engine killer script file locally from our template. | |
|
361 | self.engine_killer = os.path.join( | |
|
362 | self.temp_dir, | |
|
363 | '%s-local-engine_killer.sh' % os.environ['USER'] | |
|
364 | ) | |
|
365 | f = open(self.engine_killer, 'w') | |
|
366 | f.writelines(self.engine_killer_template) | |
|
367 | f.close() | |
|
368 | ||
|
369 | def start(self, send_furl=False): | |
|
370 | dlist = [] | |
|
371 | for host in self.engine_hosts.keys(): | |
|
372 | count = self.engine_hosts[host] | |
|
373 | d = self._start(host, count, send_furl) | |
|
374 | dlist.append(d) | |
|
375 | return gatherBoth(dlist, consumeErrors=True) | |
|
376 | ||
|
377 | def _start(self, hostname, count=1, send_furl=False): | |
|
378 | if send_furl: | |
|
379 | d = self._scp_furl(hostname) | |
|
380 | else: | |
|
381 | d = defer.succeed(None) | |
|
382 | d.addCallback(lambda r: self._scp_sshx(hostname)) | |
|
383 | d.addCallback(lambda r: self._ssh_engine(hostname, count)) | |
|
384 | return d | |
|
385 | ||
|
386 | def _scp_furl(self, hostname): | |
|
387 | scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname) | |
|
388 | cmd_list = scp_cmd.split() | |
|
389 | cmd_list[1] = os.path.expanduser(cmd_list[1]) | |
|
390 | log.msg('Copying furl file: %s' % scp_cmd) | |
|
391 | d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ) | |
|
392 | return d | |
|
393 | ||
|
394 | def _scp_sshx(self, hostname): | |
|
395 | scp_cmd = "scp %s %s:%s/%s-sshx.sh" % ( | |
|
396 | self.sshx, hostname, | |
|
397 | self.temp_dir, os.environ['USER'] | |
|
398 | ) | |
|
399 | ||
|
400 | log.msg("Copying sshx: %s" % scp_cmd) | |
|
401 | sshx_scp = scp_cmd.split() | |
|
402 | d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ) | |
|
403 | return d | |
|
404 | ||
|
405 | def _ssh_engine(self, hostname, count): | |
|
406 | exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % ( | |
|
407 | hostname, self.temp_dir, | |
|
408 | os.environ['USER'], self.engine_command | |
|
409 | ) | |
|
410 | cmds = exec_engine.split() | |
|
411 | dlist = [] | |
|
412 | log.msg("about to start engines...") | |
|
413 | for i in range(count): | |
|
414 | log.msg('Starting engines: %s' % exec_engine) | |
|
415 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
416 | dlist.append(d) | |
|
417 | return gatherBoth(dlist, consumeErrors=True) | |
|
418 | ||
|
419 | def kill(self): | |
|
420 | dlist = [] | |
|
421 | for host in self.engine_hosts.keys(): | |
|
422 | d = self._killall(host) | |
|
423 | dlist.append(d) | |
|
424 | return gatherBoth(dlist, consumeErrors=True) | |
|
425 | ||
|
426 | def _killall(self, hostname): | |
|
427 | d = self._scp_engine_killer(hostname) | |
|
428 | d.addCallback(lambda r: self._ssh_kill(hostname)) | |
|
429 | # d.addErrback(self._exec_err) | |
|
430 | return d | |
|
431 | ||
|
432 | def _scp_engine_killer(self, hostname): | |
|
433 | scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % ( | |
|
434 | self.engine_killer, | |
|
435 | hostname, | |
|
436 | self.temp_dir, | |
|
437 | os.environ['USER'] | |
|
438 | ) | |
|
439 | cmds = scp_cmd.split() | |
|
440 | log.msg('Copying engine_killer: %s' % scp_cmd) | |
|
441 | d = getProcessOutput(cmds[0], cmds[1:], env=os.environ) | |
|
442 | return d | |
|
443 | ||
|
444 | def _ssh_kill(self, hostname): | |
|
445 | kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % ( | |
|
446 | hostname, | |
|
447 | self.temp_dir, | |
|
448 | os.environ['USER'] | |
|
449 | ) | |
|
450 | log.msg('Killing engine: %s' % kill_cmd) | |
|
451 | kill_cmd = kill_cmd.split() | |
|
452 | d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ) | |
|
453 | return d | |
|
454 | ||
|
455 | def _exec_err(self, r): | |
|
456 | log.msg(r) | |
|
457 | ||
|
323 | 458 | #----------------------------------------------------------------------------- |
|
324 | 459 | # Main functions for the different types of clusters |
|
325 | 460 | #----------------------------------------------------------------------------- |
@@ -343,6 +478,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")' | |||
|
343 | 478 | cont_args.append('-y') |
|
344 | 479 | return True |
|
345 | 480 | |
|
481 | ||
|
346 | 482 | def main_local(args): |
|
347 | 483 | cont_args = [] |
|
348 | 484 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
@@ -376,6 +512,7 b' def main_local(args):' | |||
|
376 | 512 | dstart.addCallback(delay_start) |
|
377 | 513 | dstart.addErrback(lambda f: f.raiseException()) |
|
378 | 514 | |
|
515 | ||
|
379 | 516 | def main_mpirun(args): |
|
380 | 517 | cont_args = [] |
|
381 | 518 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
@@ -413,6 +550,7 b' def main_mpirun(args):' | |||
|
413 | 550 | dstart.addCallback(delay_start) |
|
414 | 551 | dstart.addErrback(lambda f: f.raiseException()) |
|
415 | 552 | |
|
553 | ||
|
416 | 554 | def main_pbs(args): |
|
417 | 555 | cont_args = [] |
|
418 | 556 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
@@ -437,6 +575,49 b' def main_pbs(args):' | |||
|
437 | 575 | dstart.addErrback(lambda f: f.raiseException()) |
|
438 | 576 | |
|
439 | 577 | |
|
578 | def main_ssh(args): | |
|
579 | """Start a controller on localhost and engines using ssh. | |
|
580 | ||
|
581 | Your clusterfile should look like:: | |
|
582 | ||
|
583 | send_furl = False # True, if you want | |
|
584 | engines = { | |
|
585 | 'engine_host1' : engine_count, | |
|
586 | 'engine_host2' : engine_count2 | |
|
587 | } | |
|
588 | """ | |
|
589 | clusterfile = {} | |
|
590 | execfile(args.clusterfile, clusterfile) | |
|
591 | if not clusterfile.has_key('send_furl'): | |
|
592 | clusterfile['send_furl'] = False | |
|
593 | ||
|
594 | cont_args = [] | |
|
595 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | |
|
596 | ||
|
597 | # Check security settings before proceeding | |
|
598 | if not check_security(args, cont_args): | |
|
599 | return | |
|
600 | ||
|
601 | cl = ControllerLauncher(extra_args=cont_args) | |
|
602 | dstart = cl.start() | |
|
603 | def start_engines(cont_pid): | |
|
604 | ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx) | |
|
605 | def shutdown(signum, frame): | |
|
606 | d = ssh_set.kill() | |
|
607 | # d.addErrback(log.err) | |
|
608 | cl.interrupt_then_kill(1.0) | |
|
609 | reactor.callLater(2.0, reactor.stop) | |
|
610 | signal.signal(signal.SIGINT,shutdown) | |
|
611 | d = ssh_set.start(clusterfile['send_furl']) | |
|
612 | return d | |
|
613 | ||
|
614 | def delay_start(cont_pid): | |
|
615 | reactor.callLater(1.0, start_engines, cont_pid) | |
|
616 | ||
|
617 | dstart.addCallback(delay_start) | |
|
618 | dstart.addErrback(lambda f: f.raiseException()) | |
|
619 | ||
|
620 | ||
|
440 | 621 | def get_args(): |
|
441 | 622 | base_parser = argparse.ArgumentParser(add_help=False) |
|
442 | 623 | base_parser.add_argument( |
@@ -508,6 +689,27 b' def get_args():' | |||
|
508 | 689 | default='pbs.template' |
|
509 | 690 | ) |
|
510 | 691 | parser_pbs.set_defaults(func=main_pbs) |
|
692 | ||
|
693 | parser_ssh = subparsers.add_parser( | |
|
694 | 'ssh', | |
|
695 | help='run a cluster using ssh, should have ssh-keys setup', | |
|
696 | parents=[base_parser] | |
|
697 | ) | |
|
698 | parser_ssh.add_argument( | |
|
699 | '--clusterfile', | |
|
700 | type=str, | |
|
701 | dest='clusterfile', | |
|
702 | help='python file describing the cluster', | |
|
703 | default='clusterfile.py', | |
|
704 | ) | |
|
705 | parser_ssh.add_argument( | |
|
706 | '--sshx', | |
|
707 | type=str, | |
|
708 | dest='sshx', | |
|
709 | help='sshx launcher helper' | |
|
710 | ) | |
|
711 | parser_ssh.set_defaults(func=main_ssh) | |
|
712 | ||
|
511 | 713 | args = parser.parse_args() |
|
512 | 714 | return args |
|
513 | 715 |
@@ -27,6 +27,9 b' Release dev' | |||
|
27 | 27 | New features |
|
28 | 28 | ------------ |
|
29 | 29 | |
|
30 | * The new ipcluster now has a fully working ssh mode that should work on | |
|
31 | Linux, Unix and OS X. Thanks to Vishal Vatsa for implementing this! | |
|
32 | ||
|
30 | 33 | * The wonderful TextMate editor can now be used with %edit on OS X. Thanks |
|
31 | 34 | to Matt Foster for this patch. |
|
32 | 35 | |
@@ -59,6 +62,8 b' New features' | |||
|
59 | 62 | Bug fixes |
|
60 | 63 | --------- |
|
61 | 64 | |
|
65 | * Numerous bugs on Windows with the new ipcluster have been fixed. | |
|
66 | ||
|
62 | 67 | * The ipengine and ipcontroller scripts now handle missing furl files |
|
63 | 68 | more gracefully by giving better error messages. |
|
64 | 69 |
@@ -53,6 +53,8 b' The :command:`ipcluster` command provides a simple way of starting a controller ' | |||
|
53 | 53 | 2. When engines are started using the :command:`mpirun` command that comes |
|
54 | 54 | with most MPI [MPI]_ implementations |
|
55 | 55 | 3. When engines are started using the PBS [PBS]_ batch system. |
|
56 | 4. When the controller is started on localhost and the engines are started on | |
|
57 | remote nodes using :command:`ssh`. | |
|
56 | 58 | |
|
57 | 59 | .. note:: |
|
58 | 60 | |
@@ -66,7 +68,8 b' The :command:`ipcluster` command provides a simple way of starting a controller ' | |||
|
66 | 68 | :file:`~/.ipython/security` directory live on a shared filesystem that is |
|
67 | 69 | seen by both the controller and engines. If you don't have a shared file |
|
68 | 70 | system you will need to use :command:`ipcontroller` and |
|
69 | :command:`ipengine` directly. | |
|
71 | :command:`ipengine` directly. This constraint can be relaxed if you are | |
|
72 | using the :command:`ssh` method to start the cluster. | |
|
70 | 73 | |
|
71 | 74 | Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller` |
|
72 | 75 | and :command:`ipengine` to perform the steps described above. |
@@ -159,6 +162,75 b' Additional command line options for this mode can be found by doing::' | |||
|
159 | 162 | |
|
160 | 163 | $ ipcluster pbs -h |
|
161 | 164 | |
|
165 | Using :command:`ipcluster` in SSH mode | |
|
166 | -------------------------------------- | |
|
167 | ||
|
168 | The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote | |
|
169 | nodes and the :command:`ipcontroller` on localhost. | |
|
170 | ||
|
171 | When using using this mode it highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins. | |
|
172 | ||
|
173 | To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile": | |
|
174 | ||
|
175 | .. sourcecode:: python | |
|
176 | ||
|
177 | send_furl = True | |
|
178 | engines = { 'host1.example.com' : 2, | |
|
179 | 'host2.example.com' : 5, | |
|
180 | 'host3.example.com' : 1, | |
|
181 | 'host4.example.com' : 8 } | |
|
182 | ||
|
183 | Since this is a regular python file usual python syntax applies. Things to note: | |
|
184 | ||
|
185 | * The `engines` dict, where the keys is the host we want to run engines on and | |
|
186 | the value is the number of engines to run on that host. | |
|
187 | * send_furl can either be `True` or `False`, if `True` it will copy over the | |
|
188 | furl needed for :command:`ipengine` to each host. | |
|
189 | ||
|
190 | The ``--clusterfile`` command line option lets you specify the file to use for | |
|
191 | the cluster definition. Once you have your cluster file and you can | |
|
192 | :command:`ssh` into the remote hosts with out an password you are ready to | |
|
193 | start your cluster like so: | |
|
194 | ||
|
195 | .. sourcecode:: bash | |
|
196 | ||
|
197 | $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py | |
|
198 | ||
|
199 | ||
|
200 | Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts: | |
|
201 | ||
|
202 | * sshx.sh | |
|
203 | * engine_killer.sh | |
|
204 | ||
|
205 | Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a tmep directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp. | |
|
206 | ||
|
207 | The default sshx.sh is the following: | |
|
208 | ||
|
209 | .. sourcecode:: bash | |
|
210 | ||
|
211 | #!/bin/sh | |
|
212 | "$@" &> /dev/null & | |
|
213 | echo $! | |
|
214 | ||
|
215 | If you want to use a custom sshx.sh script you need to use the ``--sshx`` | |
|
216 | option and specify the file to use. Using a custom sshx.sh file could be | |
|
217 | helpful when you need to setup the environment on the remote host before | |
|
218 | executing :command:`ipengine`. | |
|
219 | ||
|
220 | For a detailed options list: | |
|
221 | ||
|
222 | .. sourcecode:: bash | |
|
223 | ||
|
224 | $ ipcluster ssh -h | |
|
225 | ||
|
226 | Current limitations of the SSH mode of :command:`ipcluster` are: | |
|
227 | ||
|
228 | * Untested on Windows. Would require a working :command:`ssh` on Windows. | |
|
229 | Also, we are using shell scripts to setup and execute commands on remote | |
|
230 | hosts. | |
|
231 | * :command:`ipcontroller` is started on localhost, with no option to start it | |
|
232 | on a remote node. | |
|
233 | ||
|
162 | 234 | Using the :command:`ipcontroller` and :command:`ipengine` commands |
|
163 | 235 | ================================================================== |
|
164 | 236 | |
@@ -249,3 +321,4 b' the log files to us will often help us to debug any problems.' | |||
|
249 | 321 | |
|
250 | 322 | |
|
251 | 323 | .. [PBS] Portable Batch System. http://www.openpbs.org/ |
|
324 | .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent |
General Comments 0
You need to be logged in to leave comments.
Login now