From 65f8e24828cac6e321f0f6527b9f910ba22e26bf 2011-04-08 00:38:17
From: MinRK
Date: 2011-04-08 00:38:17
Subject: [PATCH] add ipcluster engines; fix ssh process shutdown

---
diff --git a/IPython/zmq/parallel/ipclusterapp.py b/IPython/zmq/parallel/ipclusterapp.py
index f91929f..aa86b84 100755
--- a/IPython/zmq/parallel/ipclusterapp.py
+++ b/IPython/zmq/parallel/ipclusterapp.py
@@ -63,6 +63,10 @@ ALREADY_STARTED = 10
 # file to be found.
 ALREADY_STOPPED = 11
 
+# This will be the exit code if 'ipcluster engines' is run, but there is no .pid
+# file to be found.
+NO_CLUSTER = 12
+
 
 #-----------------------------------------------------------------------------
 # Command line options
@@ -164,6 +168,7 @@ class IPClusterAppConfigLoader(ClusterDirConfigLoader):
             otherwise use the '--cluster-dir' option.
             """
         )
+
         paa = parser_start.add_argument
         paa('-n', '--number',
             type=int, dest='Global.n',
@@ -205,7 +210,36 @@ class IPClusterAppConfigLoader(ClusterDirConfigLoader):
             dest='Global.signal', type=int,
             help="The signal number to use in stopping the cluster (default=2).",
             metavar="Global.signal")
-
+
+        # the "engines" subcommand parser
+        parser_engines = subparsers.add_parser(
+            'engines',
+            parents=[parent_parser1, parent_parser2],
+            argument_default=SUPPRESS,
+            help="Attach some engines to an existing controller or cluster.",
+            description=
+            """Start one or more engines to connect to an existing Cluster
+            by profile name or cluster directory.
+            Cluster directories contain configuration, log and
+            security related files and are named using the convention
+            'cluster_<profile>' and should be created using the 'start'
+            subcommand of 'ipcluster'. If your cluster directory is in
+            the cwd or the ipython directory, you can simply refer to it
+            using its profile name, 'ipclusterz engines -n 4 -p <profile>',
+            otherwise use the '--cluster-dir' option.
+            """
+        )
+        paa = parser_engines.add_argument
+        paa('-n', '--number',
+            type=int, dest='Global.n',
+            help='The number of engines to start.',
+            metavar='Global.n')
+        paa('--daemon',
+            dest='Global.daemonize', action='store_true',
+            help='Daemonize the ipcluster program. This implies --log-to-file')
+        paa('--no-daemon',
+            dest='Global.daemonize', action='store_false',
+            help="Don't daemonize the ipcluster program.")
 
 #-----------------------------------------------------------------------------
 # Main application
@@ -232,7 +266,7 @@ class IPClusterApp(ApplicationWithClusterDir):
         self.default_config.Global.delay = 1
         self.default_config.Global.reset_config = False
         self.default_config.Global.clean_logs = True
-        self.default_config.Global.signal = 2
+        self.default_config.Global.signal = signal.SIGINT
         self.default_config.Global.daemonize = False
 
     def find_resources(self):
@@ -255,6 +289,17 @@ class IPClusterApp(ApplicationWithClusterDir):
                 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
                 "information about creating and listing cluster dirs."
             )
+        elif subcommand=='engines':
+            self.auto_create_cluster_dir = False
+            try:
+                super(IPClusterApp, self).find_resources()
+            except ClusterDirError:
+                raise ClusterDirError(
+                    "Could not find a cluster directory. A cluster dir must "
+                    "be created before running 'ipclusterz engines'. Do "
+                    "'ipclusterz create -h' or 'ipclusterz list -h' for more "
+                    "information about creating and listing cluster dirs."
+                )
 
     def list_cluster_dirs(self):
         # Find the search paths
@@ -309,50 +354,54 @@ class IPClusterApp(ApplicationWithClusterDir):
             # reactor.callWhenRunning(self.start_launchers)
             dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
             dc.start()
+        if subcmd == 'engines':
+            self.start_logging()
+            self.loop = ioloop.IOLoop.instance()
+            # reactor.callWhenRunning(self.start_launchers)
+            engine_only = lambda : self.start_launchers(controller=False)
+            dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
+            dc.start()
 
-    def start_launchers(self):
+    def start_launchers(self, controller=True):
         config = self.master_config
 
         # Create the launchers. In both bases, we set the work_dir of
         # the launcher to the cluster_dir. This is where the launcher's
         # subprocesses will be launched. It is not where the controller
         # and engine will be launched.
+        if controller:
+            cl_class = import_item(config.Global.controller_launcher)
+            self.controller_launcher = cl_class(
+                work_dir=self.cluster_dir, config=config,
+                logname=self.log.name
+            )
+            # Setup the observing of stopping. If the controller dies, shut
+            # everything down as that will be completely fatal for the engines.
+            self.controller_launcher.on_stop(self.stop_launchers)
+            # But, we don't monitor the stopping of engines. An engine dying
+            # is just fine and in principle a user could start a new engine.
+            # Also, if we did monitor engine stopping, it is difficult to
+            # know what to do when only some engines die. Currently, the
+            # observing of engine stopping is inconsistent. Some launchers
+            # might trigger on a single engine stopping, others wait until
+            # all stop. TODO: think more about how to handle this.
+
+
         el_class = import_item(config.Global.engine_launcher)
         self.engine_launcher = el_class(
             work_dir=self.cluster_dir, config=config, logname=self.log.name
         )
-        cl_class = import_item(config.Global.controller_launcher)
-        self.controller_launcher = cl_class(
-            work_dir=self.cluster_dir, config=config,
-            logname=self.log.name
-        )
 
         # Setup signals
         signal.signal(signal.SIGINT, self.sigint_handler)
 
-        # Setup the observing of stopping. If the controller dies, shut
-        # everything down as that will be completely fatal for the engines.
-        self.controller_launcher.on_stop(self.stop_launchers)
-        # d1.addCallback(self.stop_launchers)
-        # But, we don't monitor the stopping of engines. An engine dying
-        # is just fine and in principle a user could start a new engine.
-        # Also, if we did monitor engine stopping, it is difficult to
-        # know what to do when only some engines die. Currently, the
-        # observing of engine stopping is inconsistent. Some launchers
-        # might trigger on a single engine stopping, other wait until
-        # all stop. TODO: think more about how to handle this.
-
         # Start the controller and engines
         self._stopping = False # Make sure stop_launchers is not called 2x.
-        d = self.start_controller()
-        dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop)
+        if controller:
+            self.start_controller()
+        dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
         dc.start()
         self.startup_message()
-        # d.addCallback(self.start_engines)
-        # d.addCallback(self.startup_message)
-        # If the controller or engines fail to start, stop everything
-        # d.addErrback(self.stop_launchers)
-        return d
 
     def startup_message(self, r=None):
         self.log.info("IPython cluster: started")
@@ -369,6 +418,7 @@ class IPClusterApp(ApplicationWithClusterDir):
     def start_engines(self, r=None):
         # self.log.info("In start_engines")
         config = self.master_config
+
         d = self.engine_launcher.start(
             config.Global.n,
             cluster_dir=config.Global.cluster_dir
@@ -432,6 +482,8 @@ class IPClusterApp(ApplicationWithClusterDir):
             self.start_app_start()
         elif subcmd=='stop':
             self.start_app_stop()
+        elif subcmd=='engines':
+            self.start_app_engines()
 
     def start_app_start(self):
         """Start the app for the start subcommand."""
@@ -468,6 +520,29 @@ class IPClusterApp(ApplicationWithClusterDir):
             self.log.info("stopping...")
             self.remove_pid_file()
 
+    def start_app_engines(self):
+        """Start the app for the engines subcommand."""
+        config = self.master_config
+        # First see if the cluster is already running
+
+        # Now log and daemonize
+        self.log.info(
+            'Starting engines with [daemon=%r]' % config.Global.daemonize
+        )
+        # TODO: Get daemonize working on Windows or as a Windows Server.
+        if config.Global.daemonize:
+            if os.name=='posix':
+                from twisted.scripts._twistd_unix import daemonize
+                daemonize()
+
+        # Now write the new pid file AFTER our new forked pid is active.
+        # self.write_pid_file()
+        try:
+            self.loop.start()
+        except:
+            self.log.fatal("stopping...")
+        # self.remove_pid_file()
+
     def start_app_stop(self):
         """Start the app for the stop subcommand."""
         config = self.master_config
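Note on the start_launchers hunk above: the engines subcommand reuses the same code path with controller=False, and the expression 1000*config.Global.delay*controller leans on the fact that Python booleans are integers, so the engines-only path gets a zero delay while a full 'start' still waits for the controller to come up first. A minimal sketch of just that trick, in plain Python with nothing IPython-specific assumed:

    def engine_delay_ms(delay_s, controller):
        # bool is an int subclass: True -> 1, False -> 0
        return 1000 * delay_s * controller

    assert engine_delay_ms(1, controller=True) == 1000  # wait 1s for the controller
    assert engine_delay_ms(1, controller=False) == 0    # engines only: start at once
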
diff --git a/IPython/zmq/parallel/launcher.py b/IPython/zmq/parallel/launcher.py
index de23086..5189706 100644
--- a/IPython/zmq/parallel/launcher.py
+++ b/IPython/zmq/parallel/launcher.py
@@ -28,12 +28,12 @@ except ImportError:
 from subprocess import Popen, PIPE, STDOUT
 
 try:
-    from subprocess import check_open
+    from subprocess import check_output
 except ImportError:
     # pre-2.7:
     from StringIO import StringIO
 
-    def check_open(*args, **kwargs):
+    def check_output(*args, **kwargs):
         sio = StringIO()
         kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
         p = Popen(*args, **kwargs)
@@ -495,7 +495,7 @@ class SSHLauncher(LocalProcessLauncher):
     """
 
     ssh_cmd = List(['ssh'], config=True)
-    ssh_args = List([], config=True)
+    ssh_args = List(['-tt'], config=True)
     program = List(['date'], config=True)
     program_args = List([], config=True)
     hostname = Str('', config=True)
@@ -513,11 +513,20 @@ class SSHLauncher(LocalProcessLauncher):
             self.program + self.program_args
 
     def start(self, cluster_dir, hostname=None, user=None):
+        print self.config
         if hostname is not None:
             self.hostname = hostname
         if user is not None:
             self.user = user
+        print (self.location, hostname, user)
         return super(SSHLauncher, self).start()
+
+    def signal(self, sig):
+        if self.state == 'running':
+            # send escaped ssh connection-closer
+            self.process.stdin.write('~.')
+            self.process.stdin.flush()
+
 
 class SSHControllerLauncher(SSHLauncher):
 
@@ -568,9 +577,9 @@ class WindowsHPCLauncher(BaseLauncher):
     scheduler = Str('', config=True)
     job_cmd = Str(find_job_cmd(), config=True)
 
-    def __init__(self, work_dir=u'.', config=None):
+    def __init__(self, work_dir=u'.', config=None, **kwargs):
         super(WindowsHPCLauncher, self).__init__(
-            work_dir=work_dir, config=config
+            work_dir=work_dir, config=config, **kwargs
         )
 
     @property
@@ -730,9 +739,9 @@ class BatchSystemLauncher(BaseLauncher):
     # The full path to the instantiated batch script.
     batch_file = Unicode(u'')
 
-    def __init__(self, work_dir=u'.', config=None):
+    def __init__(self, work_dir=u'.', config=None, **kwargs):
         super(BatchSystemLauncher, self).__init__(
-            work_dir=work_dir, config=config, **kwargs
+            work_dir=work_dir, config=config, **kwargs
         )
         self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
         self.context = {}
@@ -766,7 +775,7 @@ class BatchSystemLauncher(BaseLauncher):
         return job_id
 
     def stop(self):
-        output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
+        output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
         self.notify_stop(output) # Pass the output of the kill cmd
         return output
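The check_open/check_output rename in launcher.py fixes the pre-2.7 fallback so it actually shadows the name that BatchSystemLauncher.stop() now calls. A minimal sketch of that fallback pattern, assuming only the combined stdout/stderr text is needed (the stdlib's 2.7 subprocess.check_output also raises CalledProcessError on a nonzero exit, which this stub does not):

    from subprocess import Popen, PIPE, STDOUT

    try:
        from subprocess import check_output
    except ImportError:
        # pre-2.7 stand-in under the same name
        def check_output(*args, **kwargs):
            kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
            p = Popen(*args, **kwargs)
            out, _ = p.communicate()  # stderr is folded into stdout
            return out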
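On the SSH side, ssh_args now defaults to '-tt', which forces pseudo-tty allocation even when the client's stdin is not a terminal; that is what makes the ssh client honor its escape sequences, so SSHLauncher.signal() can close the connection by writing the '~.' disconnect escape to the client's stdin rather than signaling the remote process. A rough standalone sketch of the same teardown; the hostname and remote command here are placeholders, not anything from this patch:

    from subprocess import Popen, PIPE

    # '-tt' forces a pseudo-tty so the ssh client parses escape sequences
    p = Popen(['ssh', '-tt', 'user@host', 'sleep', '600'], stdin=PIPE)

    # later: '~.' is only recognized at the start of a line, so lead
    # with a newline before the disconnect escape
    p.stdin.write('\n~.')
    p.stdin.flush()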