##// END OF EJS Templates
Merge pull request #1372 from minrk/reuse-cleanup...
Merge pull request #1372 from minrk/reuse-cleanup: ipcontroller cleans up connection files unless reuse=True. Connection files are not valid across sessions if reuse is False, but were previously preserved, which could cause engines to try to connect to the wrong ports if the Controller took too long to start and/or write new connection files. Also shuffles signal handling around a bit, so that a clean exit occurs, rather than calling sys.exit directly in the signal handler.

File last commit:

r5678:025436b5
r6080:c1c517a0 merge
Show More
baseapp.py
265 lines | 9.3 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
The Base Application class for IPython.parallel apps
Authors:
* Brian Granger
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
import logging
import re
import sys
from subprocess import Popen, PIPE
from IPython.config.application import catch_config_error
from IPython.core import release
from IPython.core.crashhandler import CrashHandler
from IPython.core.application import (
BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
)
from IPython.utils.path import expand_path
from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
    """Error raised for pid-file problems.

    Within this module it is raised when a pid file already exists and may
    not be overwritten, when the pid file cannot be found, or when its
    contents are not an integer.
    """
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython.parallel apps; leaves a detailed report on disk."""

    def __init__(self, app):
        """Set up the handler for *app* with the project's contact details.

        The contact name and e-mail are read from ``IPython.core.release``;
        the bug tracker is the IPython GitHub issues page.
        """
        super(ParallelCrashHandler, self).__init__(
            app,
            release.authors['Min'][0],
            release.author_email,
            'https://github.com/ipython/ipython/issues',
        )
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
# Command-line aliases: start from a copy of the base IPython aliases, then
# layer the parallel-specific ones on top (these win on key collisions).
base_aliases = dict(base_ip_aliases)
base_aliases.update({
    'profile-dir': 'ProfileDir.location',
    'work-dir': 'BaseParallelApplication.work_dir',
    'log-to-file': 'BaseParallelApplication.log_to_file',
    'clean-logs': 'BaseParallelApplication.clean_logs',
    'log-url': 'BaseParallelApplication.log_url',
    'cluster-id': 'BaseParallelApplication.cluster_id',
})

# Command-line flags: the parallel-specific flag is defined first and the
# base IPython flags are merged afterwards, so a base flag with the same key
# would replace it.
base_flags = {
    'log-to-file': (
        {'BaseParallelApplication': {'log_to_file': True}},
        "send log output to a file",
    ),
}
base_flags.update(base_ip_flags)
class BaseParallelApplication(BaseIPythonApplication):
"""The base Application for IPython.parallel apps
Principal extensions to BaseIPythonApplication:
* work_dir
* remote logging via pyzmq
* IOLoop instance
"""
crash_handler_class = ParallelCrashHandler
def _log_level_default(self):
# temporarily override default_log_level to INFO
return logging.INFO
# Working directory for the process; defaults to the directory that is
# current when the class body is evaluated.
work_dir = Unicode(
    os.getcwdu(),
    config=True,
    help='Set the working dir for the process.',
)

def _work_dir_changed(self, name, old, new):
    """Store the expanded form of a newly assigned ``work_dir``.

    The new value is passed through ``expand_path`` and written back to
    ``self.work_dir`` as a unicode string.
    """
    expanded = expand_path(new)
    self.work_dir = unicode(expanded)
# When true, reinit_logging() redirects log output to a per-process file in
# the profile's log directory.
log_to_file = Bool(config=True,
help="whether to log to a file")
# When true, reinit_logging() deletes this app's old .log/.err/.out files
# before logging starts.
clean_logs = Bool(False, config=True,
help="whether to cleanup old logfiles before starting")
# ZMQ URL of a log aggregator; not read anywhere in the visible part of this
# file (presumably consumed by subclasses -- TODO confirm).
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
# Optional suffix for runtime file names; _cluster_id_changed() appends it to
# self.name whenever it is set.
cluster_id = Unicode('', config=True,
help="""String id to add to runtime files, to prevent name collisions when
using multiple clusters with a single profile simultaneously.
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but should
generally work).
"""
)
def _cluster_id_changed(self, name, old, new):
self.name = self.__class__.name
if new:
self.name += '-%s'%new
def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
# The pyzmq IOLoop used by this application.
loop = Instance('zmq.eventloop.ioloop.IOLoop')

def _loop_default(self):
    """Return ``IOLoop.instance()`` as the default value for ``loop``."""
    # Imported inside the function, so zmq is only loaded when this runs.
    from zmq.eventloop import ioloop
    return ioloop.IOLoop.instance()
# Command-line aliases and flags for this application, seeded from the
# module-level base_aliases / base_flags dictionaries defined above.
aliases = Dict(base_aliases)
flags = Dict(base_flags)
@catch_config_error
def initialize(self, argv=None):
    """Initialize the app.

    Runs the parent class's ``initialize`` with *argv*, then changes to the
    configured working directory and reconfigures logging, in that order.
    """
    super(BaseParallelApplication, self).initialize(argv)
    # Order matters: move to the work dir before logging is re-initialized.
    self.to_work_dir()
    self.reinit_logging()
def to_work_dir(self):
    """Change the process's current directory to ``self.work_dir``.

    The directory is only changed (and the change logged) when it differs
    from the current one. In all cases ``''`` is inserted at the front of
    ``sys.path``.
    """
    target = self.work_dir
    already_there = unicode(target) == os.getcwdu()
    if not already_there:
        os.chdir(target)
        self.log.info("Changing to working dir: %s" % target)
    # This is the working dir by now.
    sys.path.insert(0, '')
def reinit_logging(self):
    """Reconfigure logging for this process.

    In order:

    * if ``clean_logs`` is set, delete files named
      ``<name>-<digits>.(log|err|out)...`` from the profile's log directory;
    * if ``log_to_file`` is set, replace the current log handler with one
      writing to ``<name>-<pid>.log`` in that directory;
    * install a timestamped formatter on the active handler;
    * stop propagating records to the root logger.

    Assumes ``self._log_handler`` has already been created by the parent
    application (not visible in this file -- TODO confirm).
    """
    log_dir = self.profile_dir.log_dir

    # Remove old log files
    if self.clean_logs:
        # self.name may carry a user-supplied cluster_id, so it has to be
        # escaped to match literally rather than be read as regex syntax.
        stale_log = re.compile(r'%s-\d+\.(log|err|out)' % re.escape(self.name))
        for fname in os.listdir(log_dir):
            if stale_log.match(fname):
                os.remove(os.path.join(log_dir, fname))

    if self.log_to_file:
        # Start logging to the new log file
        log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
        logfile = os.path.join(log_dir, log_filename)
        # The file object is handed to the handler and deliberately left
        # open: it must stay writable for as long as the handler is used.
        open_log_file = open(logfile, 'w')
        self.log.removeHandler(self._log_handler)
        self._log_handler = logging.StreamHandler(open_log_file)
        self.log.addHandler(self._log_handler)

    # Add timestamps to log format:
    self._log_formatter = logging.Formatter(
        "%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S")
    self._log_handler.setFormatter(self._log_formatter)

    # do not propagate log messages to root logger
    # ipcluster app will sometimes print duplicate messages during shutdown
    # if this is 1 (default):
    self.log.propagate = False
def write_pid_file(self, overwrite=False):
    """Create a .pid file in the profile's pid_dir containing this process's pid.

    The file is ``<profile_dir.pid_dir>/<name>.pid`` and holds the pid
    followed by a newline.

    Parameters
    ----------
    overwrite : bool
        If True, an existing pid file is replaced without being read, so a
        stale or corrupt file never blocks startup.

    Raises
    ------
    PIDFileError
        If the pid file already exists and *overwrite* is False.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if os.path.isfile(pid_file) and not overwrite:
        # Only parse the existing file when we are about to refuse; its
        # contents are needed for the error message and nothing else.
        pid = self.get_pid_from_file()
        raise PIDFileError(
            'The pid file [%s] already exists. \nThis could mean that this '
            'server is already running with [pid=%s].' % (pid_file, pid)
        )
    with open(pid_file, 'w') as f:
        self.log.info("Creating pid file: %s" % pid_file)
        f.write('%i\n' % os.getpid())
