ipclusterapp.py
446 lines
| 14.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3605 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The ipcluster application. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Brian Granger | ||||
* MinRK | ||||
MinRK
|
r3605 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4018 | # Copyright (C) 2008-2011 The IPython Development Team | ||
MinRK
|
r3605 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import errno | ||
MinRK
|
r3605 | import logging | ||
import os | ||||
MinRK
|
r3631 | import re | ||
MinRK
|
r3605 | import signal | ||
MinRK
|
r3846 | from subprocess import check_call, CalledProcessError, PIPE | ||
MinRK
|
r3624 | import zmq | ||
MinRK
|
r3605 | from zmq.eventloop import ioloop | ||
MinRK
|
r3986 | from IPython.config.application import Application, boolean_flag | ||
MinRK
|
r3985 | from IPython.config.loader import Config | ||
MinRK
|
r4024 | from IPython.core.application import BaseIPythonApplication | ||
from IPython.core.profiledir import ProfileDir | ||||
MinRK
|
r4019 | from IPython.utils.daemonize import daemonize | ||
MinRK
|
r3605 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3988 | from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List | ||
MinRK
|
r3688 | |||
MinRK
|
r3993 | from IPython.parallel.apps.baseapp import ( | ||
MinRK
|
r3992 | BaseParallelApplication, | ||
MinRK
|
r3985 | PIDFileError, | ||
MinRK
|
r3986 | base_flags, base_aliases | ||
MinRK
|
r3605 | ) | ||
#----------------------------------------------------------------------------- | ||||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
# Name of the config file shared by every ipcluster subcommand.
default_config_file_name = u'ipcluster_config.py'


# Description shown for the top-level ``ipcluster`` command.
# NOTE(review): the text below mentions 'ipcluster create', but the
# subcommands registered in this file are start/stop/engines -- confirm
# whether the example is still accurate.
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start n=4'. For more complex usage
you will typically do 'ipcluster create profile=mycluster', then edit
configuration files, followed by 'ipcluster start profile=mycluster n=4'.
"""


# Exit codes for ipcluster

# Exit status used when the cluster appears to be running already,
# i.e. a .pid file exists for a live process.
ALREADY_STARTED = 10

# Exit status used when 'ipcluster stop' is run but no .pid file
# can be found.
ALREADY_STOPPED = 11

# Exit status used when 'ipcluster engines' is run but no .pid file
# can be found.
NO_CLUSTER = 12
MinRK
|
r3605 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3605 | #----------------------------------------------------------------------------- | ||
MinRK
|
# Long help text for the three ipcluster subcommands.  These strings are
# shown to the user, both as each subcommand's description and in the
# parent application's subcommand table.

start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>',
otherwise use the 'profile_dir' option.
"""

stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>', otherwise
use the 'profile_dir' option.
"""

engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be created using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>',
otherwise use the 'profile_dir' option.
"""
# Command-line aliases accepted by 'ipcluster stop', mapping the short
# name to the configurable trait it sets.
stop_aliases = {
    'signal': 'IPClusterStop.signal',
    'profile': 'BaseIPythonApplication.profile',
    'profile_dir': 'ProfileDir.location',
}
MinRK
|
class IPClusterStop(BaseParallelApplication):
    """Application behind the 'ipcluster stop' subcommand.

    Reads the pid recorded in the cluster's pid file and stops that
    process: with ``os.kill`` and the configured signal on posix, or with
    ``taskkill`` (whole process tree, forced) on nt.
    """

    name = u'ipcluster'
    description = stop_help
    config_file_name = Unicode(default_config_file_name)

    # The right-hand side is evaluated before the class attribute is bound,
    # so ``signal.SIGINT`` here still refers to the stdlib module.
    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ``ALREADY_STOPPED`` when the pid file cannot be read or
        when the recorded pid is not a running process.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Exit with an unusual exit status that other processes
            # can watch for to learn how this command exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Exit with an unusual exit status that other processes
            # can watch for to learn how this command exited.
            self.exit(ALREADY_STOPPED)

        elif os.name == 'posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                # The signal could not be delivered, so clean up the pid
                # file here.
                self.remove_pid_file()

        elif os.name == 'nt':
            try:
                # Kill the whole tree.  Output is sent to the null device
                # rather than to PIPE: check_call never reads from a pipe,
                # so a child that fills the pipe buffer would block forever.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError, IOError):
                # IOError covers a failure to open the null device on
                # Python 2, where it is not a subclass of OSError.
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
# Command-line aliases accepted by 'ipcluster engines': everything in
# base_aliases plus the engine-specific options.
engine_aliases = dict(base_aliases)
engine_aliases.update(
    n='IPClusterEngines.n',
    elauncher='IPClusterEngines.engine_launcher_class',
)
MinRK
|
class IPClusterEngines(BaseParallelApplication):
    """Application behind the 'ipcluster engines' subcommand.

    Builds an engine-set launcher from ``engine_launcher_class``, schedules
    it on ``self.loop`` and runs the loop until it is stopped.
    """

    name = u'ipcluster'
    description = engines_help
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()

    def _classes_default(self):
        """Default for ``classes``: ProfileDir plus every launcher whose
        class name contains 'EngineSet'."""
        from IPython.parallel.apps import launcher
        engine_set_launchers = []
        for candidate in launcher.all_launchers:
            if 'EngineSet' in candidate.__name__:
                engine_set_launchers.append(candidate)
        return [ProfileDir] + engine_set_launchers

    n = Int(2, config=True,
        help="The number of engines to start.")

    engine_launcher_class = Unicode('LocalEngineSetLauncher',
        config=True,
        help="The class for launching a set of Engines."
        )

    daemonize = Bool(False, config=True,
        help='Daemonize the ipcluster program. This implies --log-to-file')

    def _daemonize_changed(self, name, old, new):
        """Turn on logging to file whenever ``daemonize`` becomes true."""
        if new:
            self.log_to_file = True

    aliases = Dict(engine_aliases)

    # Set once stop_launchers() has begun, so shutdown only runs once.
    _stopping = False

    def initialize(self, argv=None):
        """Run the base initialization, then install the SIGINT handler
        and build the launchers."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Create the engine launcher; stop the loop when it stops."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        """Route SIGINT to ``sigint_handler``."""
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """import and instantiate a Launcher based on importstring"""
        dotted_name = clsname
        if '.' not in dotted_name:
            # not a module path: presume a bare class name that lives in
            # apps.launcher
            dotted_name = 'IPython.parallel.apps.launcher.' + dotted_name
        launcher_class = import_item(dotted_name)
        return launcher_class(
            work_dir=self.profile_dir.location,
            config=self.config,
            log=self.log,
        )

    def start_engines(self):
        """Ask the engine launcher to start ``self.n`` engines."""
        self.log.info("Starting %i engines"%self.n)
        self.engine_launcher.start(self.n, self.profile_dir.location)

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when
        the launcher is not running.
        """
        self.log.info("Stopping Engines...")
        if not self.engine_launcher.running:
            return None
        return self.engine_launcher.stop()

    def stop_launchers(self, r=None):
        """Begin shutdown once: stop the engines, then stop the loop later.

        ``r`` is accepted so this can be used as an ``on_stop`` callback;
        it is not used.
        """
        if self._stopping:
            return
        self._stopping = True
        self.log.error("IPython cluster: stopping")
        self.stop_engines()
        # Wait a few seconds to let things shut down.
        delayed_stop = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
        delayed_stop.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: trigger launcher shutdown."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old controller/engine log files when ``clean_logs`` is set."""
        if not self.clean_logs:
            return
        log_dir = self.profile_dir.log_dir
        # NOTE(review): the pattern only matches names starting with
        # 'ipenginez-' / 'ipcontrollerz-' -- confirm this still matches the
        # file names the launchers produce.
        old_log = re.compile(r'ip(engine|controller)z-\d+\.(log|err|out)')
        for fname in os.listdir(log_dir):
            if old_log.match(fname):
                os.remove(os.path.join(log_dir, fname))
        # Old log files for ipcluster itself are not removed here.

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        engines_callback = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        engines_callback.start()

        # No pid file is written by this subcommand.
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is tolerated; anything else is
            # re-raised.
            if e.errno != errno.EINTR:
                raise
