#!/usr/bin/env python
# encoding: utf-8
"""
The ipcluster application.
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2009  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import errno
import logging
import os
import re
import signal
from subprocess import check_call, CalledProcessError, PIPE

import zmq
from zmq.eventloop import ioloop

from IPython.external.argparse import ArgumentParser, SUPPRESS
from IPython.utils.importstring import import_item
from IPython.parallel.apps.clusterdir import (
    ApplicationWithClusterDir, ClusterDirConfigLoader,
    ClusterDirError, PIDFileError
)

#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------

# Name of the configuration file used by the ipcluster application.
default_config_file_name = u'ipcluster_config.py'


# Description shown in the top-level command line help.
_description = """\
Start an IPython cluster for parallel computing.\n\n
An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start -n 4'. For more complex usage
you will typically do 'ipcluster create -p mycluster', then edit
configuration files, followed by 'ipcluster start -p mycluster -n 4'.
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists.
ALREADY_STARTED = 10

# This will be the exit code if 'ipcluster stop' is run, but there is no
# .pid file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if 'ipcluster engines' is run, but there is no
# .pid file to be found.
NO_CLUSTER = 12


#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------

class IPClusterAppConfigLoader(ClusterDirConfigLoader):
    """Command line loader that defines the ``ipcluster`` subcommands."""

    def _add_arguments(self):
        """Register the list/create/start/stop/engines subcommand parsers.

        Every option is stored under a ``Global.<name>`` destination and all
        parsers use ``argument_default=SUPPRESS``, so an option that is not
        given on the command line is simply absent from the parsed result.
        """
        # Don't call ClusterDirConfigLoader._add_arguments as we don't want
        # its defaults on self.parser. Instead, we will put those
        # default options on our subparsers.

        # This has all the common options that all subcommands use
        parent_parser1 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_ipython_dir(parent_parser1)
        self._add_log_level(parent_parser1)

        # This has all the common options that other subcommands use
        parent_parser2 = ArgumentParser(
            add_help=False,
            argument_default=SUPPRESS
        )
        self._add_cluster_profile(parent_parser2)
        self._add_cluster_dir(parent_parser2)
        self._add_work_dir(parent_parser2)
        paa = parent_parser2.add_argument
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')

        # Create the object used to create the subparsers.
        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description=
