##// END OF EJS Templates
add retries flag to LoadBalancedView...
add retries flag to LoadBalancedView also add some lbv tests, and related fixes closes gh-412

File last commit:

r3688:ef471568
r3873:6549d091
Show More
ipengineapp.py
303 lines | 10.8 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython engine application
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import json
import os
import sys
import zmq
from zmq.eventloop import ioloop
from IPython.parallel.apps.clusterdir import (
ApplicationWithClusterDir,
ClusterDirConfigLoader
)
from IPython.zmq.log import EnginePUBHandler
from IPython.parallel import factory
from IPython.parallel.engine.engine import EngineFactory
from IPython.parallel.engine.streamkernel import Kernel
from IPython.parallel.util import disambiguate_url
from IPython.utils.importstring import import_item
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
_description = """Start an IPython engine for parallel computing.\n\n
IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "cluster_<profile>".
See the --profile and --cluster-dir options for details.
"""
#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------
class IPEngineAppConfigLoader(ClusterDirConfigLoader):
def _add_arguments(self):
super(IPEngineAppConfigLoader, self)._add_arguments()
paa = self.parser.add_argument
# Controller config
paa('--file', '-f',
type=unicode, dest='Global.url_file',
help='The full location of the file containing the connection information fo '
'controller. If this is not given, the file must be in the '
'security directory of the cluster directory. This location is '
'resolved using the --profile and --app-dir options.',
metavar='Global.url_file')
# MPI
paa('--mpi',
type=str, dest='MPI.use',
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
metavar='MPI.use')
# Global config
paa('--log-to-file',
action='store_true', dest='Global.log_to_file',
help='Log to a file in the log directory (default is stdout)')
paa('--log-url',
dest='Global.log_url',
help="url of ZMQ logger, as started with iploggerz")
# paa('--execkey',
# type=str, dest='Global.exec_key',
# help='path to a file containing an execution key.',
# metavar='keyfile')
# paa('--no-secure',
# action='store_false', dest='Global.secure',
# help='Turn off execution keys.')
# paa('--secure',
# action='store_true', dest='Global.secure',
# help='Turn on execution keys (default).')
# init command
paa('-c',
type=str, dest='Global.extra_exec_lines',
help='specify a command to be run at startup')
paa('-s',
type=unicode, dest='Global.extra_exec_file',
help='specify a script to be run at startup')
factory.add_session_arguments(self.parser)
factory.add_registration_arguments(self.parser)
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
class IPEngineApp(ApplicationWithClusterDir):
name = u'ipengine'
description = _description
command_line_loader = IPEngineAppConfigLoader
default_config_file_name = default_config_file_name
auto_create_cluster_dir = True
def create_default_config(self):
super(IPEngineApp, self).create_default_config()
# The engine should not clean logs as we don't want to remove the
# active log files of other running engines.
self.default_config.Global.clean_logs = False
self.default_config.Global.secure = True
# Global config attributes
self.default_config.Global.exec_lines = []
self.default_config.Global.extra_exec_lines = ''
self.default_config.Global.extra_exec_file = u''
# Configuration related to the controller
# This must match the filename (path not included) that the controller
# used for the FURL file.
self.default_config.Global.url_file = u''
self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
# If given, this is the actual location of the controller's FURL file.
# If not, this is computed using the profile, app_dir and furl_file_name
# self.default_config.Global.key_file_name = u'exec_key.key'
# self.default_config.Global.key_file = u''
# MPI related config attributes
self.default_config.MPI.use = ''
self.default_config.MPI.mpi4py = mpi4py_init
self.default_config.MPI.pytrilinos = pytrilinos_init
def post_load_command_line_config(self):
pass
def pre_construct(self):
super(IPEngineApp, self).pre_construct()
# self.find_cont_url_file()
self.find_url_file()
if self.master_config.Global.extra_exec_lines:
self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
if self.master_config.Global.extra_exec_file:
enc = sys.getfilesystemencoding() or 'utf8'
cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
self.master_config.Global.exec_lines.append(cmd)
# def find_key_file(self):
# """Set the key file.
#
# Here we don't try to actually see if it exists for is valid as that
# is hadled by the connection logic.
# """
# config = self.master_config
# # Find the actual controller key file
# if not config.Global.key_file:
# try_this = os.path.join(
# config.Global.cluster_dir,
# config.Global.security_dir,
# config.Global.key_file_name
# )
# config.Global.key_file = try_this
def find_url_file(self):
"""Set the key file.
Here we don't try to actually see if it exists for is valid as that
is hadled by the connection logic.
"""
config = self.master_config
# Find the actual controller key file
if not config.Global.url_file:
try_this = os.path.join(
config.Global.cluster_dir,
config.Global.security_dir,
config.Global.url_file_name
)
config.Global.url_file = try_this
def construct(self):
# This is the working dir by now.
sys.path.insert(0, '')
config = self.master_config
# if os.path.exists(config.Global.key_file) and config.Global.secure:
# config.SessionFactory.exec_key = config.Global.key_file
if os.path.exists(config.Global.url_file):
with open(config.Global.url_file) as f:
d = json.loads(f.read())
for k,v in d.iteritems():
if isinstance(v, unicode):
d[k] = v.encode()
if d['exec_key']:
config.SessionFactory.exec_key = d['exec_key']
d['url'] = disambiguate_url(d['url'], d['location'])
config.RegistrationFactory.url=d['url']
config.EngineFactory.location = d['location']
config.Kernel.exec_lines = config.Global.exec_lines
self.start_mpi()
# Create the underlying shell class and EngineService
# shell_class = import_item(self.master_config.Global.shell_class)
try:
self.engine = EngineFactory(config=config, logname=self.log.name)
except:
self.log.error("Couldn't start the Engine", exc_info=True)
self.exit(1)
self.start_logging()
# Create the service hierarchy
# self.main_service = service.MultiService()
# self.engine_service.setServiceParent(self.main_service)
# self.tub_service = Tub()
# self.tub_service.setServiceParent(self.main_service)
# # This needs to be called before the connection is initiated
# self.main_service.startService()
# This initiates the connection to the controller and calls
# register_engine to tell the controller we are ready to do work
# self.engine_connector = EngineConnector(self.tub_service)
# self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
# reactor.callWhenRunning(self.call_connect)
def start_logging(self):
super(IPEngineApp, self).start_logging()
if self.master_config.Global.log_url:
context = self.engine.context
lsock = context.socket(zmq.PUB)
lsock.connect(self.master_config.Global.log_url)
handler = EnginePUBHandler(self.engine, lsock)
handler.setLevel(self.log_level)
self.log.addHandler(handler)
def start_mpi(self):
global mpi
mpikey = self.master_config.MPI.use
mpi_import_statement = self.master_config.MPI.get(mpikey, None)
if mpi_import_statement is not None:
try:
self.log.info("Initializing MPI:")
self.log.info(mpi_import_statement)
exec mpi_import_statement in globals()
except:
mpi = None
else:
mpi = None
def start_app(self):
self.engine.start()
try:
self.engine.loop.start()
except KeyboardInterrupt:
self.log.critical("Engine Interrupted, shutting down...\n")
def launch_new_instance():
"""Create and run the IPython controller"""
app = IPEngineApp()
app.start()
if __name__ == '__main__':
launch_new_instance()