diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 87a84a0..2f8ab1e 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -60,7 +60,7 @@ from IPython.utils.text import EvalFormatter from IPython.utils.traitlets import ( Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, ) -from IPython.utils.path import get_ipython_module_path +from IPython.utils.path import get_ipython_module_path, get_home_dir from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError from .win32support import forward_read_events @@ -217,16 +217,12 @@ class BaseLauncher(LoggingConfigurable): class ClusterAppMixin(HasTraits): """MixIn for cluster args as traits""" - cluster_args = List([]) profile_dir=Unicode('') cluster_id=Unicode('') - def _profile_dir_changed(self, name, old, new): - self.cluster_args = [] - if self.profile_dir: - self.cluster_args.extend(['--profile-dir', self.profile_dir]) - if self.cluster_id: - self.cluster_args.extend(['--cluster-id', self.cluster_id]) - _cluster_id_changed = _profile_dir_changed + + @property + def cluster_args(self): + return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id] class ControllerMixin(ClusterAppMixin): controller_cmd = List(ipcontroller_cmd_argv, config=True, @@ -243,6 +239,7 @@ class EngineMixin(ClusterAppMixin): help="command-line arguments to pass to ipengine" ) + #----------------------------------------------------------------------------- # Local process launchers #----------------------------------------------------------------------------- @@ -563,6 +560,8 @@ class SSHLauncher(LocalProcessLauncher): help="command for starting ssh") ssh_args = List(['-tt'], config=True, help="args to pass to ssh") + scp_cmd = List(['scp'], config=True, + help="command for sending files") program = List(['date'], help="Program to launch via ssh") program_args = List([], @@ -573,6 +572,10 @@ class SSHLauncher(LocalProcessLauncher): help="username for ssh") location = Unicode('', config=True, help="user@hostname location for ssh in one setting") + to_fetch = List([], config=True, + help="List of (remote, local) files to fetch after starting") + to_send = List([], config=True, + help="List of (local, remote) files to send before starting") def _hostname_changed(self, name, old, new): if self.user: @@ -586,14 +589,57 @@ class SSHLauncher(LocalProcessLauncher): def find_args(self): return self.ssh_cmd + self.ssh_args + [self.location] + \ self.program + self.program_args + + def _send_file(self, local, remote): + """send a single file""" + remote = "%s:%s" % (self.location, remote) + for i in range(10): + if not os.path.exists(local): + self.log.debug("waiting for %s" % local) + time.sleep(1) + else: + break + self.log.info("sending %s to %s", local, remote) + check_output(self.scp_cmd + [local, remote]) + + def send_files(self): + """send our files""" + if not self.send_files: + return + for local_file, remote_file in self.to_send: + self._send_file(local_file, remote_file) + + def _fetch_file(self, remote, local): + """send a single file""" + full_remote = "%s:%s" % (self.location, remote) + self.log.info("fetching %s from %s", local, full_remote) + for i in range(10): + # wait up to 10s for remote file to exist + check = check_output(self.ssh_cmd + self.ssh_args + \ + [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"]) + check = check.strip() + if check.strip() == 'no': + time.sleep(1) + elif check.strip() == 'yes': + break + check_output(self.scp_cmd + [full_remote, local]) + + def fetch_files(self): + """override in subclass""" + if not self.fetch_files: + return + for remote_file, local_file in self.to_fetch: + self._fetch_file(remote_file, local_file) def start(self, hostname=None, user=None): if hostname is not None: self.hostname = hostname if user is not None: self.user = user - - return super(SSHLauncher, self).start() + + self.send_files() + super(SSHLauncher, self).start() + self.fetch_files() def signal(self, sig): if self.state == 'running': @@ -601,27 +647,67 @@ class SSHLauncher(LocalProcessLauncher): self.process.stdin.write('~.') self.process.stdin.flush() +class SSHClusterLauncher(SSHLauncher): + + remote_profile_dir = Unicode('', config=True, + help="""The remote profile_dir to use. + + If not specified, use calling profile, stripping out possible leading homedir. + """) + + def _remote_profie_dir_default(self): + """turns /home/you/.ipython/profile_foo into .ipython/profile_foo + """ + home = get_home_dir() + if not home.endswith('/'): + home = home+'/' + + if self.profile_dir.startswith(home): + return self.profile_dir[len(home):] + else: + return self.profile_dir + + def _cluster_id_changed(self, name, old, new): + if new: + raise ValueError("cluster id not supported by SSH launchers") + + @property + def cluster_args(self): + return ['--profile-dir', self.remote_profile_dir] - -class SSHControllerLauncher(SSHLauncher, ControllerMixin): +class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin): # alias back to *non-configurable* program[_args] for use in find_args() # this way all Controller/EngineSetLaunchers have the same form, rather # than *some* having `program_args` and others `controller_args` + + def _controller_cmd_default(self): + return ['ipcontroller'] + @property def program(self): return self.controller_cmd - + @property def program_args(self): return self.cluster_args + self.controller_args + def _to_fetch_default(self): + return [ + (os.path.join(self.remote_profile_dir, 'security', cf), + os.path.join(self.profile_dir, 'security', cf),) + for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json') + ] -class SSHEngineLauncher(SSHLauncher, EngineMixin): +class SSHEngineLauncher(SSHClusterLauncher, EngineMixin): # alias back to *non-configurable* program[_args] for use in find_args() # this way all Controller/EngineSetLaunchers have the same form, rather # than *some* having `program_args` and others `controller_args` + + def _engine_cmd_default(self): + return ['ipengine'] + @property def program(self): return self.engine_cmd @@ -629,6 +715,13 @@ class SSHEngineLauncher(SSHLauncher, EngineMixin): @property def program_args(self): return self.cluster_args + self.engine_args + + def _to_send_default(self): + return [ + (os.path.join(self.profile_dir, 'security', cf), + os.path.join(self.remote_profile_dir, 'security', cf)) + for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json') + ] class SSHEngineSetLauncher(LocalEngineSetLauncher): @@ -669,6 +762,9 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log, profile_dir=self.profile_dir, cluster_id=self.cluster_id, ) + if i > 0: + # only send files for the first engine on each host + el.to_send = [] # Copy the engine args over to each engine launcher. el.engine_cmd = self.engine_cmd @@ -681,6 +777,35 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): return dlist +class SSHProxyEngineSetLauncher(SSHClusterLauncher): + """Launcher for calling + `ipcluster engines` on a remote machine. + + Requires that remote profile is already configured. + """ + + n = Integer() + ipcluster_cmd = List(['ipcluster'], config=True) + + @property + def program(self): + return self.ipcluster_cmd + ['engines'] + + @property + def program_args(self): + return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir] + + def _to_send_default(self): + return [ + (os.path.join(self.profile_dir, 'security', cf), + os.path.join(self.remote_profile_dir, 'security', cf)) + for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json') + ] + + def start(self, n): + self.n = n + super(SSHProxyEngineSetLauncher, self).start() + #----------------------------------------------------------------------------- # Windows HPC Server 2008 scheduler launchers