ipclusterapp.py
460 lines
| 17.4 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2304 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The ipcluster application. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import logging | ||||
import os | ||||
import signal | ||||
bgranger
|
r2318 | if os.name=='posix': | ||
from twisted.scripts._twistd_unix import daemonize | ||||
Brian Granger
|
r2313 | |||
Brian Granger
|
r2304 | from IPython.core import release | ||
Fernando Perez
|
r2429 | from IPython.external.argparse import ArgumentParser | ||
Brian Granger
|
r2304 | from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault | ||
from IPython.utils.importstring import import_item | ||||
from IPython.kernel.clusterdir import ( | ||||
Brian Granger
|
r2313 | ApplicationWithClusterDir, ClusterDirError, PIDFileError | ||
Brian Granger
|
r2304 | ) | ||
bgranger
|
r2335 | from twisted.internet import reactor, defer | ||
from twisted.python import log, failure | ||||
Brian Granger
|
r2304 | |||
Brian Granger
|
r2323 | |||
Brian Granger
|
r2304 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2323 | # The ipcluster application | ||
Brian Granger
|
r2304 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2323 | # Exit codes for ipcluster | ||
Brian Granger
|
r2304 | |||
Brian Granger
|
# Exit code used when ipcluster appears to be running already because a
# .pid file exists.
ALREADY_STARTED = 10

# Exit code used when "ipcluster stop" is run but no .pid file can be found.
ALREADY_STOPPED = 11
Brian Granger
|
r2304 | |||
class IPClusterCLLoader(ArgParseConfigLoader):
    """Command line config loader for ``ipcluster`` and its subcommands."""

    def _add_other_arguments(self):
        """Register the ``list``, ``create``, ``start`` and ``stop`` subparsers.

        Two helper parsers hold the options shared between subcommands and
        are attached to the subparsers through argparse's ``parents``
        mechanism.  Every ``dest`` is a dotted ``Global.<name>`` path.

        Every parser is created with ``argument_default=NoConfigDefault`` so
        that an option which is not given on the command line does not get
        argparse's built-in default (``False`` for ``store_true``, ``True``
        for ``store_false``, ``None`` otherwise).
        NOTE(review): this presumes the loader skips ``NoConfigDefault``
        values when building the config -- confirm in ArgParseConfigLoader.
        """
        # Options that every subcommand accepts.
        parent_parser1 = ArgumentParser(add_help=False,
                                        argument_default=NoConfigDefault)
        parent_parser1.add_argument('--ipython-dir',
            dest='Global.ipython_dir', type=unicode,
            help='Set to override default location of Global.ipython_dir.',
            metavar='Global.ipython_dir')
        # NOTE(review): IPClusterApp sets default_log_level = logging.INFO
        # (20); confirm whether "Default is 30" below is still accurate.
        parent_parser1.add_argument('--log-level',
            dest="Global.log_level", type=int,
            help='Set the log level (0,10,20,30,40,50). Default is 30.',
            metavar='Global.log_level')

        # Options shared by the subcommands that act on one cluster dir
        # (create, start and stop).
        parent_parser2 = ArgumentParser(add_help=False,
                                        argument_default=NoConfigDefault)
        parent_parser2.add_argument('-p', '--profile',
            dest='Global.profile', type=unicode,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default profile '
            'is named "default". The cluster directory is resolved this way '
            'if the --cluster-dir option is not used.',
            metavar='Global.profile')
        parent_parser2.add_argument('--cluster-dir',
            dest='Global.cluster_dir', type=unicode,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            metavar='Global.cluster_dir')
        parent_parser2.add_argument('--work-dir',
            dest='Global.work_dir', type=unicode,
            help='Set the working dir for the process.',
            metavar='Global.work_dir')
        parent_parser2.add_argument('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)'
        )

        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description='ipcluster has a variety of subcommands. '
            'The general way of running ipcluster is "ipcluster <cmd> '
            '[options]".',
            help='For more help, type "ipcluster <cmd> -h"')

        # ipcluster list
        subparsers.add_parser(
            'list',
            help='List all clusters in cwd and ipython_dir.',
            parents=[parent_parser1],
            argument_default=NoConfigDefault
        )

        # ipcluster create
        parser_create = subparsers.add_parser(
            'create',
            help='Create a new cluster directory.',
            parents=[parent_parser1, parent_parser2],
            argument_default=NoConfigDefault
        )
        parser_create.add_argument(
            '--reset-config',
            dest='Global.reset_config', action='store_true',
            default=NoConfigDefault,
            help='Recopy the default config files to the cluster directory. '
            'You will lose any modifications you have made to these files.'
        )

        # ipcluster start
        parser_start = subparsers.add_parser(
            'start',
            help='Start a cluster.',
            parents=[parent_parser1, parent_parser2],
            argument_default=NoConfigDefault
        )
        parser_start.add_argument(
            '-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n'
        )
        # The --x / --no-x pairs below share one dest; without the
        # parser-level default the first flag of each pair would always
        # inject False into the command line config.
        parser_start.add_argument('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.',
        )
        parser_start.add_argument('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.",
        )
        parser_start.add_argument('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file',
        )
        parser_start.add_argument('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.",
        )

        # ipcluster stop
        parser_stop = subparsers.add_parser(
            'stop',
            help='Stop a cluster.',
            parents=[parent_parser1, parent_parser2],
            argument_default=NoConfigDefault
        )
        parser_stop.add_argument('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal",
        )
