ipclusterapp.py
598 lines
| 21.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3605 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The ipcluster application. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Brian Granger | ||||
* MinRK | ||||
MinRK
|
r3605 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4018 | # Copyright (C) 2008-2011 The IPython Development Team | ||
MinRK
|
r3605 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import errno | ||
MinRK
|
r3605 | import logging | ||
import os | ||||
MinRK
|
r3631 | import re | ||
MinRK
|
r3605 | import signal | ||
MinRK
|
r3846 | from subprocess import check_call, CalledProcessError, PIPE | ||
MinRK
|
r3624 | import zmq | ||
MinRK
|
r3605 | from zmq.eventloop import ioloop | ||
MinRK
|
r5214 | from IPython.config.application import Application, boolean_flag, catch_config_error | ||
MinRK
|
r3985 | from IPython.config.loader import Config | ||
MinRK
|
r4024 | from IPython.core.application import BaseIPythonApplication | ||
from IPython.core.profiledir import ProfileDir | ||||
MinRK
|
r4019 | from IPython.utils.daemonize import daemonize | ||
MinRK
|
r3605 | from IPython.utils.importstring import import_item | ||
MinRK
|
r4247 | from IPython.utils.sysinfo import num_cpus | ||
MinRK
|
r5344 | from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any, | ||
Thomas Kluyver
|
r4055 | DottedObjectName) | ||
MinRK
|
r3688 | |||
MinRK
|
r3993 | from IPython.parallel.apps.baseapp import ( | ||
MinRK
|
r3992 | BaseParallelApplication, | ||
MinRK
|
r3985 | PIDFileError, | ||
MinRK
|
r3986 | base_flags, base_aliases | ||
MinRK
|
r3605 | ) | ||
#----------------------------------------------------------------------------- | ||||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
# Name of the config file loaded by every ipcluster subcommand application.
default_config_file_name = u'ipcluster_config.py'


# Description of the top-level ``ipcluster`` application (IPClusterApp).
_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start --n=4'. For more complex usage
you will typically do 'ipython profile create mycluster --parallel', then edit
configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
"""

# Usage examples attached to the top-level app and to each subcommand.
_main_examples = """
ipcluster start --n=4 # start a 4 node cluster on localhost
ipcluster start -h # show the help string for the start subcmd

