##// END OF EJS Templates
Fix race condition in javascript kernel message processing...
Fix race condition in javascript kernel message processing Because the binary messages are now deserialized using the asynchronous FileReader API, we need to have some way to force the messages to still be processed in the order they are received. This patch implements a simple processing queue using promises.

File last commit:

r18119:8da95fea
r20441:834cd9c4
Show More
baseapp.py
242 lines | 8.5 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
The Base Application class for IPython.parallel apps
"""
import os
import logging
import re
import sys
from IPython.config.application import catch_config_error, LevelFormatter
from IPython.core import release
from IPython.core.crashhandler import CrashHandler
from IPython.core.application import (
BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
)
from IPython.utils.path import expand_path
from IPython.utils.process import check_pid
from IPython.utils import py3compat
from IPython.utils.py3compat import unicode_type
from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
pass
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
class ParallelCrashHandler(CrashHandler):
"""sys.excepthook for IPython itself, leaves a detailed report on disk."""
def __init__(self, app):
contact_name = release.authors['Min'][0]
contact_email = release.author_email
bug_tracker = 'https://github.com/ipython/ipython/issues'
super(ParallelCrashHandler,self).__init__(
app, contact_name, contact_email, bug_tracker
)
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
'work-dir' : 'BaseParallelApplication.work_dir',
'log-to-file' : 'BaseParallelApplication.log_to_file',
'clean-logs' : 'BaseParallelApplication.clean_logs',
'log-url' : 'BaseParallelApplication.log_url',
'cluster-id' : 'BaseParallelApplication.cluster_id',
})
base_flags = {
'log-to-file' : (
{'BaseParallelApplication' : {'log_to_file' : True}},
"send log output to a file"
)
}
base_flags.update(base_ip_flags)
class BaseParallelApplication(BaseIPythonApplication):
"""The base Application for IPython.parallel apps
Principle extensions to BaseIPyythonApplication:
* work_dir
* remote logging via pyzmq
* IOLoop instance
"""
crash_handler_class = ParallelCrashHandler
def _log_level_default(self):
# temporarily override default_log_level to INFO
return logging.INFO
def _log_format_default(self):
"""override default log format to include time"""
return u"%(asctime)s.%(msecs).03d [%(name)s]%(highlevel)s %(message)s"
work_dir = Unicode(py3compat.getcwd(), config=True,
help='Set the working dir for the process.'
)
def _work_dir_changed(self, name, old, new):
self.work_dir = unicode_type(expand_path(new))
log_to_file = Bool(config=True,
help="whether to log to a file")
clean_logs = Bool(False, config=True,
help="whether to cleanup old logfiles before starting")
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
cluster_id = Unicode('', config=True,
help="""String id to add to runtime files, to prevent name collisions when
using multiple clusters with a single profile simultaneously.
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but should
generally work).
"""
)
def _cluster_id_changed(self, name, old, new):
self.name = self.__class__.name
if new:
self.name += '-%s'%new
def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
from zmq.eventloop.ioloop import IOLoop
return IOLoop.instance()
aliases = Dict(base_aliases)
flags = Dict(base_flags)
@catch_config_error
def initialize(self, argv=None):
"""initialize the app"""
super(BaseParallelApplication, self).initialize(argv)
self.to_work_dir()
self.reinit_logging()
def to_work_dir(self):
wd = self.work_dir
if unicode_type(wd) != py3compat.getcwd():
os.chdir(wd)
self.log.info("Changing to working dir: %s" % wd)
# This is the working dir by now.
sys.path.insert(0, '')
def reinit_logging(self):
# Remove old log files
log_dir = self.profile_dir.log_dir
if self.clean_logs:
for f in os.listdir(log_dir):
if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
try:
os.remove(os.path.join(log_dir, f))
except (OSError, IOError):
# probably just conflict from sibling process
# already removing it
pass
if self.log_to_file:
# Start logging to the new log file
log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
logfile = os.path.join(log_dir, log_filename)
open_log_file = open(logfile, 'w')
else:
open_log_file = None
if open_log_file is not None:
while self.log.handlers:
self.log.removeHandler(self.log.handlers[0])
self._log_handler = logging.StreamHandler(open_log_file)
self.log.addHandler(self._log_handler)
else:
self._log_handler = self.log.handlers[0]
# Add timestamps to log format:
self._log_formatter = LevelFormatter(self.log_format,
datefmt=self.log_datefmt)
self._log_handler.setFormatter(self._log_formatter)
# do not propagate log messages to root logger
# ipcluster app will sometimes print duplicate messages during shutdown
# if this is 1 (default):
self.log.propagate = False
def write_pid_file(self, overwrite=False):
"""Create a .pid file in the pid_dir with my pid.
This must be called after pre_construct, which sets `self.pid_dir`.
This raises :exc:`PIDFileError` if the pid file exists already.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
pid = self.get_pid_from_file()
if not overwrite:
raise PIDFileError(
'The pid file [%s] already exists. \nThis could mean that this '
'server is already running with [pid=%s].' % (pid_file, pid)
)
with open(pid_file, 'w') as f:
self.log.info("Creating pid file: %s" % pid_file)
f.write(repr(os.getpid())+'\n')
def remove_pid_file(self):
"""Remove the pid file.
This should be called at shutdown by registering a callback with
:func:`reactor.addSystemEventTrigger`. This needs to return
``None``.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
try:
self.log.info("Removing pid file: %s" % pid_file)
os.remove(pid_file)
except:
self.log.warn("Error removing the pid file: %s" % pid_file)
def get_pid_from_file(self):
"""Get the pid from the pid file.
If the pid file doesn't exist a :exc:`PIDFileError` is raised.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
with open(pid_file, 'r') as f:
s = f.read().strip()
try:
pid = int(s)
except:
raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
return pid
else:
raise PIDFileError('pid file not found: %s' % pid_file)
def check_pid(self, pid):
try:
return check_pid(pid)
except Exception:
self.log.warn(
"Could not determine whether pid %i is running. "
" Making the likely assumption that it is."%pid
)
return True