# Command-line aliases accepted by 'ipcluster start': everything in
# engine_aliases plus the start-specific options.
start_aliases = dict(engine_aliases)
start_aliases.update(
    delay='IPClusterStart.delay',
    clean_logs='IPClusterStart.clean_logs',
)
class IPClusterStart(IPClusterEngines):
    """Application behind the 'ipcluster start' subcommand.

    Extends the engines application with a controller launcher: the
    controller is started first, the engines ``delay`` seconds later, and
    a pid file is kept for as long as the event loop runs.
    """

    name = u'ipcluster'
    description = start_help
    default_log_level = logging.INFO

    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")

    classes = List()

    def _classes_default(self,):
        """Default for ``classes``: ProfileDir, IPClusterEngines and all
        launchers."""
        from IPython.parallel.apps import launcher
        return [ProfileDir, IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher_class = Unicode('LocalControllerLauncher',
        config=True,
        help="The class for launching a Controller."
        )

    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Create the controller and engine launchers.

        When the controller stops, the whole cluster is shut down via
        ``stop_launchers``.
        """
        self.controller_launcher = self.build_launcher(self.controller_launcher_class)
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Ask the controller launcher to start in the profile directory."""
        self.controller_launcher.start(self.profile_dir.location)

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns whatever the launcher's ``stop()`` returns, otherwise None.
        """
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()
        return None

    def stop_launchers(self, r=None):
        """Stop the controller, then let the parent class stop the engines.

        ``r`` is accepted so this can be used as an ``on_stop`` callback;
        it is not used.
        """
        if self._stopping:
            return
        self.stop_controller()
        super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if not self.check_pid(pid):
                # The recorded pid is not a running process: drop the pid
                # file and carry on.
                self.remove_pid_file()
            else:
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Exit with an unusual exit status that other processes
                # can watch for to learn how this command exited.
                self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            daemonize()

        controller_callback = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        controller_callback.start()
        engines_callback = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        engines_callback.start()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is tolerated; anything else is
            # re-raised.
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
MinRK
|
r3605 | |||
MinRK
|
# Common import-string prefix of the subcommand application classes.
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'


class IPClusterApp(Application):
    """Top-level 'ipcluster' application.

    It only dispatches to one of the subcommand applications listed in
    ``subcommands``.
    """

    name = u'ipcluster'
    description = _description

    # subcommand name -> (import string of the application class, help text)
    subcommands = {
        'start': (base + 'Start', start_help),
        'stop': (base + 'Stop', stop_help),
        'engines': (base + 'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1.

        Returns whatever the subcommand application's ``start()`` returns.
        """
        if self.subapp is None:
            # Single-argument print(...) is valid on Python 2 and 3; the
            # names are sorted so the message does not depend on dict order.
            print("No subcommand specified. Must specify one of: %s"
                  % (sorted(self.subcommands),))
            self.print_description()
            self.print_subcommands()
            self.exit(1)
        else:
            return self.subapp.start()
MinRK
|
r3605 | |||
def launch_new_instance():
    """Create and run the IPython cluster.

    Obtains the ``IPClusterApp`` instance via ``instance()``, initializes
    it and starts it.
    """
    cluster_app = IPClusterApp.instance()
    cluster_app.initialize()
    cluster_app.start()


if __name__ == '__main__':
    launch_new_instance()