ipclusterapp.py
521 lines
| 17.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3605 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The ipcluster application. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import errno | ||
MinRK
|
r3605 | import logging | ||
import os | ||||
MinRK
|
r3631 | import re | ||
MinRK
|
r3605 | import signal | ||
MinRK
|
r3846 | from subprocess import check_call, CalledProcessError, PIPE | ||
MinRK
|
r3624 | import zmq | ||
MinRK
|
r3605 | from zmq.eventloop import ioloop | ||
MinRK
|
r3986 | from IPython.config.application import Application, boolean_flag | ||
MinRK
|
r3985 | from IPython.config.loader import Config | ||
MinRK
|
r3992 | from IPython.core.newapplication import BaseIPythonApplication, ProfileDir | ||
MinRK
|
r3605 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3988 | from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List | ||
MinRK
|
r3688 | |||
MinRK
|
r3993 | from IPython.parallel.apps.baseapp import ( | ||
MinRK
|
r3992 | BaseParallelApplication, | ||
MinRK
|
r3985 | PIDFileError, | ||
MinRK
|
r3986 | base_flags, base_aliases | ||
MinRK
|
r3605 | ) | ||
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------

# Configuration file name shared by all ipcluster subcommand applications.
default_config_file_name = u'ipcluster_config.py'

