##// END OF EJS Templates
Merge branch 'stdin'...
Merge branch 'stdin' Changes stdin channel from REQ-REQ to ROUTER-DEALER, fixing the round-robin load-balancing of stdin_requests across frontends. stdin_requests now go to the client that made the execute_request that prompted the stdin request. stdin_requests from frontends that do not support stdin will raise an error, rather than hanging on input that will never arrive. reviewed by @fperez closes #673

File last commit:

r4862:afbc4511
r4954:6e92ffd9 merge
Show More
baseapp.py
261 lines | 9.1 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 # encoding: utf-8
"""
MinRK
update recently changed modules with Authors in docstring
r4018 The Base Application class for IPython.parallel apps
Authors:
* Brian Granger
* Min RK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
MinRK
resort imports in a cleaner order
r3631 import logging
import re
MinRK
Refactor newparallel to use Config system...
r3604 import sys
MinRK
add check_pid, and handle stale PID info in ipcluster....
r3846 from subprocess import Popen, PIPE
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.core import release
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.core.crashhandler import CrashHandler
MinRK
rename core.newapplication -> core.application
r4023 from IPython.core.application import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.utils.path import expand_path
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
    """Raised when a pid file already exists, is missing, or is unreadable."""
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    def __init__(self, app):
        # The crash report names the 'Min' entry of release.authors as the
        # contact, and points at the project's issue tracker.
        maintainer = release.authors['Min']
        super(ParallelCrashHandler, self).__init__(
            app,
            maintainer[0],  # contact name
            maintainer[1],  # contact email
            'http://github.com/ipython/ipython/issues'  # bug tracker
        )
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
# Command-line aliases for the parallel apps: start from a copy of the base
# IPython aliases, then add the options specific to the parallel apps.
base_aliases = dict(base_ip_aliases)
base_aliases.update([
    ('profile-dir', 'ProfileDir.location'),
    ('work-dir', 'BaseParallelApplication.work_dir'),
    ('log-to-file', 'BaseParallelApplication.log_to_file'),
    ('clean-logs', 'BaseParallelApplication.clean_logs'),
    ('log-url', 'BaseParallelApplication.log_url'),
    ('cluster-id', 'BaseParallelApplication.cluster_id'),
])
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# Command-line flags for the parallel apps.  The base IPython flags are merged
# in last, so they take precedence if a flag name is defined in both.
base_flags = dict([
    ('log-to-file', (
        {'BaseParallelApplication': {'log_to_file': True}},
        "send log output to a file",
    )),
])
base_flags.update(base_ip_flags)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
update parallel apps to use ProfileDir
r3992 class BaseParallelApplication(BaseIPythonApplication):
"""The base Application for IPython.parallel apps
Principal extensions to BaseIPythonApplication:
* work_dir
* remote logging via pyzmq
* IOLoop instance
MinRK
Refactor newparallel to use Config system...
r3604 """
MinRK
update parallel apps to use ProfileDir
r3992 crash_handler_class = ParallelCrashHandler
MinRK
ipcluster implemented with new subcommands
r3986
def _log_level_default(self):
    """Return the default log level for this application: ``logging.INFO``."""
    # temporarily override default_log_level to INFO
    default_level = logging.INFO
    return default_level
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
work_dir = Unicode(os.getcwdu(), config=True,
help='Set the working dir for the process.'
)
def _work_dir_changed(self, name, old, new):
    """Store the newly assigned ``work_dir`` in expanded, unicode form.

    ``name`` and ``old`` are accepted but not used.
    """
    expanded = expand_path(new)
    self.work_dir = unicode(expanded)
log_to_file = Bool(config=True,
help="whether to log to a file")
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 clean_logs = Bool(False, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="whether to cleanup old logfiles before starting")
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 log_url = Unicode('', config=True,
MinRK
cleanup parallel traits...
r3988 help="The ZMQ URL of the iplogger to aggregate logging.")
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
add cluster_id to parallel apps...
r4847 cluster_id = Unicode('', config=True,
help="""String id to add to runtime files, to prevent name collisions when
MinRK
parallel.apps cleanup per review...
r4850 using multiple clusters with a single profile simultaneously.
MinRK
add cluster_id to parallel apps...
r4847
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
MinRK
parallel.apps cleanup per review...
r4850
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but should
generally work).
MinRK
add cluster_id to parallel apps...
r4847 """
)
def _cluster_id_changed(self, name, old, new):
    """Rebuild ``self.name`` from the class-level name and the new cluster id.

    The name is first reset to the class's ``name``; a non-empty ``new`` is
    then appended as a ``-<cluster_id>`` suffix.  ``name`` and ``old`` are
    accepted but not used.
    """
    # Always reset first, so a previous suffix never accumulates.
    self.name = self.__class__.name
    if not new:
        return
    self.name = self.name + '-%s' % new
MinRK
update parallel apps to use ProfileDir
r3992 def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
MinRK
ipcluster implemented with new subcommands
r3986
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
    """Return the default value for ``loop``: ``IOLoop.instance()``."""
    # zmq is imported inside the function, not at module import time.
    from zmq.eventloop import ioloop
    return ioloop.IOLoop.instance()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
aliases = Dict(base_aliases)
flags = Dict(base_flags)
MinRK
ipcluster implemented with new subcommands
r3986
def initialize(self, argv=None):
    """initialize the app"""
    # Run the parent class's initialization first.
    parent = super(BaseParallelApplication, self)
    parent.initialize(argv)
    # Then move to the configured working directory ...
    self.to_work_dir()
    # ... and finally set up log cleaning / file logging.
    self.reinit_logging()
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
Refactor newparallel to use Config system...
def to_work_dir(self):
    """Change into ``self.work_dir`` if needed and put '' first on ``sys.path``."""
    target = self.work_dir
    already_there = unicode(target) == os.getcwdu()
    if not already_there:
        os.chdir(target)
        self.log.info("Changing to working dir: %s" % target)
    # This is the working dir by now.
    sys.path.insert(0, '')
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
ipcluster implemented with new subcommands
def reinit_logging(self):
    """Clean old log files and, if requested, redirect logging to a file.

    * If ``self.clean_logs`` is set, every file in the profile's log
      directory whose name starts with ``<name>-<digits>.log``, ``.err`` or
      ``.out`` is removed.
    * If ``self.log_to_file`` is set, the current log handler is replaced by
      one that writes to ``<name>-<pid>.log`` in the log directory.
    * Propagation to the root logger is always switched off.
    """
    log_dir = self.profile_dir.log_dir
    if self.clean_logs:
        # Remove old log files.  self.name is escaped before it goes into
        # the pattern, so characters such as '+' or '.' in the name are
        # matched literally instead of being read as regex syntax.
        old_log = re.compile(r'%s-\d+\.(log|err|out)' % re.escape(self.name))
        for f in os.listdir(log_dir):
            if old_log.match(f):
                os.remove(os.path.join(log_dir, f))
    if self.log_to_file:
        # Start logging to the new log file
        log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
        logfile = os.path.join(log_dir, log_filename)
        # The file object is handed to the handler below and is therefore
        # deliberately left open here.
        open_log_file = open(logfile, 'w')
        self.log.removeHandler(self._log_handler)
        self._log_handler = logging.StreamHandler(open_log_file)
        self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
        self._log_handler.setFormatter(self._log_formatter)
        self.log.addHandler(self._log_handler)
    # do not propagate log messages to root logger
    # ipcluster app will sometimes print duplicate messages during shutdown
    # if this is 1 (default):
    self.log.propagate = False
MinRK
Refactor newparallel to use Config system...
r3604
def write_pid_file(self, overwrite=False):
    """Create a .pid file in the profile's pid_dir containing this process's pid.

    The file is named ``<self.name>.pid``.

    Parameters
    ----------
    overwrite : bool
        If True, an existing pid file is replaced without being read, so a
        stale or corrupt file does not prevent writing the new one.

    Raises
    ------
    PIDFileError
        If the pid file already exists and ``overwrite`` is False.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if os.path.isfile(pid_file) and not overwrite:
        # Only parse the existing file when its pid is needed for the
        # error message.
        pid = self.get_pid_from_file()
        raise PIDFileError(
            'The pid file [%s] already exists. \nThis could mean that this '
            'server is already running with [pid=%s].' % (pid_file, pid)
        )
    with open(pid_file, 'w') as f:
        self.log.info("Creating pid file: %s" % pid_file)
        f.write(repr(os.getpid())+'\n')