ipcluster stop -h # show the help string for the stop subcmd
ipcluster engines -h # show the help string for the engines subcmd
"""

_start_examples = """
ipython profile create mycluster --parallel # create mycluster profile
ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
"""

_stop_examples = """
ipcluster stop --profile=mycluster # stop a running cluster by profile name
"""

_engines_examples = """
ipcluster engines --profile=mycluster --n=4 # start 4 engines only
"""


# Exit codes for ipcluster

# Exit code used when the cluster appears to be running already, because
# a .pid file exists.
ALREADY_STARTED = 10


# Exit code used when 'ipcluster stop' is run but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# Exit code used when 'ipcluster engines' is run but there is no .pid
# file to be found.
NO_CLUSTER = 12
MinRK
|
r3605 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3605 | #----------------------------------------------------------------------------- | ||
MinRK
|
# Help text for the 'start' subcommand (also used as IPClusterStart.description).
start_help = """Start an IPython cluster for parallel computing
Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be creating using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
otherwise use the 'profile-dir' option.
"""

# Help text for the 'stop' subcommand (also used as IPClusterStop.description).
stop_help = """Stop a running IPython cluster
Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
use the '--profile-dir' option.
"""

# Help text for the 'engines' subcommand (also used as
# IPClusterEngines.description).
engines_help = """Start engines connected to an existing IPython cluster
Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be creating using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
otherwise use the 'profile-dir' option.
"""

# Command-line aliases for 'ipcluster stop'. base_aliases is merged last, so
# an entry there with the same key would replace the one defined here.
stop_aliases = dict(
    signal='IPClusterStop.signal',
)
stop_aliases.update(base_aliases)
MinRK
|
r3986 | |||
MinRK
|
class IPClusterStop(BaseParallelApplication):
    """Application behind the 'ipcluster stop' subcommand.

    Reads the cluster pid from the pid file and asks that process to stop:
    with ``os.kill`` and the configured signal on posix, with ``taskkill``
    on Windows. The pid file is removed when the cluster turns out not to
    be running or when the stop attempt fails.
    """
    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples
    config_file_name = Unicode(default_config_file_name)

    # NOTE: the right-hand side is evaluated before the class attribute is
    # bound, so ``signal.SIGINT`` here still refers to the ``signal`` module.
    signal = Integer(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED when the pid file cannot be read or the
        recorded pid is not running. Otherwise the process is signalled
        (posix) or its whole process tree is killed (nt). A failed stop
        attempt is logged and the pid file is removed.
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            # NOTE(review): the code below relies on self.exit() not
            # returning, otherwise `pid` would be unbound -- confirm.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree
                check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                           stdout=PIPE, stderr=PIPE)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                self.remove_pid_file()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
# Command-line aliases for 'ipcluster engines'. The base aliases are copied
# first so the entries defined here take precedence on a key clash.
engine_aliases = {}
engine_aliases.update(base_aliases)
engine_aliases.update(dict(
    n='IPClusterEngines.n',
    engines = 'IPClusterEngines.engine_launcher_class',
    daemonize = 'IPClusterEngines.daemonize',
))

# Command-line flags for 'ipcluster engines': the base flags plus
# --daemonize, which sets IPClusterEngines.daemonize to True.
engine_flags = {}
engine_flags.update(base_flags)
engine_flags.update(dict(
    daemonize=(
        {'IPClusterEngines' : {'daemonize' : True}},
        """run the cluster into the background (not available on Windows)""",
    )
))
MinRK
|
class IPClusterEngines(BaseParallelApplication):
    """Application behind the 'ipcluster engines' subcommand.

    Builds an engine-set launcher from ``engine_launcher_class``, starts the
    engines from the event loop (``self.loop``) and stops them again when
    SIGINT is received.
    """

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()

    def _classes_default(self):
        """Default for `classes`: ProfileDir plus every EngineSet launcher."""
        # Imported here rather than at module level, as in the original.
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir]+eslaunchers

    n = Integer(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")

    def _engine_launcher_changed(self, name, old, new):
        """Forward a string assigned to the deprecated trait to the new one."""
        # init_launchers() later stores the launcher *instance* in this same
        # attribute, hence the string check.
        if isinstance(new, basestring):
            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
                    " use engine_launcher_class" % self.__class__.__name__)
            self.engine_launcher_class = new

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify it's absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher`.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses [default]
            MPI : use mpiexec to launch engines in an MPI environment
            PBS : use PBS (qsub) to submit engines to a batch queue
            SGE : use SGE (qsub) to submit engines to a batch queue
            LSF : use LSF (bsub) to submit engines to a batch queue
            SSH : use SSH to start engines
                        Note that SSH does *not* move the connection files
                        around, so you will likely have to do this manually
                        unless the machines are on a shared file system.
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterEngines.engine_launcher_class = 'SSH'

        or:

            ipcluster start --engines=MPI

        """
        )

    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        """Turn on logging to file whenever daemonize is enabled."""
        if new:
            self.log_to_file = True

    # While this is non-zero, an engine-set stop is treated as a startup
    # failure (see engines_stopped_early). start_engines() schedules
    # engines_started_ok() after this delay, which resets it to 0.
    early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
    # Set by stop_launchers() so that shutdown is only performed once.
    _stopping = False

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """Run the base initialization, then install the SIGINT handler and
        build the launcher(s)."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Instantiate the engine-set launcher named by engine_launcher_class."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def init_signal(self):
        """Route SIGINT to sigint_handler."""
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname, kind=None):
        """import and instantiate a Launcher based on importstring

        `clsname` may be a dotted import path, a class name inside
        IPython.parallel.apps.launcher, or just a prefix such as 'PBS' or
        'MPI'. A bare name that does not contain `kind` is expanded to
        ``<clsname><kind>Launcher``. If the import fails, a fatal message is
        logged and ``self.exit(1)`` is called.
        """
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            if kind and kind not in clsname:
                # doesn't match necessary full class name, assume it's
                # just 'PBS' or 'MPI' prefix:
                clsname = clsname + kind + 'Launcher'
            clsname = 'IPython.parallel.apps.launcher.'+clsname
        try:
            klass = import_item(clsname)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', config=self.config, log=self.log,
            profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
        )
        return launcher

    def engines_started_ok(self):
        """Delayed callback: the engines outlived the early-shutdown window."""
        self.log.info("Engines appear to have started successfully")
        self.early_shutdown = 0

    def start_engines(self):
        """Start the engine set and arm the early-shutdown check."""
        # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
        n = getattr(self.engine_launcher, 'engine_count', self.n)
        self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
        # `n` above is only used for the log message; the launcher is still
        # handed self.n.
        self.engine_launcher.start(self.n)
        self.engine_launcher.on_stop(self.engines_stopped_early)
        if self.early_shutdown:
            ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()

    def engines_stopped_early(self, r):
        """on_stop callback of the engine launcher.

        If the engines stop while early_shutdown is still non-zero and no
        shutdown is in progress, log a hint and stop the launchers. In every
        case the call is then passed on to engines_stopped().
        """
        if self.early_shutdown and not self._stopping:
            self.log.error("""
            Engines shutdown early, they probably failed to connect.
            Check the engine log files for output.
            If your controller and engines are not on the same machine, you probably
            have to instruct the controller to listen on an interface other than localhost.
            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
            Be sure to read our security docs before instructing your controller to listen on
            a public interface.
            """)
            self.stop_launchers()

        return self.engines_stopped(r)

    def engines_stopped(self, r):
        """Stop the event loop once the engines have stopped."""
        return self.loop.stop()

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's stop() returns, or None when the
        launcher was not running.
        """
        if self.engine_launcher.running:
            self.log.info("Stopping Engines...")
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def stop_launchers(self, r=None):
        """Stop the engines, then stop the event loop 3 seconds later.

        Only acts on the first call; `r` is accepted so the method can be
        used as a callback and is otherwise unused.
        """
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler installed by init_signal()."""
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Delete old engine/controller log files from the profile log dir."""
        # NOTE(review): `clean_logs` is only declared on IPClusterStart in
        # this file; confirm this is never reached on a plain
        # IPClusterEngines instance.
        # NOTE(review): the pattern only matches 'ipenginez-'/'ipcontrollerz-'
        # file names; verify that this is still how the log files are named.
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        # super(IPBaseParallelApplication, self).start_logging()

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")
        # First see if the cluster is already running

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # Engines are started from inside the loop, as soon as it runs.
        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        # self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is ignored; any other zmq error
            # is re-raised.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