# Help text for the top-level `ipcluster` command.
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start n=4'. For more complex usage
you will typically do 'ipcluster create profile=mycluster', then edit
configuration files, followed by 'ipcluster start profile=mycluster n=4'.
"""

# Exit codes for ipcluster. They are deliberately unusual values so that
# other processes can tell why ipcluster exited.

# `ipcluster start` found a .pid file that refers to a live process.
ALREADY_STARTED = 10
# `ipcluster stop` found no readable .pid file, or the recorded process
# is no longer running.
ALREADY_STOPPED = 11
# Exit code meant for `ipcluster engines` when there is no .pid file to
# be found.
NO_CLUSTER = 12
MinRK
|
r3605 | |||
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------

# Help texts for the ipcluster subcommands. Each is used as the subcommand
# application's `description` and as its help entry in the top-level
# `subcommands` table.
#
# Profile directories are named 'profile_<profile>', which matches the
# prefix that `ipcluster list` scans for.

start_help = """Start an IPython cluster for parallel computing
Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<profile>' and can be created with the 'create' or 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>',
otherwise use the 'profile_dir' option.
"""
stop_help = """Stop a running IPython cluster
Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<profile>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>', otherwise
use the 'profile_dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster
Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<profile>' and can be created with the 'create' or 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>',
otherwise use the 'profile_dir' option.
"""
create_help = """Create an ipcluster profile by name
Create an ipython cluster directory by its profile name or
cluster directory path. Cluster directories contain
configuration, log and security related files and are named
using the convention 'profile_<profile>'. By default they are
located in your ipython directory. Once created, you will
probably need to edit the configuration files in the cluster
directory to configure your cluster. Most users will create a
cluster directory by profile name,
`ipcluster create profile=mycluster`, which will put the directory
in `<ipython_dir>/profile_mycluster`.
"""
list_help = """List available cluster profiles
List all available clusters, by cluster directory, that can
be found in the current working directory, in the ipython
directory, or in any directory listed in IPYTHON_PROFILE_PATH.
Cluster directories are named using the convention
'profile_<profile>'.
"""
MinRK
|
r3985 | |||
MinRK
|
r3605 | |||
MinRK
|
class IPClusterList(BaseIPythonApplication):
    """The `ipcluster list` subcommand.

    Prints an `ipcluster start` command line for every profile directory
    found in the current directory, the ipython directory, and any
    directory listed in the IPYTHON_PROFILE_PATH environment variable.
    """
    name = u'ipcluster-list'
    description = list_help

    # empty aliases
    aliases = Dict()
    flags = Dict(base_flags)

    def _log_level_default(self):
        # 20 == logging.INFO, so the search-path message below is shown.
        return 20

    def _profile_search_paths(self):
        """Return the directories to scan, de-duplicated, in priority order.

        The cwd and ``self.ipython_dir`` come first, followed by the entries
        of IPYTHON_PROFILE_PATH split on ``os.pathsep`` (':' on POSIX, ';' on
        Windows, where ':' also appears in drive letters). Empty entries
        are ignored.
        """
        extra = os.environ.get('IPYTHON_PROFILE_PATH', '')
        candidates = [os.getcwd(), self.ipython_dir]
        candidates.extend(p for p in extra.split(os.pathsep) if p)
        paths = []
        for candidate in candidates:
            # Order-preserving de-duplication (a set would scramble order).
            if candidate not in paths:
                paths.append(candidate)
        return paths

    def list_profile_dirs(self):
        """Print a start command for each profile directory on the search paths.

        A directory qualifies when its name starts with ``profile_`` and it
        contains an ``ipcontroller_config.py`` file. Search paths that cannot
        be listed (missing, unreadable) are skipped.
        """
        prefix = 'profile_'
        paths = self._profile_search_paths()
        self.log.info('Searching for cluster profiles in paths: %r' % paths)
        for path in paths:
            try:
                files = os.listdir(path)
            except OSError:
                # A stale or mistyped search path must not abort the listing.
                self.log.debug("Skipping unreadable search path: %r" % path)
                continue
            for f in sorted(files):
                full_path = os.path.join(path, f)
                if f.startswith(prefix) and os.path.isdir(full_path) and \
                        os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
                    # Strip only the prefix, so profile names that contain
                    # underscores (profile_my_cluster -> my_cluster) survive.
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start profile=%s n=4' % profile
                    print(start_cmd + " ==> " + full_path)

    def start(self):
        self.list_profile_dirs()
# `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists

# Command-line flags for `ipcluster create`: a copy of the shared base flags,
# extended with the 'reset' boolean flag (built by boolean_flag) that targets
# IPClusterCreate.overwrite.
create_flags = dict(base_flags)
create_flags.update(
    boolean_flag(
        'reset', 'IPClusterCreate.overwrite',
        "reset config files to defaults", "leave existing config files",
    )
)
MinRK
|
class IPClusterCreate(BaseParallelApplication):
    """The `ipcluster create` subcommand: set up a cluster profile directory."""

    name = u'ipcluster-create'
    description = create_help
    classes = [ProfileDir]

    # Create the profile directory if it does not exist yet.
    auto_create = Bool(True)
    config_file_name = Unicode(default_config_file_name)

    # Base flags plus the 'reset' flag; only the profile name is aliased.
    flags = Dict(create_flags)
    aliases = Dict({'profile': 'BaseIPythonApplication.profile'})
MinRK
|
r3986 | |||
# Command-line aliases accepted by `ipcluster stop`.
stop_aliases = {
    'signal': 'IPClusterStop.signal',
    'profile': 'BaseIPythonApplication.profile',
    'profile_dir': 'ProfileDir.location',
}
MinRK
|
class IPClusterStop(BaseParallelApplication):
    """The `ipcluster stop` subcommand.

    Reads the pid recorded in the profile's pid file and stops that process:
    on POSIX by sending it the configured signal with ``os.kill``, on Windows
    by force-killing its whole process tree with ``taskkill -t -f``.
    """
    name = u'ipcluster'
    description = stop_help
    config_file_name = Unicode(default_config_file_name)

    # On the right-hand side `signal` is still the stdlib module; this class
    # attribute only shadows it inside the class namespace afterwards.
    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")
    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED, after removing the pid file, when the pid
        file cannot be read or the recorded process is not running.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Exit with an unusual status that other processes can watch
            # for to learn how this one exited.
            self.remove_pid_file()
            # NOTE(review): relies on self.exit() not returning; `pid` would
            # be unbound below otherwise.
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Same distinctive exit status as above.
            self.exit(ALREADY_STOPPED)
        elif os.name == 'posix':
            self._stop_posix(pid)
        elif os.name == 'nt':
            self._stop_nt(pid)

    def _stop_posix(self, pid):
        """Send the configured signal to `pid`; drop the pid file if that fails."""
        sig = self.signal
        self.log.info(
            "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
        )
        try:
            os.kill(pid, sig)
        except OSError:
            self.log.error("Stopping cluster failed, assuming already dead.",
                exc_info=True)
            self.remove_pid_file()
        # On success the pid file is left in place: `ipcluster start` removes
        # it in a `finally` once its event loop exits (assuming the chosen
        # signal lets that process shut down cleanly).

    def _stop_nt(self, pid):
        """Force-kill the process tree rooted at `pid`, then remove the pid file."""
        try:
            # Discard taskkill's output via os.devnull instead of PIPE:
            # check_call never reads its pipes, so enough output to fill the
            # OS pipe buffer would block the child forever.
            with open(os.devnull, 'w') as devnull:
                check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                           stdout=devnull, stderr=devnull)
        except (CalledProcessError, OSError, IOError):
            self.log.error("Stopping cluster failed, assuming already dead.",
                exc_info=True)
        # A forced kill (-f) gives the cluster no chance to clean up after
        # itself, so the pid file is removed here whether or not taskkill
        # succeeded.
        self.remove_pid_file()
# Command-line aliases for `ipcluster engines`: a copy of the shared base
# aliases, plus the engine count and the engine-set launcher class.
engine_aliases = dict(base_aliases)
engine_aliases['n'] = 'IPClusterEngines.n'
engine_aliases['elauncher'] = 'IPClusterEngines.engine_launcher_class'
MinRK
|
class IPClusterEngines(BaseParallelApplication):
    """The `ipcluster engines` subcommand.

    Starts `n` engines with the configured engine-set launcher, then runs
    the event loop until the launcher stops or SIGINT is received.
    """

    name = u'ipcluster'
    description = engines_help
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    aliases = Dict(engine_aliases)

    classes = List()

    def _classes_default(self):
        # ProfileDir plus every launcher whose class name contains 'EngineSet'.
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir] + eslaunchers

    n = Int(2, config=True,
        help="The number of engines to start.")

    engine_launcher_class = Unicode('LocalEngineSetLauncher',
        config=True,
        help="The class for launching a set of Engines."
    )
    daemonize = Bool(False, config=True,
        help='Daemonize the ipcluster program. This implies --log-to-file')

    def _daemonize_changed(self, name, old, new):
        # Change handler for `daemonize`: daemon mode implies logging to file.
        if new:
            self.log_to_file = True

    # Set by stop_launchers() so that the shutdown sequence runs only once.
    _stopping = False

    def initialize(self, argv=None):
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        # Stop the event loop as soon as the engine set stops.
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        # Route Ctrl-C through the orderly shutdown in stop_launchers().
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """Import and instantiate a Launcher class from its import string.

        A bare class name (no '.') is looked up in
        IPython.parallel.apps.launcher. The launcher receives the profile
        directory as its work_dir, plus this app's config and logger name.
        """
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            clsname = 'IPython.parallel.apps.launcher.' + clsname
        klass = import_item(clsname)
        return klass(
            work_dir=self.profile_dir.location, config=self.config,
            logname=self.log.name
        )

    def start_engines(self):
        self.log.info("Starting %i engines" % self.n)
        self.engine_launcher.start(
            self.n,
            self.profile_dir.location
        )

    def stop_engines(self):
        """Stop the engine launcher if it is running; return whatever stop() returns."""
        self.log.info("Stopping Engines...")
        if self.engine_launcher.running:
            return self.engine_launcher.stop()
        return None

    def stop_launchers(self, r=None):
        """Begin shutdown (only once): stop engines, then stop the loop after 4 s."""
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old controller/engine log files from the profile's log dir.

        Runs only when a true ``clean_logs`` attribute is present. In this
        module that trait is declared on the subclass IPClusterStart, so
        getattr guards against it being absent on a plain IPClusterEngines
        (whether the base class declares it is not visible here).
        Cleanup is best effort: files that cannot be removed are logged and
        skipped.
        """
        if not getattr(self, 'clean_logs', False):
            return
        log_dir = self.profile_dir.log_dir
        for f in os.listdir(log_dir):
            # Matches e.g. ipengine-1234.log and ipcontroller-1234.err. The
            # optional 'z' keeps the previously required 'ipenginez-' spelling
            # matching. '$' stops longer names (ipengine-1.log.bak) from being
            # deleted.
            if re.match(r'ip(engine|controller)z?-\d+\.(log|err|out)$', f):
                path = os.path.join(log_dir, f)
                try:
                    os.remove(path)
                except OSError:
                    self.log.warning("Could not remove old log file %r" % path)

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            from twisted.scripts._twistd_unix import daemonize
            daemonize()

        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # Unlike `ipcluster start`, no pid file is written for engines.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call (EINTR) is a normal way for the loop
            # to end here; anything else is a real error.
            if e.errno != errno.EINTR:
                raise
