baseapp.py
277 lines
| 9.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | # encoding: utf-8 | ||
""" | ||||
MinRK
|
r4018 | The Base Application class for IPython.parallel apps | ||
Authors: | ||||
* Brian Granger | ||||
* Min RK | ||||
MinRK
|
r3604 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4018 | # Copyright (C) 2008-2011 The IPython Development Team | ||
MinRK
|
r3604 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
from __future__ import with_statement | ||||
import os | ||||
MinRK
|
r3631 | import logging | ||
import re | ||||
MinRK
|
r3604 | import sys | ||
MinRK
|
r3846 | from subprocess import Popen, PIPE | ||
MinRK
|
r5214 | from IPython.config.application import catch_config_error | ||
MinRK
|
r3604 | from IPython.core import release | ||
MinRK
|
r3992 | from IPython.core.crashhandler import CrashHandler | ||
MinRK
|
r4023 | from IPython.core.application import ( | ||
MinRK
|
r3992 | BaseIPythonApplication, | ||
base_aliases as base_ip_aliases, | ||||
base_flags as base_ip_flags | ||||
MinRK
|
r3604 | ) | ||
MinRK
|
r3992 | from IPython.utils.path import expand_path | ||
MinRK
|
r3991 | from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List | ||
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
# Module errors | ||||
#----------------------------------------------------------------------------- | ||||
class PIDFileError(Exception): | ||||
pass | ||||
#----------------------------------------------------------------------------- | ||||
# Crash handler for this application | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3992 | class ParallelCrashHandler(CrashHandler): | ||
MinRK
|
r3604 | """sys.excepthook for IPython itself, leaves a detailed report on disk.""" | ||
def __init__(self, app): | ||||
MinRK
|
r3985 | contact_name = release.authors['Min'][0] | ||
MinRK
|
r5316 | contact_email = release.author_email | ||
bug_tracker = 'https://github.com/ipython/ipython/issues' | ||||
MinRK
|
r3992 | super(ParallelCrashHandler,self).__init__( | ||
MinRK
|
r3604 | app, contact_name, contact_email, bug_tracker | ||
) | ||||
#----------------------------------------------------------------------------- | ||||
# Main application | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3992 | base_aliases = {} | ||
base_aliases.update(base_ip_aliases) | ||||
base_aliases.update({ | ||||
MinRK
|
r4214 | 'profile-dir' : 'ProfileDir.location', | ||
'work-dir' : 'BaseParallelApplication.work_dir', | ||||
'log-to-file' : 'BaseParallelApplication.log_to_file', | ||||
'clean-logs' : 'BaseParallelApplication.clean_logs', | ||||
'log-url' : 'BaseParallelApplication.log_url', | ||||
MinRK
|
r4847 | 'cluster-id' : 'BaseParallelApplication.cluster_id', | ||
MinRK
|
r3992 | }) | ||
MinRK
|
r3985 | |||
base_flags = { | ||||
MinRK
|
r3994 | 'log-to-file' : ( | ||
{'BaseParallelApplication' : {'log_to_file' : True}}, | ||||
"send log output to a file" | ||||
) | ||||
MinRK
|
r3985 | } | ||
MinRK
|
r3992 | base_flags.update(base_ip_flags) | ||
MinRK
|
r3604 | |||
MinRK
|
r3992 | class BaseParallelApplication(BaseIPythonApplication): | ||
"""The base Application for IPython.parallel apps | ||||
Principle extensions to BaseIPyythonApplication: | ||||
* work_dir | ||||
* remote logging via pyzmq | ||||
* IOLoop instance | ||||
MinRK
|
r3604 | """ | ||
MinRK
|
r3992 | crash_handler_class = ParallelCrashHandler | ||
MinRK
|
r3986 | |||
def _log_level_default(self): | ||||
# temporarily override default_log_level to INFO | ||||
return logging.INFO | ||||
MinRK
|
r6884 | |||
def _log_format_default(self): | ||||
"""override default log format to include time""" | ||||
return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s" | ||||
MinRK
|
r3985 | |||
work_dir = Unicode(os.getcwdu(), config=True, | ||||
help='Set the working dir for the process.' | ||||
) | ||||
def _work_dir_changed(self, name, old, new): | ||||
self.work_dir = unicode(expand_path(new)) | ||||
log_to_file = Bool(config=True, | ||||
help="whether to log to a file") | ||||
MinRK
|
r3991 | clean_logs = Bool(False, config=True, | ||
MinRK
|
r3985 | help="whether to cleanup old logfiles before starting") | ||
MinRK
|
r3604 | |||
MinRK
|
r3991 | log_url = Unicode('', config=True, | ||
MinRK
|
r3988 | help="The ZMQ URL of the iplogger to aggregate logging.") | ||
MinRK
|
r3604 | |||
MinRK
|
r4847 | cluster_id = Unicode('', config=True, | ||
help="""String id to add to runtime files, to prevent name collisions when | ||||
MinRK
|
r4850 | using multiple clusters with a single profile simultaneously. | ||
MinRK
|
r4847 | |||
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json' | ||||
MinRK
|
r4850 | |||
Since this is text inserted into filenames, typical recommendations apply: | ||||
Simple character strings are ideal, and spaces are not recommended (but should | ||||
generally work). | ||||
MinRK
|
r4847 | """ | ||
) | ||||
def _cluster_id_changed(self, name, old, new): | ||||
self.name = self.__class__.name | ||||
if new: | ||||
self.name += '-%s'%new | ||||
MinRK
|
r3992 | def _config_files_default(self): | ||
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py'] | ||||
MinRK
|
r3986 | |||
loop = Instance('zmq.eventloop.ioloop.IOLoop') | ||||
def _loop_default(self): | ||||
from zmq.eventloop.ioloop import IOLoop | ||||
return IOLoop.instance() | ||||
MinRK
|
r3985 | |||
aliases = Dict(base_aliases) | ||||
flags = Dict(base_flags) | ||||
MinRK
|
r3986 | |||
MinRK
|
r5214 | @catch_config_error | ||
MinRK
|
r3986 | def initialize(self, argv=None): | ||
"""initialize the app""" | ||||
MinRK
|
r3992 | super(BaseParallelApplication, self).initialize(argv) | ||
MinRK
|
r3986 | self.to_work_dir() | ||
MinRK
|
r3989 | self.reinit_logging() | ||
MinRK
|
r3986 | |||
MinRK
|
r3604 | def to_work_dir(self): | ||
MinRK
|
r3985 | wd = self.work_dir | ||
if unicode(wd) != os.getcwdu(): | ||||
MinRK
|
r3604 | os.chdir(wd) | ||
self.log.info("Changing to working dir: %s" % wd) | ||||
MinRK
|
r3989 | # This is the working dir by now. | ||
sys.path.insert(0, '') | ||||
MinRK
|
r3604 | |||
MinRK
|
r3986 | def reinit_logging(self): | ||
# Remove old log files | ||||
MinRK
|
r3992 | log_dir = self.profile_dir.log_dir | ||
MinRK
|
r3986 | if self.clean_logs: | ||
for f in os.listdir(log_dir): | ||||
MinRK
|
r8556 | if re.match(r'%s-\d+\.(log|err|out)' % self.name, f): | ||
try: | ||||
os.remove(os.path.join(log_dir, f)) | ||||
except (OSError, IOError): | ||||
# probably just conflict from sibling process | ||||
# already removing it | ||||
pass | ||||
MinRK
|
r3986 | if self.log_to_file: | ||
# Start logging to the new log file | ||||
log_filename = self.name + u'-' + str(os.getpid()) + u'.log' | ||||
logfile = os.path.join(log_dir, log_filename) | ||||
open_log_file = open(logfile, 'w') | ||||
else: | ||||
open_log_file = None | ||||
if open_log_file is not None: | ||||
MinRK
|
r6883 | while self.log.handlers: | ||
self.log.removeHandler(self.log.handlers[0]) | ||||
MinRK
|
r3986 | self._log_handler = logging.StreamHandler(open_log_file) | ||
self.log.addHandler(self._log_handler) | ||||
MinRK
|
r6883 | else: | ||
self._log_handler = self.log.handlers[0] | ||||
MinRK
|
r5678 | # Add timestamps to log format: | ||
MinRK
|
r6884 | self._log_formatter = logging.Formatter(self.log_format, | ||
MinRK
|
r5678 | datefmt="%Y-%m-%d %H:%M:%S") | ||
self._log_handler.setFormatter(self._log_formatter) | ||||
MinRK
|
r4506 | # do not propagate log messages to root logger | ||
# ipcluster app will sometimes print duplicate messages during shutdown | ||||
# if this is 1 (default): | ||||
self.log.propagate = False | ||||
MinRK
|
r3604 | |||
def write_pid_file(self, overwrite=False): | ||||
"""Create a .pid file in the pid_dir with my pid. | ||||
This must be called after pre_construct, which sets `self.pid_dir`. | ||||
This raises :exc:`PIDFileError` if the pid file exists already. | ||||
""" | ||||
MinRK
|
r3992 | pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') | ||
MinRK
|
r3604 | if os.path.isfile(pid_file): | ||
pid = self.get_pid_from_file() | ||||
if not overwrite: | ||||
raise PIDFileError( | ||||
'The pid file [%s] already exists. \nThis could mean that this ' | ||||
'server is already running with [pid=%s].' % (pid_file, pid) | ||||
) | ||||
with open(pid_file, 'w') as f: | ||||
self.log.info("Creating pid file: %s" % pid_file) | ||||
f.write(repr(os.getpid())+'\n') | ||||
def remove_pid_file(self): | ||||
"""Remove the pid file. | ||||
This should be called at shutdown by registering a callback with | ||||
:func:`reactor.addSystemEventTrigger`. This needs to return | ||||
``None``. | ||||
""" | ||||
MinRK
|
r3992 | pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') | ||
MinRK
|
r3604 | if os.path.isfile(pid_file): | ||
try: | ||||
self.log.info("Removing pid file: %s" % pid_file) | ||||
os.remove(pid_file) | ||||
except: | ||||
self.log.warn("Error removing the pid file: %s" % pid_file) | ||||
def get_pid_from_file(self): | ||||
"""Get the pid from the pid file. | ||||
If the pid file doesn't exist a :exc:`PIDFileError` is raised. | ||||
""" | ||||
MinRK
|
r3992 | pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid') | ||
MinRK
|
r3604 | if os.path.isfile(pid_file): | ||
with open(pid_file, 'r') as f: | ||||
MinRK
|
r4032 | s = f.read().strip() | ||
try: | ||||
pid = int(s) | ||||
except: | ||||
raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s)) | ||||
MinRK
|
r3604 | return pid | ||
else: | ||||
raise PIDFileError('pid file not found: %s' % pid_file) | ||||
MinRK
|
r3846 | |||
def check_pid(self, pid): | ||||
if os.name == 'nt': | ||||
try: | ||||
import ctypes | ||||
# returns 0 if no such process (of ours) exists | ||||
# positive int otherwise | ||||
p = ctypes.windll.kernel32.OpenProcess(1,0,pid) | ||||
except Exception: | ||||
self.log.warn( | ||||
"Could not determine whether pid %i is running via `OpenProcess`. " | ||||
" Making the likely assumption that it is."%pid | ||||
) | ||||
return True | ||||
return bool(p) | ||||
else: | ||||
try: | ||||
p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE) | ||||
output,_ = p.communicate() | ||||
except OSError: | ||||
self.log.warn( | ||||
"Could not determine whether pid %i is running via `ps x`. " | ||||
" Making the likely assumption that it is."%pid | ||||
) | ||||
return True | ||||
pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)) | ||||
return pid in pids | ||||