# Command-line aliases for 'ipcluster start': everything accepted by
# 'ipcluster engines' plus the controller-related options.
start_aliases = {}
start_aliases.update(engine_aliases)
start_aliases.update(dict(
    delay='IPClusterStart.delay',
    controller = 'IPClusterStart.controller_launcher_class',
))
# Added by item assignment because 'clean-logs' is not a valid keyword name.
start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
MinRK
|
r3986 | |||
class IPClusterStart(IPClusterEngines):
    """Application behind the 'ipcluster start' subcommand.

    Extends IPClusterEngines with a controller launcher: the controller is
    started first, the engines `delay` seconds later, and a pid file is
    written for the lifetime of the event loop.
    """

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()

    def _classes_default(self,):
        """Default for `classes`: ProfileDir, IPClusterEngines and all launchers."""
        # Imported here rather than at module level, as in the original.
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")

    def _controller_launcher_changed(self, name, old, new):
        """Forward a string assigned to the deprecated trait to the new one."""
        # init_launchers() later stores the launcher *instance* in this same
        # attribute, hence the string check.
        if isinstance(new, basestring):
            # old 0.11-style config
            self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
                    " use controller_launcher_class" % self.__class__.__name__)
            self.controller_launcher_class = new

    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Note that using a batch launcher for the controller *does not* put it
        in the same batch job as the engines, so they will still start separately.

        IPython's bundled examples include:

            Local : start the controller locally as a subprocess
            MPI : use mpiexec to launch the controller in an MPI universe
            PBS : use PBS (qsub) to submit the controller to a batch queue
            SGE : use SGE (qsub) to submit the controller to a batch queue
            LSF : use LSF (bsub) to submit the controller to a batch queue
            SSH : use SSH to start the controller
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterStart.controller_launcher_class = 'SSH'

        or:

            ipcluster start --controller=MPI

        """
        )

    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller and engine launchers.

        stop_launchers is registered as the controller launcher's on_stop
        callback.
        """
        self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
        self.controller_launcher.on_stop(self.stop_launchers)

    def engines_stopped(self, r):
        """prevent parent.engines_stopped from stopping everything on engine shutdown"""
        pass

    def start_controller(self):
        """Start the controller through its launcher."""
        self.log.info("Starting Controller with %s", self.controller_launcher_class)
        self.controller_launcher.start()

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running.

        Returns the result of the launcher's stop(), otherwise None.
        """
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller, then do the parent's engine/loop shutdown.

        Does nothing once a shutdown is already in progress. `r` is accepted
        because the method is also used as an on_stop callback.
        """
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            # No readable pid file: go ahead and start.
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with an unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                # The recorded pid is not running, so the file is stale.
                self.remove_pid_file()


        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # Controller first; the engines follow `delay` seconds later.
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # An interrupted system call is ignored; any other zmq error
            # is re-raised.
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        finally:
            self.remove_pid_file()
MinRK
|
r3605 | |||
MinRK
|
# Common import-string prefix of the three subcommand application classes.
base='IPython.parallel.apps.ipclusterapp.IPCluster'

class IPClusterApp(Application):
    """Top-level 'ipcluster' application that dispatches to a subcommand.

    The subcommands are given as import strings (`base` + class suffix)
    paired with their help text.
    """
    name = u'ipcluster'
    description = _description
    examples = _main_examples

    subcommands = {
                'start' : (base+'Start', start_help),
                'stop' : (base+'Stop', stop_help),
                'engines' : (base+'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with status 1
        when no subcommand was given."""
        if self.subapp is None:
            # Single-argument print call: same output as the former print
            # statement on Python 2, and valid syntax on Python 3.
            print("No subcommand specified. Must specify one of: %s"%(list(self.subcommands.keys())))

            self.print_description()
            self.print_subcommands()
            self.exit(1)
        else:
            return self.subapp.start()
MinRK
|
r3605 | |||
def launch_new_instance():
    """Create and run the IPython cluster.

    Obtains the IPClusterApp instance via ``IPClusterApp.instance()``,
    initializes it and starts it.
    """
    app = IPClusterApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()