# Command-line aliases for `ipcluster start`: a copy of everything
# `ipcluster engines` accepts, plus the controller-to-engines start delay and
# the old-log cleanup switch.
start_aliases = dict(engine_aliases)
start_aliases['delay'] = 'IPClusterStart.delay'
start_aliases['clean_logs'] = 'IPClusterStart.clean_logs'
class IPClusterStart(IPClusterEngines):
    """The `ipcluster start` subcommand: launch a controller, then engines.

    Refuses to start (exit code ALREADY_STARTED) when the pid file names a
    live process. Writes its own pid file while running and removes it when
    the event loop ends.
    """

    name = u'ipcluster'
    description = start_help
    default_log_level = logging.INFO
    aliases = Dict(start_aliases)

    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")
    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")
    controller_launcher_class = Unicode('LocalControllerLauncher',
        config=True,
        help="The class for launching a Controller."
    )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
    )

    classes = List()

    def _classes_default(self):
        # Unlike IPClusterEngines, expose every launcher, controller ones included.
        from IPython.parallel.apps import launcher as launcher_module
        return [ProfileDir] + launcher_module.all_launchers

    def init_launchers(self):
        build = self.build_launcher
        self.controller_launcher = build(self.controller_launcher_class)
        self.engine_launcher = build(self.engine_launcher_class)
        # If the controller stops, shut the whole cluster down.
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        location = self.profile_dir.location
        self.controller_launcher.start(location)

    def stop_controller(self):
        """Stop the controller launcher if it exists and is running."""
        ctl = self.controller_launcher
        if not (ctl and ctl.running):
            return None
        return ctl.stop()

    def stop_launchers(self, r=None):
        # Stop the controller only on the first call; the parent method then
        # stops the engines and schedules the loop shutdown under the same
        # _stopping guard.
        if not self._stopping:
            self.stop_controller()
        super(IPClusterStart, self).stop_launchers()

    def _exit_if_already_running(self):
        """Exit with ALREADY_STARTED if the pid file names a live process.

        An unreadable pid file is ignored; one naming a dead process is
        treated as stale and removed.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            return
        if not self.check_pid(pid):
            self.remove_pid_file()
            return
        self.log.critical(
            'Cluster is already running with [pid=%s]. '
            'use "ipcluster stop" to stop the cluster.' % pid
        )
        # A distinctive exit status lets other processes detect this case.
        self.exit(ALREADY_STARTED)

    def start(self):
        """Start the app for the start subcommand."""
        self._exit_if_already_running()

        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            from twisted.scripts._twistd_unix import daemonize
            daemonize()

        # Controller right away; engines `delay` seconds later (ms for the loop).
        ioloop.DelayedCallback(self.start_controller, 0, self.loop).start()
        ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop).start()

        # Write the pid file only now, so it records the post-daemonize pid.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
MinRK
|
r3605 | |||
MinRK
|
# Import-string prefix of the subcommand application classes; each entry in
# the subcommand table appends its own suffix (Create, List, Start, ...).
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'


class IPBaseParallelApplication(Application):
    """Top-level `ipcluster` command, which dispatches to a subcommand app."""

    name = u'ipcluster'
    description = _description

    # subcommand name -> (import string of its application class, help text)
    subcommands = {
        'create': (base + 'Create', create_help),
        'list': (base + 'List', list_help),
        'start': (base + 'Start', start_help),
        'stop': (base + 'Stop', stop_help),
        'engines': (base + 'Engines', engines_help),
    }

    # The parent app takes no aliases or flags of its own.
    flags = Dict()
    aliases = Dict()

    def start(self):
        """Run the selected subcommand, or list the choices and exit(1)."""
        subapp = self.subapp
        if subapp is not None:
            return subapp.start()
        print("No subcommand specified! Must specify one of: %s"
              % (self.subcommands.keys(),))
        self.print_subcommands()
        self.exit(1)
MinRK
|
r3605 | |||
def launch_new_instance():
    """Create and run the IPython cluster.

    Obtains the application via IPBaseParallelApplication.instance(),
    initializes it and starts it.
    """
    application = IPBaseParallelApplication.instance()
    application.initialize()
    application.start()


if __name__ == '__main__':
    launch_new_instance()