Brian Granger
|
r2304 | |||
Brian Granger
|
r2323 | |||
Brian Granger
|
# Name of the config file used by the ipcluster application.
default_config_file_name = u'ipcluster_config.py'


# Text passed as the ``description`` of the command line parser.
_description = """Start an IPython cluster for parallel computing.\n\n
An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do "ipcluster start -n 4". For more complex usage
you will typically do "ipcluster create -p mycluster", then edit
configuration files, followed by "ipcluster start -p mycluster -n 4".
"""
Brian Granger
|
class IPClusterApp(ApplicationWithClusterDir):
    """Application object behind the ``ipcluster`` command.

    The work done depends on ``Global.subcommand``:

    * ``list``   -- print the cluster directories that can be found.
    * ``create`` -- create a cluster directory and copy config files to it.
    * ``start``  -- launch a controller and a set of engines and run the
      Twisted reactor until they are stopped.
    * ``stop``   -- signal a running ipcluster process found via its pid
      file.
    """

    name = u'ipcluster'
    description = _description
    config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    # Class-level default so stop_launchers() is safe to call even before
    # start_launchers() has assigned the instance attribute (the SIGINT
    # handler and the controller stop observer are installed first).
    _stopping = False

    def create_default_config(self):
        """Add the ipcluster-specific defaults to the inherited ones."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.kernel.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.kernel.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = 2
        self.default_config.Global.daemonize = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return IPClusterCLLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """Locate (or create) the cluster directory for the subcommand.

        ``list`` prints the known cluster dirs and exits.  For the other
        subcommands ``auto_create_cluster_dir`` is switched on before the
        inherited lookup runs.

        Raises
        ------
        ClusterDirError
            Re-raised with a more helpful message when the inherited
            lookup fails for ``start`` or ``stop``.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand == 'list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand == 'create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand == 'start' or subcommand == 'stop':
            self.auto_create_cluster_dir = True
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster %s'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                    % subcommand
                )

    def list_cluster_dirs(self):
        """Print a start command for every ``cluster_<profile>`` dir found.

        The current directory, the IPython directory and every entry of the
        ``IPCLUSTER_DIR_PATH`` environment variable are searched, in that
        order.  Entries are separated by ``os.pathsep``.  Search paths that
        cannot be listed are skipped with a warning.
        """
        # Find the search paths
        env_paths = os.environ.get('IPCLUSTER_DIR_PATH', '')
        if env_paths:
            cluster_dir_paths = env_paths.split(os.pathsep)
        else:
            cluster_dir_paths = []
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            ipython_dir = self.default_config.Global.ipython_dir

        # Drop duplicates while keeping the search order stable.
        paths = []
        seen = set()
        for candidate in [os.getcwd(), ipython_dir] + cluster_dir_paths:
            if candidate not in seen:
                seen.add(candidate)
                paths.append(candidate)

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        prefix = 'cluster_'
        for path in paths:
            try:
                files = os.listdir(path)
            except OSError:
                self.log.warning('Could not list directory: %r' % path)
                continue
            for f in files:
                full_path = os.path.join(path, f)
                if f.startswith(prefix) and os.path.isdir(full_path):
                    # Everything after the prefix is the profile name; the
                    # name itself (or a parent dir) may contain underscores.
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start -p %s -n 4' % profile
                    # Single parenthesised argument: valid as both the
                    # Python 2 print statement and the print function.
                    print(start_cmd + " ==> " + full_path)

    def pre_construct(self):
        """Force ``log_to_file`` on when the app is asked to daemonize."""
        # IPClusterApp.pre_construct() is where we cd to the working directory.
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                config.Global.log_to_file = True
        except AttributeError:
            # daemonize is not set for this subcommand; nothing to do.
            pass

    def construct(self):
        """Prepare the cluster dir and, for ``start``, schedule the launchers.

        ``create`` copies the default config files (overwriting when
        ``Global.reset_config`` is true).  ``start`` copies only missing
        config files, starts logging and arranges for ``start_launchers``
        to run once the reactor is running.
        """
        config = self.master_config
        subcmd = config.Global.subcommand
        reset = config.Global.reset_config
        if subcmd == 'list':
            return
        if subcmd == 'create':
            self.log.info('Copying default config files to cluster directory '
                          '[overwrite=%r]' % (reset,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
        if subcmd == 'start':
            self.cluster_dir_obj.copy_all_config_files(overwrite=False)
            self.start_logging()
            reactor.callWhenRunning(self.start_launchers)

    def start_launchers(self):
        """Create both launchers and start the controller, then the engines.

        Returns the deferred chain: start controller -> start engines ->
        startup message, with ``stop_launchers`` as the errback.
        """
        config = self.master_config

        # Create the launchers. In both cases, we set the work_dir of
        # the launcher to the cluster_dir. This is where the launcher's
        # subprocesses will be launched. It is not where the controller
        # and engine will be launched.
        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            work_dir=self.cluster_dir, config=config
        )
        cl_class = import_item(config.Global.controller_launcher)
        self.controller_launcher = cl_class(
            work_dir=self.cluster_dir, config=config
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Setup the observing of stopping. If the controller dies, shut
        # everything down as that will be completely fatal for the engines.
        d1 = self.controller_launcher.observe_stop()
        d1.addCallback(self.stop_launchers)
        # But, we don't monitor the stopping of engines. An engine dying
        # is just fine and in principle a user could start a new engine.
        # Also, if we did monitor engine stopping, it is difficult to
        # know what to do when only some engines die. Currently, the
        # observing of engine stopping is inconsistent. Some launchers
        # might trigger on a single engine stopping, other wait until
        # all stop. TODO: think more about how to handle this.

        # Start the controller and engines
        self._stopping = False  # Make sure stop_launchers is not called 2x.
        d = self.start_controller()
        d.addCallback(self.start_engines)
        d.addCallback(self.startup_message)
        # If the controller or engines fail to start, stop everything
        d.addErrback(self.stop_launchers)
        return d

    def startup_message(self, r=None):
        """Log that the cluster started and pass ``r`` through unchanged."""
        log.msg("IPython cluster: started")
        return r

    def start_controller(self, r=None):
        """Start the controller launcher and return its deferred."""
        # log.msg("In start_controller")
        config = self.master_config
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def start_engines(self, r=None):
        """Start ``Global.n`` engines and return the launcher's deferred."""
        # log.msg("In start_engines")
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_controller(self, r=None):
        """Stop the controller if it is running; always return a deferred."""
        # log.msg("In stop_controller")
        if self.controller_launcher.running:
            d = self.controller_launcher.stop()
            d.addErrback(self.log_err)
            return d
        else:
            return defer.succeed(None)

    def stop_engines(self, r=None):
        """Stop the engines if they are running; always return a deferred."""
        # log.msg("In stop_engines")
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            d.addErrback(self.log_err)
            return d
        else:
            return defer.succeed(None)

    def log_err(self, f):
        """Errback that logs the failure's traceback and swallows it."""
        log.msg(f.getTraceback())
        return None

    def stop_launchers(self, r=None):
        """Stop engines and controller once, then stop the reactor.

        ``r`` is the callback/errback result; when it is a ``Failure`` its
        traceback is logged first.  Repeated calls are ignored via the
        ``_stopping`` flag.
        """
        if not self._stopping:
            self._stopping = True
            if isinstance(r, failure.Failure):
                log.msg('Unexpected error in ipcluster:')
                log.msg(r.getTraceback())
            log.msg("IPython cluster: stopping")
            self.stop_engines()
            self.stop_controller()
            # Wait a few seconds to let things shut down.
            reactor.callLater(4.0, reactor.stop)

    def sigint_handler(self, signum, frame):
        """SIGINT handler: shut the cluster down."""
        self.stop_launchers()

    def start_logging(self):
        """Optionally delete old engine/controller logs, then start logging.

        When ``Global.clean_logs`` is true, files in ``Global.log_dir``
        named ``ipengine-*`` or ``ipcontroller-*`` and ending in ``.log``,
        ``.out`` or ``.err`` are removed.
        """
        # Remove old log files of the controller and engine
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            prefixes = ('ipengine' + '-', 'ipcontroller' + '-')
            suffixes = ('.log', '.out', '.err')
            for f in os.listdir(log_dir):
                if f.startswith(prefixes) and f.endswith(suffixes):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd == 'create' or subcmd == 'list':
            return
        elif subcmd == 'start':
            self.start_app_start()
        elif subcmd == 'stop':
            self.start_app_stop()

    def start_app_start(self):
        """Start the app for the start subcommand.

        Exits with ``ALREADY_STARTED`` when a pid file can be read.
        Otherwise daemonizes if requested (posix only), writes the pid
        file, registers its removal at reactor shutdown and runs the
        reactor.
        """
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            self.log.critical(
                'Cluster is already running with [pid=%s]. '
                'use "ipcluster stop" to stop the cluster.' % pid
            )
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if config.Global.daemonize:
            if os.name == 'posix':
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        reactor.addSystemEventTrigger('during', 'shutdown', self.remove_pid_file)
        reactor.run()

    def start_app_stop(self):
        """Start the app for the stop subcommand.

        Exits with ``ALREADY_STOPPED`` when no pid file can be read.  On
        posix the recorded pid is sent ``Global.signal``; if that fails
        with ``OSError`` the process is assumed dead and the stale pid
        file is removed.  On Windows only the pid file is removed.
        """
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Problem reading pid file, cluster is probably not running.'
            )
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        else:
            if os.name == 'posix':
                sig = config.Global.signal
                self.log.info(
                    "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
                )
                try:
                    os.kill(pid, sig)
                except OSError:
                    # Most likely a stale pid file left by a process that
                    # is already gone; clean it up instead of crashing.
                    self.log.error(
                        "Stopping cluster failed, assuming already dead.",
                        exc_info=True
                    )
                    self.remove_pid_file()
            elif os.name == 'nt':
                # As of right now, we don't support daemonize on Windows, so
                # stop will not do anything. Minimally, it should clean up the
                # old .pid files.
                self.remove_pid_file()
Brian Granger
|
r2304 | |||
def launch_new_instance():
    """Create and run the IPython cluster."""
    app = IPClusterApp()
    app.start()
# Run the application only when this file is executed as a script.
if __name__ == '__main__':
    launch_new_instance()