"""ipcluster has a variety of subcommands. The general way of
running ipcluster is 'ipcluster <cmd> [options]'. To get help
on a particular subcommand do 'ipcluster <cmd> -h'."""
        )

        # The "list" subcommand parser
        parser_list = subparsers.add_parser(
            'list',
            parents=[parent_parser1],
            argument_default=SUPPRESS,
            help="List all clusters in cwd and ipython_dir.",
            description=
"""List all available clusters, by cluster directory, that can
be found in the current working directory or in the ipython
directory. Cluster directories are named using the convention
'cluster_<profile>'."""
        )

        # The "create" subcommand parser
        parser_create = subparsers.add_parser(
            'create',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Create a new cluster directory.",
            description=
"""Create an ipython cluster directory by its profile name or
cluster directory path. Cluster directories contain
configuration, log and security related files and are named
using the convention 'cluster_<profile>'. By default they are
located in your ipython directory. Once created, you will
probably need to edit the configuration files in the cluster
directory to configure your cluster. Most users will create a
cluster directory by profile name,
'ipcluster create -p mycluster', which will put the directory
in '<ipython_dir>/cluster_mycluster'.
"""
        )
        paa = parser_create.add_argument
        paa('--reset-config',
            dest='Global.reset_config', action='store_true',
            help=
"""Recopy the default config files to the cluster directory.
You will lose any modifications you have made to these files.""")

        # The "start" subcommand parser
        parser_start = subparsers.add_parser(
            'start',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Start a cluster.",
            description=
"""Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'create'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start -n 4 -p <profile>',
otherwise use the '--cluster-dir' option.
"""
        )
        paa = parser_start.add_argument
        paa('-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n')
        paa('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log files before starting.')
        paa('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log files before starting.")
        paa('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file')
        paa('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.")
        paa('--delay',
            type=float, dest='Global.delay',
            help="Specify the delay (in seconds) between starting the "
                 "controller and starting the engine(s).")

        # The "stop" subcommand parser
        parser_stop = subparsers.add_parser(
            'stop',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Stop a running cluster.",
            description=
"""Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'cluster_<profile>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop -p <profile>', otherwise
use the '--cluster-dir' option.
"""
        )
        paa = parser_stop.add_argument
        paa('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal")

        # The "engines" subcommand parser
        parser_engines = subparsers.add_parser(
            'engines',
            parents=[parent_parser1, parent_parser2],
            argument_default=SUPPRESS,
            help="Attach some engines to an existing controller or cluster.",
            description=
"""Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'create'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines -n 4 -p <profile>',
otherwise use the '--cluster-dir' option.
"""
        )
        paa = parser_engines.add_argument
        paa('-n', '--number',
            type=int, dest='Global.n',
            help='The number of engines to start.',
            metavar='Global.n')
        paa('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file')
        paa('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Don't daemonize the ipcluster program.")


#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------

class IPClusterApp(ApplicationWithClusterDir):
    """The ``ipcluster`` application.

    Dispatches on ``Global.subcommand`` (list, create, start, stop, engines)
    to list or create cluster directories, to launch a controller and/or
    engines through the configured launcher classes, or to signal a running
    cluster to stop.
    """

    name = u'ipcluster'
    description = _description
    usage = None
    command_line_loader = IPClusterAppConfigLoader
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Add the ipcluster specific ``Global`` defaults to the base ones."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.parallel.apps.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.delay = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = signal.SIGINT
        self.default_config.Global.daemonize = False

    def find_resources(self):
        """Locate (or create) the cluster directory for the subcommand.

        ``list`` prints the known cluster directories and exits.  ``create``,
        ``start`` and ``stop`` allow the cluster directory to be created
        automatically; ``engines`` requires it to exist already.

        Raises
        ------
        ClusterDirError
            If no cluster directory is found for ``start``, ``stop`` or
            ``engines``.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand == 'list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand == 'create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand in ('start', 'stop', 'engines'):
            # Only 'engines' insists on a pre-existing cluster directory.
            self.auto_create_cluster_dir = (subcommand != 'engines')
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                # Re-raise with a message naming the subcommand actually run.
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster %s'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                    % subcommand
                )

    def list_cluster_dirs(self):
        """Print a start command for every ``cluster_<profile>`` directory.

        The current working directory, the ipython directory and every entry
        of the ``IPCLUSTER_DIR_PATH`` environment variable are searched.
        Search paths that do not exist are skipped.
        """
        # Find the search paths.  Entries are separated by os.pathsep so that
        # Windows paths containing a drive colon are not split apart; empty
        # entries are ignored.
        env_paths = os.environ.get('IPCLUSTER_DIR_PATH', '')
        cluster_dir_paths = [p for p in env_paths.split(os.pathsep) if p]
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            ipython_dir = self.default_config.Global.ipython_dir

        # Drop duplicates while keeping the search order stable.
        paths = []
        for path in [os.getcwd(), ipython_dir] + cluster_dir_paths:
            if path not in paths:
                paths.append(path)

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        prefix = 'cluster_'
        for path in paths:
            if not os.path.isdir(path):
                # Nothing to list here; a missing search path is not an error.
                self.log.debug('Skipping missing search path: %r' % path)
                continue
            for f in sorted(os.listdir(path)):
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # Everything after the prefix is the profile name, so
                    # profiles that contain underscores are kept whole.
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start -p %s -n 4' % profile
                    print(start_cmd + " ==> " + full_path)

    def pre_construct(self):
        """Run the inherited pre-construct step, then make ``daemonize``
        imply ``log_to_file``."""
        # Per the original comment, the inherited pre_construct() is where
        # we cd to the working directory.
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                config.Global.log_to_file = True
        except AttributeError:
            # No daemonize option for this subcommand; nothing to adjust.
            pass

    def construct(self):
        """Prepare the work for the selected subcommand.

        ``create`` copies the default config files into the cluster
        directory, ``start`` and ``engines`` set up logging and schedule the
        launchers on the event loop; nothing is done for ``list`` or ``stop``.
        """
        config = self.master_config
        subcmd = config.Global.subcommand
        reset = config.Global.reset_config
        if subcmd == 'list':
            return
        if subcmd == 'create':
            self.log.info('Copying default config files to cluster directory '
                          '[overwrite=%r]' % (reset,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
        elif subcmd == 'start':
            self.cluster_dir_obj.copy_all_config_files(overwrite=False)
            self._schedule_launchers(controller=True)
        elif subcmd == 'engines':
            self._schedule_launchers(controller=False)

    def _schedule_launchers(self, controller):
        """Start logging and queue start_launchers() on the IOLoop."""
        self.start_logging()
        self.loop = ioloop.IOLoop.instance()
        launch = lambda: self.start_launchers(controller=controller)
        # A zero delay runs the launchers as soon as the loop starts.
        dc = ioloop.DelayedCallback(launch, 0, self.loop)
        dc.start()

    def start_launchers(self, controller=True):
        """Create the launchers and start the controller and/or engines.

        Parameters
        ----------
        controller : bool
            When true a controller is launched first and the engines follow
            after ``Global.delay`` seconds.  When false only engines are
            launched, immediately.
        """
        config = self.master_config

        # Make sure stop_launchers is not called 2x.  This is set before any
        # callback or signal handler that reads it is registered.
        self._stopping = False

        # Create the launchers. In both cases, we set the work_dir of
        # the launcher to the cluster_dir. This is where the launcher's
        # subprocesses will be launched. It is not where the controller
        # and engine will be launched.
        if controller:
            cl_class = import_item(config.Global.controller_launcher)
            self.controller_launcher = cl_class(
                work_dir=self.cluster_dir, config=config,
                logname=self.log.name
            )
            # Setup the observing of stopping. If the controller dies, shut
            # everything down as that will be completely fatal for the engines.
            self.controller_launcher.on_stop(self.stop_launchers)
            # But, we don't monitor the stopping of engines. An engine dying
            # is just fine and in principle a user could start a new engine.
            # Also, if we did monitor engine stopping, it is difficult to
            # know what to do when only some engines die. Currently, the
            # observing of engine stopping is inconsistent. Some launchers
            # might trigger on a single engine stopping, other wait until
            # all stop.  TODO: think more about how to handle this.
        else:
            self.controller_launcher = None

        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            work_dir=self.cluster_dir, config=config, logname=self.log.name
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

        # Start the controller and engines
        if controller:
            self.start_controller()
        # The engines are only held back when a controller was just started.
        delay_ms = 1000 * config.Global.delay if controller else 0
        dc = ioloop.DelayedCallback(self.start_engines, delay_ms, self.loop)
        dc.start()

        self.startup_message()

    def startup_message(self, r=None):
        """Log that the cluster has started and return ``r`` unchanged."""
        self.log.info("IPython cluster: started")
        return r

    def start_controller(self, r=None):
        """Start the controller launcher; return whatever its start() returns."""
        config = self.master_config
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def start_engines(self, r=None):
        """Start ``Global.n`` engines; return whatever the launcher returns."""
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_controller(self, r=None):
        """Stop the controller launcher if there is one and it is running.

        Returns the result of the launcher's stop(), or None when nothing
        was stopped.
        """
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_engines(self, r=None):
        """Stop the engine launcher if it is running.

        Returns the result of the launcher's stop(), or None when the
        launcher was not running.
        """
        if self.engine_launcher.running:
            return self.engine_launcher.stop()
        return None

    def log_err(self, f):
        """Log the traceback of ``f`` (an object with getTraceback())."""
        self.log.error(f.getTraceback())
        return None

    def stop_launchers(self, r=None):
        """Stop engines and controller once, then stop the loop 4 s later."""
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            self.stop_controller()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        """SIGINT handler: shut the launchers down."""
        self.stop_launchers()

    def start_logging(self):
        """Optionally delete old controller/engine logs, then start logging."""
        # Remove old log files of the controller and engine
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)', f):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd == 'create' or subcmd == 'list':
            return
        elif subcmd == 'start':
            self.start_app_start()
        elif subcmd == 'stop':
            self.start_app_stop()
        elif subcmd == 'engines':
            self.start_app_engines()

    def _daemonize_if_requested(self):
        """Daemonize the process when ``Global.daemonize`` is set (posix only)."""
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.master_config.Global.daemonize and os.name == 'posix':
            # NOTE(review): this relies on a private twisted module.
            from twisted.scripts._twistd_unix import daemonize
            daemonize()

    def _run_loop(self):
        """Run the event loop until it stops or is interrupted.

        KeyboardInterrupt and a ZMQError carrying EINTR are treated as a
        normal end of the loop; any other ZMQError propagates.
        """
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno != errno.EINTR:
                raise

    def start_app_start(self):
        """Start the app for the start subcommand."""
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with an unusual exit status that other
                # processes can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                # Stale pid file left behind by a dead process.
                self.remove_pid_file()

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        self._daemonize_if_requested()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self._run_loop()
        finally:
            self.remove_pid_file()

    def start_app_engines(self):
        """Start the app for the engines subcommand.

        Unlike ``start``, no pid file is checked, written or removed.
        """
        config = self.master_config
        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % config.Global.daemonize
        )
        self._daemonize_if_requested()
        self._run_loop()

    def start_app_stop(self):
        """Start the app for the stop subcommand.

        Exits with ALREADY_STOPPED when the pid file cannot be read or the
        recorded process is not running; otherwise the process is signalled
        (posix) or its process tree is killed with ``taskkill`` (nt).
        """
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        elif os.name == 'posix':
            sig = config.Global.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                               exc_info=True)
                # NOTE(review): placement reconstructed from a view that lost
                # indentation.  The pid file is removed here only on failure;
                # a cluster that receives the signal removes its own pid file
                # when its loop exits (see start_app_start).
                self.remove_pid_file()
        elif os.name == 'nt':
            try:
                # Kill the whole tree.  The output is discarded through
                # os.devnull rather than PIPE because nothing reads the pipes.
                with open(os.devnull, 'w') as devnull:
                    check_call(['taskkill', '-pid', str(pid), '-t', '-f'],
                               stdout=devnull, stderr=devnull)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                               exc_info=True)
            # NOTE(review): placement reconstructed from a view that lost
            # indentation.  The cluster is killed forcibly here, so the pid
            # file is removed whether or not taskkill succeeded.
            self.remove_pid_file()
def launch_new_instance():
    """Create and run the IPython cluster."""
    app = IPClusterApp()
    app.start()
# Run the application when this file is executed as a script.
if __name__ == '__main__':
    launch_new_instance()