ipengineapp.py
261 lines
| 9.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The IPython engine application | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import os | ||||
import sys | ||||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
from IPython.zmq.parallel.clusterdir import ( | ||||
ApplicationWithClusterDir, | ||||
ClusterDirConfigLoader | ||||
) | ||||
from IPython.zmq.log import EnginePUBHandler | ||||
from IPython.zmq.parallel import factory | ||||
from IPython.zmq.parallel.engine import EngineFactory | ||||
from IPython.zmq.parallel.streamkernel import Kernel | ||||
from IPython.utils.importstring import import_item | ||||
#----------------------------------------------------------------------------- | ||||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application | ||||
default_config_file_name = u'ipengine_config.py' | ||||
mpi4py_init = """from mpi4py import MPI as mpi | ||||
mpi.size = mpi.COMM_WORLD.Get_size() | ||||
mpi.rank = mpi.COMM_WORLD.Get_rank() | ||||
""" | ||||
pytrilinos_init = """from PyTrilinos import Epetra | ||||
class SimpleStruct: | ||||
pass | ||||
mpi = SimpleStruct() | ||||
mpi.rank = 0 | ||||
mpi.size = 0 | ||||
""" | ||||
_description = """Start an IPython engine for parallel computing.\n\n | ||||
IPython engines run in parallel and perform computations on behalf of a client | ||||
and controller. A controller needs to be started before the engines. The | ||||
engine can be configured using command line options or using a cluster | ||||
directory. Cluster directories contain config, log and security files and are | ||||
usually located in your .ipython directory and named as "cluster_<profile>". | ||||
See the --profile and --cluster-dir options for details. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Command line options | ||||
#----------------------------------------------------------------------------- | ||||
class IPEngineAppConfigLoader(ClusterDirConfigLoader): | ||||
def _add_arguments(self): | ||||
super(IPEngineAppConfigLoader, self)._add_arguments() | ||||
paa = self.parser.add_argument | ||||
# Controller config | ||||
paa('--url-file', | ||||
type=unicode, dest='Global.url_file', | ||||
help='The full location of the file containing the FURL of the ' | ||||
'controller. If this is not given, the FURL file must be in the ' | ||||
'security directory of the cluster directory. This location is ' | ||||
'resolved using the --profile and --app-dir options.', | ||||
metavar='Global.url_file') | ||||
# MPI | ||||
paa('--mpi', | ||||
type=str, dest='MPI.use', | ||||
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).', | ||||
metavar='MPI.use') | ||||
# Global config | ||||
paa('--log-to-file', | ||||
action='store_true', dest='Global.log_to_file', | ||||
help='Log to a file in the log directory (default is stdout)') | ||||
paa('--log-url', | ||||
dest='Global.log_url', | ||||
help="url of ZMQ logger, as started with iploggerz") | ||||
paa('--execkey', | ||||
type=str, dest='Global.exec_key', | ||||
help='path to a file containing an execution key.', | ||||
metavar='keyfile') | ||||
paa('--no-secure', | ||||
action='store_false', dest='Global.secure', | ||||
help='Turn off execution keys.') | ||||
paa('--secure', | ||||
action='store_true', dest='Global.secure', | ||||
help='Turn on execution keys (default).') | ||||
# init command | ||||
paa('-c', | ||||
type=str, dest='Global.extra_exec_lines', | ||||
help='specify a command to be run at startup') | ||||
factory.add_session_arguments(self.parser) | ||||
factory.add_registration_arguments(self.parser) | ||||
#----------------------------------------------------------------------------- | ||||
# Main application | ||||
#----------------------------------------------------------------------------- | ||||
class IPEngineApp(ApplicationWithClusterDir): | ||||
name = u'ipenginez' | ||||
description = _description | ||||
command_line_loader = IPEngineAppConfigLoader | ||||
default_config_file_name = default_config_file_name | ||||
auto_create_cluster_dir = True | ||||
def create_default_config(self): | ||||
super(IPEngineApp, self).create_default_config() | ||||
# The engine should not clean logs as we don't want to remove the | ||||
# active log files of other running engines. | ||||
self.default_config.Global.clean_logs = False | ||||
self.default_config.Global.secure = True | ||||
# Global config attributes | ||||
self.default_config.Global.exec_lines = [] | ||||
self.default_config.Global.extra_exec_lines = '' | ||||
# Configuration related to the controller | ||||
# This must match the filename (path not included) that the controller | ||||
# used for the FURL file. | ||||
self.default_config.Global.url = u'tcp://localhost:10101' | ||||
# If given, this is the actual location of the controller's FURL file. | ||||
# If not, this is computed using the profile, app_dir and furl_file_name | ||||
self.default_config.Global.key_file_name = u'exec_key.key' | ||||
self.default_config.Global.key_file = u'' | ||||
# MPI related config attributes | ||||
self.default_config.MPI.use = '' | ||||
self.default_config.MPI.mpi4py = mpi4py_init | ||||
self.default_config.MPI.pytrilinos = pytrilinos_init | ||||
def post_load_command_line_config(self): | ||||
pass | ||||
def pre_construct(self): | ||||
super(IPEngineApp, self).pre_construct() | ||||
# self.find_cont_url_file() | ||||
self.find_key_file() | ||||
if self.master_config.Global.extra_exec_lines: | ||||
self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines) | ||||
def find_key_file(self): | ||||
"""Set the key file. | ||||
Here we don't try to actually see if it exists for is valid as that | ||||
is hadled by the connection logic. | ||||
""" | ||||
config = self.master_config | ||||
# Find the actual controller key file | ||||
if not config.Global.key_file: | ||||
try_this = os.path.join( | ||||
config.Global.cluster_dir, | ||||
config.Global.security_dir, | ||||
config.Global.key_file_name | ||||
) | ||||
config.Global.key_file = try_this | ||||
def construct(self): | ||||
# This is the working dir by now. | ||||
sys.path.insert(0, '') | ||||
config = self.master_config | ||||
if os.path.exists(config.Global.key_file) and config.Global.secure: | ||||
config.SessionFactory.exec_key = config.Global.key_file | ||||
config.Kernel.exec_lines = config.Global.exec_lines | ||||
self.start_mpi() | ||||
# Create the underlying shell class and EngineService | ||||
# shell_class = import_item(self.master_config.Global.shell_class) | ||||
try: | ||||
self.engine = EngineFactory(config=config) | ||||
except: | ||||
self.log.error("Couldn't start the Engine", exc_info=True) | ||||
self.exit(1) | ||||
self.start_logging() | ||||
# Create the service hierarchy | ||||
# self.main_service = service.MultiService() | ||||
# self.engine_service.setServiceParent(self.main_service) | ||||
# self.tub_service = Tub() | ||||
# self.tub_service.setServiceParent(self.main_service) | ||||
# # This needs to be called before the connection is initiated | ||||
# self.main_service.startService() | ||||
# This initiates the connection to the controller and calls | ||||
# register_engine to tell the controller we are ready to do work | ||||
# self.engine_connector = EngineConnector(self.tub_service) | ||||
# self.log.info("Using furl file: %s" % self.master_config.Global.furl_file) | ||||
# reactor.callWhenRunning(self.call_connect) | ||||
def start_logging(self): | ||||
super(IPEngineApp, self).start_logging() | ||||
if self.master_config.Global.log_url: | ||||
context = self.engine.context | ||||
lsock = context.socket(zmq.PUB) | ||||
lsock.connect(self.master_config.Global.log_url) | ||||
handler = EnginePUBHandler(self.engine, lsock) | ||||
handler.setLevel(self.log_level) | ||||
self.log.addHandler(handler) | ||||
def start_mpi(self): | ||||
global mpi | ||||
mpikey = self.master_config.MPI.use | ||||
mpi_import_statement = self.master_config.MPI.get(mpikey, None) | ||||
if mpi_import_statement is not None: | ||||
try: | ||||
self.log.info("Initializing MPI:") | ||||
self.log.info(mpi_import_statement) | ||||
exec mpi_import_statement in globals() | ||||
except: | ||||
mpi = None | ||||
else: | ||||
mpi = None | ||||
def start_app(self): | ||||
self.engine.start() | ||||
try: | ||||
self.engine.loop.start() | ||||
except KeyboardInterrupt: | ||||
self.log.critical("Engine Interrupted, shutting down...\n") | ||||
def launch_new_instance(): | ||||
"""Create and run the IPython controller""" | ||||
app = IPEngineApp() | ||||
app.start() | ||||
if __name__ == '__main__': | ||||
launch_new_instance() | ||||