##// END OF EJS Templates
Merge pull request #1295 from minrk/btree...
Merge pull request #1295 from minrk/btree Add binary-tree engine interconnect example. This implements a parallel [all]reduce as used in traditional MapReduce scenarios; this is a useful example showing how the IPython.parallel tools can be configured with a different interconnect topology in addition to the default view of N engines connected to 1 controller in a simple star topology.

File last commit:

r5678:025436b5
r6662:eada8294 merge
Show More
baseapp.py
265 lines | 9.3 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 # encoding: utf-8
"""
MinRK
update recently changed modules with Authors in docstring
r4018 The Base Application class for IPython.parallel apps
Authors:
* Brian Granger
* Min RK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
MinRK
resort imports in a cleaner order
r3631 import logging
import re
MinRK
Refactor newparallel to use Config system...
r3604 import sys
MinRK
add check_pid, and handle stale PID info in ipcluster....
r3846 from subprocess import Popen, PIPE
MinRK
catch_config -> catch_config_error
r5214 from IPython.config.application import catch_config_error
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.core import release
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.core.crashhandler import CrashHandler
MinRK
rename core.newapplication -> core.application
r4023 from IPython.core.application import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.utils.path import expand_path
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
    """Raised when a pid file is missing, unreadable as an integer, or already exists."""
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    def __init__(self, app):
        """Set up the crash handler for `app` with the project contact details.

        The contact name and e-mail are read from `IPython.core.release`;
        the bug tracker URL is the IPython GitHub issue tracker.
        """
        contact_name = release.authors['Min'][0]
        contact_email = release.author_email
        bug_tracker = 'https://github.com/ipython/ipython/issues'
        super(ParallelCrashHandler, self).__init__(
            app, contact_name, contact_email, bug_tracker
        )
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
MinRK
update parallel apps to use ProfileDir
# Command-line aliases for the parallel apps: start from the aliases imported
# from IPython.core.application, then add the parallel-specific ones.  Each
# entry maps an alias name to a 'ClassName.trait_name' string.
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
    'profile-dir' : 'ProfileDir.location',
    'work-dir' : 'BaseParallelApplication.work_dir',
    'log-to-file' : 'BaseParallelApplication.log_to_file',
    'clean-logs' : 'BaseParallelApplication.clean_logs',
    'log-url' : 'BaseParallelApplication.log_url',
    'cluster-id' : 'BaseParallelApplication.cluster_id',
})
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# Command-line flags for the parallel apps.  Each entry maps a flag name to a
# (config dict, help text) pair; 'log-to-file' sets
# BaseParallelApplication.log_to_file to True.
base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
# The flags imported from IPython.core.application are merged in afterwards,
# so an entry there with the same key would replace the one defined above.
base_flags.update(base_ip_flags)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
update parallel apps to use ProfileDir
class BaseParallelApplication(BaseIPythonApplication):
    """The base Application for IPython.parallel apps

    Principal extensions to BaseIPythonApplication:

    * work_dir
    * remote logging via pyzmq
    * IOLoop instance
    """

    crash_handler_class = ParallelCrashHandler

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )

    def _work_dir_changed(self, name, old, new):
        """Pass a newly assigned work_dir through expand_path and store it as unicode."""
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    cluster_id = Unicode('', config=True,
        help="""String id to add to runtime files, to prevent name collisions when
        using multiple clusters with a single profile simultaneously.

        When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'

        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but should
        generally work).
        """
    )

    def _cluster_id_changed(self, name, old, new):
        """Reset the instance name to the class-level name, plus '-<cluster_id>' if set."""
        self.name = self.__class__.name
        if new:
            self.name += '-%s'%new

    def _config_files_default(self):
        return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']

    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        # imported lazily, only when the default loop is first requested
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """Initialize the app.

        Runs the parent class's initialize, then changes to the working
        directory and reconfigures logging.
        """
        super(BaseParallelApplication, self).initialize(argv)
        self.to_work_dir()
        self.reinit_logging()

    def to_work_dir(self):
        """Change the process to `work_dir` and put '' at the front of sys.path."""
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def reinit_logging(self):
        """Reconfigure this app's logger.

        Optionally removes old log files for this app name from the profile's
        log dir (`clean_logs`), optionally redirects output to a new
        '<name>-<pid>.log' file there (`log_to_file`), adds timestamps to the
        log format and disables propagation to the root logger.
        """
        # Remove old log files
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            # self.name may contain a user-supplied cluster_id, so it must be
            # escaped before being used inside a regular expression.
            old_log = re.compile(r'%s-\d+\.(log|err|out)' % re.escape(self.name))
            for f in os.listdir(log_dir):
                if old_log.match(f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            # the file stays open: it is owned by the StreamHandler below
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self.log.addHandler(self._log_handler)
        # Add timestamps to log format:
        self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                                                datefmt="%Y-%m-%d %H:%M:%S")
        self._log_handler.setFormatter(self._log_formatter)
        # do not propagate log messages to root logger
        # ipcluster app will sometimes print duplicate messages during shutdown
        # if this is 1 (default):
        self.log.propagate = False

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the profile's pid_dir with my pid.

        The file is named '<self.name>.pid' and contains this process's pid
        followed by a newline.

        This raises :exc:`PIDFileError` if the pid file exists already and
        `overwrite` is false.  With `overwrite` true an existing file is
        replaced without being parsed, so a corrupt or empty stale file does
        not prevent the new one from being written.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file) and not overwrite:
            # only needed for the error message; may itself raise
            # PIDFileError if the existing file is not a valid pid
            pid = self.get_pid_from_file()
            raise PIDFileError(
                'The pid file [%s] already exists. \nThis could mean that this '
                'server is already running with [pid=%s].' % (pid_file, pid)
            )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file, if it exists.

        This should be called at shutdown.  It always returns ``None``: a
        failure to delete the file is logged as a warning instead of raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except OSError:
                self.log.warning("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist, or its contents are not an integer,
        a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                s = f.read().strip()
            try:
                pid = int(s)
            except ValueError:
                raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
            return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)

    def check_pid(self, pid):
        """Return whether a process with the given pid appears to be running.

        On Windows this asks `OpenProcess` for a handle to the pid; elsewhere
        it looks for the pid in the output of `ps x`.  If the check itself
        cannot be performed, a warning is logged and True is returned.
        """
        if os.name == 'nt':
            try:
                import ctypes
                kernel32 = ctypes.windll.kernel32
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                handle = kernel32.OpenProcess(1,0,pid)
                if handle:
                    # release the handle, otherwise it is leaked on every call
                    kernel32.CloseHandle(handle)
            except Exception:
                self.log.warning(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(handle)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warning(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            # the leading number on each line of the `ps x` output
            pids = [int(found) for found in re.findall(r'^\W*\d+', output, re.MULTILINE)]
            return pid in pids