def remove_pid_file(self):
    """Remove this app's pid file, if it exists.

    Best-effort: a failure to delete the file is logged as a warning and
    not raised. Always returns ``None``.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if not os.path.isfile(pid_file):
        return
    self.log.info("Removing pid file: %s" % pid_file)
    try:
        os.remove(pid_file)
    except OSError:
        # Only filesystem errors are tolerated; KeyboardInterrupt and
        # SystemExit must not be swallowed here.
        self.log.warning("Error removing the pid file: %s" % pid_file)
def get_pid_from_file(self):
    """Return the pid stored in this app's pid file, as an int.

    Raises
    ------
    PIDFileError
        If the pid file does not exist, or if its contents (after stripping
        whitespace) are not an integer.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if not os.path.isfile(pid_file):
        raise PIDFileError('pid file not found: %s' % pid_file)
    with open(pid_file, 'r') as f:
        s = f.read().strip()
    try:
        return int(s)
    except ValueError:
        # int() on a string fails only with ValueError; anything else
        # (e.g. KeyboardInterrupt) should propagate untouched.
        raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
def check_pid(self, pid):
    """Return True if a process with the given pid appears to be running.

    On Windows the check is made with ``OpenProcess``; elsewhere the pid is
    looked up in the output of ``ps x``. If the check itself cannot be
    performed, a warning is logged and True is returned (the process is
    assumed to be running).
    """
    if os.name == 'nt':
        try:
            import ctypes
            kernel32 = ctypes.windll.kernel32
            # returns 0 if no such process (of ours) exists
            # positive int otherwise
            handle = kernel32.OpenProcess(1, 0, pid)
            if handle:
                # The handle was only needed as an existence probe;
                # release it so repeated checks do not leak handles.
                kernel32.CloseHandle(handle)
        except Exception:
            self.log.warning(
                "Could not determine whether pid %i is running via `OpenProcess`. "
                " Making the likely assumption that it is."%pid
            )
            return True
        return bool(handle)

    try:
        p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
        output, _ = p.communicate()
    except OSError:
        self.log.warning(
            "Could not determine whether pid %i is running via `ps x`. "
            " Making the likely assumption that it is."%pid
        )
        return True
    # communicate() yields bytes; decode so the text pattern below applies
    # on Python 3 as well as Python 2.
    if isinstance(output, bytes):
        output = output.decode('utf-8', 'replace')
    # The pid is the first column of each line, optionally space-padded.
    running = set(int(m) for m in re.findall(r'^\s*(\d+)', output, re.MULTILINE))
    return pid in running