##// END OF EJS Templates
Merge pull request #1768 from minrk/parallelmagics...
Merge pull request #1768 from minrk/parallelmagics Update parallel magics They now display all output, so you can do parallel plotting or other actions with complex display. The `px` magic has now both line and cell modes, and in cell mode finer control has been added about how to collate output from multiple engines. Tests, docs and example notebook added.

File last commit:

r6884:9fc6597a
r7060:a1360828 merge
Show More
baseapp.py
272 lines | 9.6 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""
The Base Application class for IPython.parallel apps
Authors:
* Brian Granger
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
import logging
import re
import sys
from subprocess import Popen, PIPE
from IPython.config.application import catch_config_error
from IPython.core import release
from IPython.core.crashhandler import CrashHandler
from IPython.core.application import (
BaseIPythonApplication,
base_aliases as base_ip_aliases,
base_flags as base_ip_flags
)
from IPython.utils.path import expand_path
from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
#-----------------------------------------------------------------------------
# Module errors
#-----------------------------------------------------------------------------
class PIDFileError(Exception):
pass
#-----------------------------------------------------------------------------
# Crash handler for this application
#-----------------------------------------------------------------------------
class ParallelCrashHandler(CrashHandler):
"""sys.excepthook for IPython itself, leaves a detailed report on disk."""
def __init__(self, app):
contact_name = release.authors['Min'][0]
contact_email = release.author_email
bug_tracker = 'https://github.com/ipython/ipython/issues'
super(ParallelCrashHandler,self).__init__(
app, contact_name, contact_email, bug_tracker
)
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
'profile-dir' : 'ProfileDir.location',
'work-dir' : 'BaseParallelApplication.work_dir',
'log-to-file' : 'BaseParallelApplication.log_to_file',
'clean-logs' : 'BaseParallelApplication.clean_logs',
'log-url' : 'BaseParallelApplication.log_url',
'cluster-id' : 'BaseParallelApplication.cluster_id',
})
base_flags = {
'log-to-file' : (
{'BaseParallelApplication' : {'log_to_file' : True}},
"send log output to a file"
)
}
base_flags.update(base_ip_flags)
class BaseParallelApplication(BaseIPythonApplication):
"""The base Application for IPython.parallel apps
Principle extensions to BaseIPyythonApplication:
* work_dir
* remote logging via pyzmq
* IOLoop instance
"""
crash_handler_class = ParallelCrashHandler
def _log_level_default(self):
# temporarily override default_log_level to INFO
return logging.INFO
def _log_format_default(self):
"""override default log format to include time"""
return u"%(asctime)s.%(msecs).03d [%(name)s] %(message)s"
work_dir = Unicode(os.getcwdu(), config=True,
help='Set the working dir for the process.'
)
def _work_dir_changed(self, name, old, new):
self.work_dir = unicode(expand_path(new))
log_to_file = Bool(config=True,
help="whether to log to a file")
clean_logs = Bool(False, config=True,
help="whether to cleanup old logfiles before starting")
log_url = Unicode('', config=True,
help="The ZMQ URL of the iplogger to aggregate logging.")
cluster_id = Unicode('', config=True,
help="""String id to add to runtime files, to prevent name collisions when
using multiple clusters with a single profile simultaneously.
When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
Since this is text inserted into filenames, typical recommendations apply:
Simple character strings are ideal, and spaces are not recommended (but should
generally work).
"""
)
def _cluster_id_changed(self, name, old, new):
self.name = self.__class__.name
if new:
self.name += '-%s'%new
def _config_files_default(self):
return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
from zmq.eventloop.ioloop import IOLoop
return IOLoop.instance()
aliases = Dict(base_aliases)
flags = Dict(base_flags)
@catch_config_error
def initialize(self, argv=None):
"""initialize the app"""
super(BaseParallelApplication, self).initialize(argv)
self.to_work_dir()
self.reinit_logging()
def to_work_dir(self):
wd = self.work_dir
if unicode(wd) != os.getcwdu():
os.chdir(wd)
self.log.info("Changing to working dir: %s" % wd)
# This is the working dir by now.
sys.path.insert(0, '')
def reinit_logging(self):
# Remove old log files
log_dir = self.profile_dir.log_dir
if self.clean_logs:
for f in os.listdir(log_dir):
if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
os.remove(os.path.join(log_dir, f))
if self.log_to_file:
# Start logging to the new log file
log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
logfile = os.path.join(log_dir, log_filename)
open_log_file = open(logfile, 'w')
else:
open_log_file = None
if open_log_file is not None:
while self.log.handlers:
self.log.removeHandler(self.log.handlers[0])
self._log_handler = logging.StreamHandler(open_log_file)
self.log.addHandler(self._log_handler)
else:
self._log_handler = self.log.handlers[0]
# Add timestamps to log format:
self._log_formatter = logging.Formatter(self.log_format,
datefmt="%Y-%m-%d %H:%M:%S")
self._log_handler.setFormatter(self._log_formatter)
# do not propagate log messages to root logger
# ipcluster app will sometimes print duplicate messages during shutdown
# if this is 1 (default):
self.log.propagate = False
def write_pid_file(self, overwrite=False):
"""Create a .pid file in the pid_dir with my pid.
This must be called after pre_construct, which sets `self.pid_dir`.
This raises :exc:`PIDFileError` if the pid file exists already.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
pid = self.get_pid_from_file()
if not overwrite:
raise PIDFileError(
'The pid file [%s] already exists. \nThis could mean that this '
'server is already running with [pid=%s].' % (pid_file, pid)
)
with open(pid_file, 'w') as f:
self.log.info("Creating pid file: %s" % pid_file)
f.write(repr(os.getpid())+'\n')
def remove_pid_file(self):
"""Remove the pid file.
This should be called at shutdown by registering a callback with
:func:`reactor.addSystemEventTrigger`. This needs to return
``None``.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
try:
self.log.info("Removing pid file: %s" % pid_file)
os.remove(pid_file)
except:
self.log.warn("Error removing the pid file: %s" % pid_file)
def get_pid_from_file(self):
"""Get the pid from the pid file.
If the pid file doesn't exist a :exc:`PIDFileError` is raised.
"""
pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
if os.path.isfile(pid_file):
with open(pid_file, 'r') as f:
s = f.read().strip()
try:
pid = int(s)
except:
raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
return pid
else:
raise PIDFileError('pid file not found: %s' % pid_file)
def check_pid(self, pid):
if os.name == 'nt':
try:
import ctypes
# returns 0 if no such process (of ours) exists
# positive int otherwise
p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
except Exception:
self.log.warn(
"Could not determine whether pid %i is running via `OpenProcess`. "
" Making the likely assumption that it is."%pid
)
return True
return bool(p)
else:
try:
p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
output,_ = p.communicate()
except OSError:
self.log.warn(
"Could not determine whether pid %i is running via `ps x`. "
" Making the likely assumption that it is."%pid
)
return True
pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
return pid in pids