def remove_pid_file(self):
    """Remove this application's pid file, if it exists.

    Intended to be called at shutdown.  A failure to delete the file is
    logged as a warning instead of being raised, and the method always
    returns ``None``.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if os.path.isfile(pid_file):
        self.log.info("Removing pid file: %s" % pid_file)
        try:
            os.remove(pid_file)
        except OSError:
            # Best effort: only filesystem errors are swallowed, so that
            # e.g. KeyboardInterrupt still propagates.
            self.log.warning("Error removing the pid file: %s" % pid_file)
def get_pid_from_file(self):
    """Return the pid stored in this application's pid file as an int.

    Raises
    ------
    PIDFileError
        If the pid file does not exist, or if its contents (after
        stripping whitespace) are not an integer.
    """
    pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
    if not os.path.isfile(pid_file):
        raise PIDFileError('pid file not found: %s' % pid_file)
    with open(pid_file, 'r') as f:
        s = f.read().strip()
    try:
        return int(s)
    except ValueError:
        # An empty or garbled file is reported as a pid-file problem
        # rather than as a bare ValueError.
        raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
MinRK
add check_pid, and handle stale PID info in ipcluster....
r3846
def check_pid(self, pid):
    """Return whether a process with the given pid appears to be running.

    On Windows (``os.name == 'nt'``) the pid is probed with
    ``kernel32.OpenProcess``; elsewhere the output of ``ps x`` is searched
    for the pid.  If the check itself cannot be performed, a warning is
    logged and ``True`` is returned.

    Parameters
    ----------
    pid : int
        The process id to look for.
    """
    if os.name == 'nt':
        try:
            import ctypes
            kernel32 = ctypes.windll.kernel32
            # returns 0 if no such process (of ours) exists
            # positive int otherwise
            handle = kernel32.OpenProcess(1, 0, pid)
        except Exception:
            self.log.warning(
                "Could not determine whether pid %i is running via `OpenProcess`. "
                " Making the likely assumption that it is."%pid
            )
            return True
        if not handle:
            return False
        # Close the handle again so repeated checks do not leak handles.
        kernel32.CloseHandle(handle)
        return True

    try:
        p = Popen(['ps', 'x'], stdout=PIPE, stderr=PIPE)
        output, _ = p.communicate()
    except OSError:
        self.log.warning(
            "Could not determine whether pid %i is running via `ps x`. "
            " Making the likely assumption that it is."%pid
        )
        return True
    # The pipe yields bytes; decode so the text pattern below applies.
    if isinstance(output, bytes):
        output = output.decode('ascii', 'replace')
    # The pid is the first run of digits on each line of the ps output.
    pids = set(int(m) for m in re.findall(r'^\s*(\d+)', output, re.MULTILINE))
    return pid in pids