##// END OF EJS Templates
* Fixed heartbeat thread not stopping cleanly....
* Fixed heartbeat thread not stopping cleanly. * Fixed typo in kernel_restart that led to errors in some cases.

File last commit:

r2511:f5df2b16
r2915:c57f5115
Show More
ipengineapp.py
242 lines | 8.4 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import os
import sys
from twisted.application import service
from twisted.internet import reactor
from twisted.python import log
from IPython.kernel.clusterdir import (
ApplicationWithClusterDir,
ClusterDirConfigLoader
)
from IPython.kernel.engineconnector import EngineConnector
from IPython.kernel.engineservice import EngineService
from IPython.kernel.fcutil import Tub
from IPython.utils.importstring import import_item
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
_description = """Start an IPython engine for parallel computing.\n\n
IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your .ipython directory and named as "cluster_<profile>".
See the --profile and --cluster-dir options for details.
"""
#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------
class IPEngineAppConfigLoader(ClusterDirConfigLoader):
def _add_arguments(self):
super(IPEngineAppConfigLoader, self)._add_arguments()
paa = self.parser.add_argument
# Controller config
paa('--furl-file',
type=unicode, dest='Global.furl_file',
help='The full location of the file containing the FURL of the '
'controller. If this is not given, the FURL file must be in the '
'security directory of the cluster directory. This location is '
'resolved using the --profile and --app-dir options.',
metavar='Global.furl_file')
# MPI
paa('--mpi',
type=str, dest='MPI.use',
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
metavar='MPI.use')
# Global config
paa('--log-to-file',
action='store_true', dest='Global.log_to_file',
help='Log to a file in the log directory (default is stdout)')
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
class IPEngineApp(ApplicationWithClusterDir):
name = u'ipengine'
description = _description
command_line_loader = IPEngineAppConfigLoader
default_config_file_name = default_config_file_name
auto_create_cluster_dir = True
def create_default_config(self):
super(IPEngineApp, self).create_default_config()
# The engine should not clean logs as we don't want to remove the
# active log files of other running engines.
self.default_config.Global.clean_logs = False
# Global config attributes
self.default_config.Global.exec_lines = []
self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
# Configuration related to the controller
# This must match the filename (path not included) that the controller
# used for the FURL file.
self.default_config.Global.furl_file_name = u'ipcontroller-engine.furl'
# If given, this is the actual location of the controller's FURL file.
# If not, this is computed using the profile, app_dir and furl_file_name
self.default_config.Global.furl_file = u''
# The max number of connection attemps and the initial delay between
# those attemps.
self.default_config.Global.connect_delay = 0.1
self.default_config.Global.connect_max_tries = 15
# MPI related config attributes
self.default_config.MPI.use = ''
self.default_config.MPI.mpi4py = mpi4py_init
self.default_config.MPI.pytrilinos = pytrilinos_init
def post_load_command_line_config(self):
pass
def pre_construct(self):
super(IPEngineApp, self).pre_construct()
self.find_cont_furl_file()
def find_cont_furl_file(self):
"""Set the furl file.
Here we don't try to actually see if it exists for is valid as that
is hadled by the connection logic.
"""
config = self.master_config
# Find the actual controller FURL file
if not config.Global.furl_file:
try_this = os.path.join(
config.Global.cluster_dir,
config.Global.security_dir,
config.Global.furl_file_name
)
config.Global.furl_file = try_this
def construct(self):
# This is the working dir by now.
sys.path.insert(0, '')
self.start_mpi()
self.start_logging()
# Create the underlying shell class and EngineService
shell_class = import_item(self.master_config.Global.shell_class)
self.engine_service = EngineService(shell_class, mpi=mpi)
self.exec_lines()
# Create the service hierarchy
self.main_service = service.MultiService()
self.engine_service.setServiceParent(self.main_service)
self.tub_service = Tub()
self.tub_service.setServiceParent(self.main_service)
# This needs to be called before the connection is initiated
self.main_service.startService()
# This initiates the connection to the controller and calls
# register_engine to tell the controller we are ready to do work
self.engine_connector = EngineConnector(self.tub_service)
log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
reactor.callWhenRunning(self.call_connect)
def call_connect(self):
d = self.engine_connector.connect_to_controller(
self.engine_service,
self.master_config.Global.furl_file,
self.master_config.Global.connect_delay,
self.master_config.Global.connect_max_tries
)
def handle_error(f):
log.msg('Error connecting to controller. This usually means that '
'i) the controller was not started, ii) a firewall was blocking '
'the engine from connecting to the controller or iii) the engine '
' was not pointed at the right FURL file:')
log.msg(f.getErrorMessage())
reactor.callLater(0.1, reactor.stop)
d.addErrback(handle_error)
def start_mpi(self):
global mpi
mpikey = self.master_config.MPI.use
mpi_import_statement = self.master_config.MPI.get(mpikey, None)
if mpi_import_statement is not None:
try:
self.log.info("Initializing MPI:")
self.log.info(mpi_import_statement)
exec mpi_import_statement in globals()
except:
mpi = None
else:
mpi = None
def exec_lines(self):
for line in self.master_config.Global.exec_lines:
try:
log.msg("Executing statement: '%s'" % line)
self.engine_service.execute(line)
except:
log.msg("Error executing statement: %s" % line)
def start_app(self):
reactor.run()
def launch_new_instance():
"""Create and run the IPython controller"""
app = IPEngineApp()
app.start()
if __name__ == '__main__':
launch_new_instance()