##// END OF EJS Templates
Add fsync() call in atomic_writing
Add fsync() call in atomic_writing

File last commit:

r17182:21dbb4b9
r17595:929c34a7
Show More
baseapp.py
260 lines | 9.1 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 # encoding: utf-8
"""
MinRK
update recently changed modules with Authors in docstring
r4018 The Base Application class for IPython.parallel apps
Authors:
* Brian Granger
* Min RK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import os
MinRK
resort imports in a cleaner order
r3631 import logging
import re
MinRK
Refactor newparallel to use Config system...
r3604 import sys
MinRK
add check_pid, and handle stale PID info in ipcluster....
r3846 from subprocess import Popen, PIPE
MinRK
use LevelFormatter in parallel apps
r10561 from IPython.config.application import catch_config_error, LevelFormatter
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.core import release
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.core.crashhandler import CrashHandler
MinRK
rename core.newapplication -> core.application
r4023 from IPython.core.application import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.utils.path import expand_path
Thomas Kluyver
Use check_pid from utils in IPython.parallel
r17182 from IPython.utils.process import check_pid
Thomas Kluyver
Python 3 compatibility for os.getcwdu()
r13447 from IPython.utils import py3compat
Thomas Kluyver
Replace references to unicode and basestring
r13353 from IPython.utils.py3compat import unicode_type
MinRK
update parallel apps to use ProfileDir
r3992
Thomas Kluyver
Replace references to unicode and basestring
r13353 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
pass
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
r3992 class ParallelCrashHandler(CrashHandler):
MinRK
Refactor newparallel to use Config system...
r3604 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
def __init__(self, app):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 contact_name = release.authors['Min'][0]
MinRK
use ipython-dev as the email address for crash reports
r5316 contact_email = release.author_email
bug_tracker = 'https://github.com/ipython/ipython/issues'
MinRK
update parallel apps to use ProfileDir
r3992 super(ParallelCrashHandler,self).__init__(
MinRK
Refactor newparallel to use Config system...
r3604 app, contact_name, contact_email, bug_tracker
)
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
r3992 base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
MinRK
aliases match flag pattern ('-' as wordsep, not '_')...
r4214 'work-dir' : 'BaseParallelApplication.work_dir',
'log-to-file' : 'BaseParallelApplication.log_to_file',
'clean-logs' : 'BaseParallelApplication.clean_logs',
'log-url' : 'BaseParallelApplication.log_url',
MinRK
add cluster_id to parallel apps...
r4847 'cluster-id' : 'BaseParallelApplication.cluster_id',
MinRK
update parallel apps to use ProfileDir
r3992 })
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
base_flags = {
MinRK
remove uneccesary Config objects from flags.
r3994 'log-to-file' : (
{'BaseParallelApplication' : {'log_to_file' : True}},
"send log output to a file"
)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 }
MinRK
update parallel apps to use ProfileDir
r3992 base_flags.update(base_ip_flags)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
update parallel apps to use ProfileDir
r3992 class BaseParallelApplication(BaseIPythonApplication):
"""The base Application for IPython.parallel apps
Principle extensions to BaseIPyythonApplication:
* work_dir
* remote logging via pyzmq
* IOLoop instance
MinRK
Refactor newparallel to use Config system...
r3604 """
MinRK
update parallel apps to use ProfileDir
r3992 crash_handler_class = ParallelCrashHandler
MinRK
ipcluster implemented with new subcommands
r3986
def _log_level_default(self):
# temporarily override default_log_level to INFO
return logging.INFO
MinRK
Application.log_format is a configurable
r6884
def _log_format_default(self):
"""override default log format to include time"""
MinRK
use LevelFormatter in parallel apps
r10561 return u"%(asctime)s.%(msecs).03d [%(name)s]%(highlevel)s %(message)s"
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
Thomas Kluyver
Python 3 compatibility for os.getcwdu()
r13447 work_dir = Unicode(py3compat.getcwd(), config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help='Set the working dir for the process.'
)
def _work_dir_changed(self, name, old, new):
Thomas Kluyver
Replace references to unicode and basestring
r13353 self.work_dir = unicode_type(expand_path(new))
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
log_to_file = Bool(config=True,
help="whether to log to a file")
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 clean_logs = Bool(False, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="whether to cleanup old logfiles before starting")
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 log_url = Unicode('', config=True,
MinRK
cleanup parallel traits...
r3988 help="The ZMQ URL of the iplogger to aggregate logging.")
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
add cluster_id to parallel apps...
r4847 cluster_id = Unicode('', config=True,
help="""String id to add to runtime files, to prevent name collisions when
MinRK
parallel.apps cleanup per review...
r4850 using multiple clusters with a single profile simultaneously.
MinRK
add cluster_id to parallel apps...
r4847
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
MinRK
parallel.apps cleanup per review...
r4850
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but should
generally work).
MinRK
add cluster_id to parallel apps...
r4847 """
)
def _cluster_id_changed(self, name, old, new):
self.name = self.__class__.name
if new:
self.name += '-%s'%new
MinRK
update parallel apps to use ProfileDir
r3992 def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
MinRK
ipcluster implemented with new subcommands
r3986
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
from zmq.eventloop.ioloop import IOLoop
return IOLoop.instance()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
aliases = Dict(base_aliases)
flags = Dict(base_flags)
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
catch_config -> catch_config_error
r5214 @catch_config_error
MinRK
ipcluster implemented with new subcommands
r3986 def initialize(self, argv=None):
"""initialize the app"""
MinRK
update parallel apps to use ProfileDir
r3992 super(BaseParallelApplication, self).initialize(argv)
MinRK
ipcluster implemented with new subcommands
r3986 self.to_work_dir()
MinRK
re-enable log forwarding and iplogger
r3989 self.reinit_logging()
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
Refactor newparallel to use Config system...
r3604 def to_work_dir(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 wd = self.work_dir
Thomas Kluyver
Python 3 compatibility for os.getcwdu()
r13447 if unicode_type(wd) != py3compat.getcwd():
MinRK
Refactor newparallel to use Config system...
r3604 os.chdir(wd)
self.log.info("Changing to working dir: %s" % wd)
MinRK
re-enable log forwarding and iplogger
r3989 # This is the working dir by now.
sys.path.insert(0, '')
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
ipcluster implemented with new subcommands
r3986 def reinit_logging(self):
# Remove old log files
MinRK
update parallel apps to use ProfileDir
r3992 log_dir = self.profile_dir.log_dir
MinRK
ipcluster implemented with new subcommands
r3986 if self.clean_logs:
for f in os.listdir(log_dir):
MinRK
don't let log cleanup errors prevent engine start...
r8484 if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
try:
os.remove(os.path.join(log_dir, f))
except (OSError, IOError):
# probably just conflict from sibling process
# already removing it
pass
MinRK
ipcluster implemented with new subcommands
r3986 if self.log_to_file:
# Start logging to the new log file
log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
logfile = os.path.join(log_dir, log_filename)
open_log_file = open(logfile, 'w')
else:
open_log_file = None
if open_log_file is not None:
MinRK
move default log setup to _log_default from init_logging...
r6883 while self.log.handlers:
self.log.removeHandler(self.log.handlers[0])
MinRK
ipcluster implemented with new subcommands
r3986 self._log_handler = logging.StreamHandler(open_log_file)
self.log.addHandler(self._log_handler)
MinRK
move default log setup to _log_default from init_logging...
r6883 else:
self._log_handler = self.log.handlers[0]
MinRK
add timestamps to parallel app log output...
r5678 # Add timestamps to log format:
MinRK
use LevelFormatter in parallel apps
r10561 self._log_formatter = LevelFormatter(self.log_format,
datefmt=self.log_datefmt)
MinRK
add timestamps to parallel app log output...
r5678 self._log_handler.setFormatter(self._log_formatter)
MinRK
IPython.parallel logging cleanup...
r4506 # do not propagate log messages to root logger
# ipcluster app will sometimes print duplicate messages during shutdown
# if this is 1 (default):
self.log.propagate = False
MinRK
Refactor newparallel to use Config system...
r3604
def write_pid_file(self, overwrite=False):
"""Create a .pid file in the pid_dir with my pid.
This must be called after pre_construct, which sets `self.pid_dir`.
This raises :exc:`PIDFileError` if the pid file exists already.
"""
MinRK
update parallel apps to use ProfileDir
r3992 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
MinRK
Refactor newparallel to use Config system...
r3604 if os.path.isfile(pid_file):
pid = self.get_pid_from_file()
if not overwrite:
raise PIDFileError(
'The pid file [%s] already exists. \nThis could mean that this '
'server is already running with [pid=%s].' % (pid_file, pid)
)
with open(pid_file, 'w') as f:
self.log.info("Creating pid file: %s" % pid_file)
f.write(repr(os.getpid())+'\n')
def remove_pid_file(self):
"""Remove the pid file.
This should be called at shutdown by registering a callback with
:func:`reactor.addSystemEventTrigger`. This needs to return
``None``.
"""
MinRK
update parallel apps to use ProfileDir
r3992 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
MinRK
Refactor newparallel to use Config system...
r3604 if os.path.isfile(pid_file):
try:
self.log.info("Removing pid file: %s" % pid_file)
os.remove(pid_file)
except:
self.log.warn("Error removing the pid file: %s" % pid_file)
def get_pid_from_file(self):
"""Get the pid from the pid file.
If the pid file doesn't exist a :exc:`PIDFileError` is raised.
"""
MinRK
update parallel apps to use ProfileDir
r3992 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
MinRK
Refactor newparallel to use Config system...
r3604 if os.path.isfile(pid_file):
with open(pid_file, 'r') as f:
MinRK
raise PIDFileError on empty PID file, not ValueError in baseapp
r4032 s = f.read().strip()
try:
pid = int(s)
except:
raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
MinRK
Refactor newparallel to use Config system...
r3604 return pid
else:
raise PIDFileError('pid file not found: %s' % pid_file)
MinRK
add check_pid, and handle stale PID info in ipcluster....
r3846
def check_pid(self, pid):
Thomas Kluyver
Use check_pid from utils in IPython.parallel
r17182 try:
return check_pid(pid)
except Exception:
self.log.warn(
"Could not determine whether pid %i is running. "
" Making the likely assumption that it is."%pid
)
return True