##// END OF EJS Templates
add tests for parallel magics...
add tests for parallel magics tests revealed some changes that should be made: %px also prints remote stdout in blocking mode, and actually raises the RemoteError (not just display) if there is one

File last commit:

r3688:ef471568
r3736:8cf5bb69
Show More
ipengineapp.py
303 lines | 10.8 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 #!/usr/bin/env python
# encoding: utf-8
"""
The IPython engine application
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
resort imports in a cleaner order
r3631 import json
MinRK
Refactor newparallel to use Config system...
r3604 import os
import sys
import zmq
from zmq.eventloop import ioloop
MinRK
fix residual import issues with IPython.parallel reorganization
r3688 from IPython.parallel.apps.clusterdir import (
MinRK
Refactor newparallel to use Config system...
r3604 ApplicationWithClusterDir,
ClusterDirConfigLoader
)
from IPython.zmq.log import EnginePUBHandler
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel import factory
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.engine.engine import EngineFactory
from IPython.parallel.engine.streamkernel import Kernel
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel.util import disambiguate_url
MinRK
organize IPython.parallel into subpackages
r3673
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
MinRK
persist connection data to disk as json
r3614
MinRK
Refactor newparallel to use Config system...
r3604 #-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 default_config_file_name = u'ipengine_config.py'
MinRK
Refactor newparallel to use Config system...
r3604
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
_description = """Start an IPython engine for parallel computing.\n\n
IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 usually located in your ipython directory and named as "cluster_<profile>".
MinRK
Refactor newparallel to use Config system...
r3604 See the --profile and --cluster-dir options for details.
"""
#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------
class IPEngineAppConfigLoader(ClusterDirConfigLoader):
def _add_arguments(self):
super(IPEngineAppConfigLoader, self)._add_arguments()
paa = self.parser.add_argument
# Controller config
MinRK
add default ip<x>z_config files
r3630 paa('--file', '-f',
MinRK
Refactor newparallel to use Config system...
r3604 type=unicode, dest='Global.url_file',
MinRK
persist connection data to disk as json
r3614 help='The full location of the file containing the connection information fo '
'controller. If this is not given, the file must be in the '
MinRK
Refactor newparallel to use Config system...
r3604 'security directory of the cluster directory. This location is '
'resolved using the --profile and --app-dir options.',
metavar='Global.url_file')
# MPI
paa('--mpi',
type=str, dest='MPI.use',
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
metavar='MPI.use')
# Global config
paa('--log-to-file',
action='store_true', dest='Global.log_to_file',
help='Log to a file in the log directory (default is stdout)')
paa('--log-url',
dest='Global.log_url',
help="url of ZMQ logger, as started with iploggerz")
MinRK
persist connection data to disk as json
r3614 # paa('--execkey',
# type=str, dest='Global.exec_key',
# help='path to a file containing an execution key.',
# metavar='keyfile')
# paa('--no-secure',
# action='store_false', dest='Global.secure',
# help='Turn off execution keys.')
# paa('--secure',
# action='store_true', dest='Global.secure',
# help='Turn on execution keys (default).')
MinRK
Refactor newparallel to use Config system...
r3604 # init command
paa('-c',
type=str, dest='Global.extra_exec_lines',
help='specify a command to be run at startup')
MinRK
add '-s' for startup script in ipengine...
r3684 paa('-s',
type=unicode, dest='Global.extra_exec_file',
help='specify a script to be run at startup')
MinRK
Refactor newparallel to use Config system...
r3604
factory.add_session_arguments(self.parser)
factory.add_registration_arguments(self.parser)
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
class IPEngineApp(ApplicationWithClusterDir):
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 name = u'ipengine'
MinRK
Refactor newparallel to use Config system...
r3604 description = _description
command_line_loader = IPEngineAppConfigLoader
default_config_file_name = default_config_file_name
auto_create_cluster_dir = True
def create_default_config(self):
super(IPEngineApp, self).create_default_config()
# The engine should not clean logs as we don't want to remove the
# active log files of other running engines.
self.default_config.Global.clean_logs = False
self.default_config.Global.secure = True
# Global config attributes
self.default_config.Global.exec_lines = []
self.default_config.Global.extra_exec_lines = ''
MinRK
add '-s' for startup script in ipengine...
r3684 self.default_config.Global.extra_exec_file = u''
MinRK
Refactor newparallel to use Config system...
r3604
# Configuration related to the controller
# This must match the filename (path not included) that the controller
# used for the FURL file.
MinRK
persist connection data to disk as json
r3614 self.default_config.Global.url_file = u''
self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
MinRK
Refactor newparallel to use Config system...
r3604 # If given, this is the actual location of the controller's FURL file.
# If not, this is computed using the profile, app_dir and furl_file_name
MinRK
persist connection data to disk as json
r3614 # self.default_config.Global.key_file_name = u'exec_key.key'
# self.default_config.Global.key_file = u''
MinRK
Refactor newparallel to use Config system...
r3604
# MPI related config attributes
self.default_config.MPI.use = ''
self.default_config.MPI.mpi4py = mpi4py_init
self.default_config.MPI.pytrilinos = pytrilinos_init
def post_load_command_line_config(self):
pass
def pre_construct(self):
super(IPEngineApp, self).pre_construct()
# self.find_cont_url_file()
MinRK
persist connection data to disk as json
r3614 self.find_url_file()
MinRK
Refactor newparallel to use Config system...
r3604 if self.master_config.Global.extra_exec_lines:
self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
MinRK
add '-s' for startup script in ipengine...
r3684 if self.master_config.Global.extra_exec_file:
enc = sys.getfilesystemencoding() or 'utf8'
cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
self.master_config.Global.exec_lines.append(cmd)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
persist connection data to disk as json
r3614 # def find_key_file(self):
# """Set the key file.
#
# Here we don't try to actually see if it exists for is valid as that
# is hadled by the connection logic.
# """
# config = self.master_config
# # Find the actual controller key file
# if not config.Global.key_file:
# try_this = os.path.join(
# config.Global.cluster_dir,
# config.Global.security_dir,
# config.Global.key_file_name
# )
# config.Global.key_file = try_this
def find_url_file(self):
MinRK
Refactor newparallel to use Config system...
r3604 """Set the key file.
Here we don't try to actually see if it exists for is valid as that
is hadled by the connection logic.
"""
config = self.master_config
# Find the actual controller key file
MinRK
persist connection data to disk as json
r3614 if not config.Global.url_file:
MinRK
Refactor newparallel to use Config system...
r3604 try_this = os.path.join(
config.Global.cluster_dir,
config.Global.security_dir,
MinRK
persist connection data to disk as json
r3614 config.Global.url_file_name
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
persist connection data to disk as json
r3614 config.Global.url_file = try_this
MinRK
Refactor newparallel to use Config system...
r3604
def construct(self):
# This is the working dir by now.
sys.path.insert(0, '')
config = self.master_config
MinRK
persist connection data to disk as json
r3614 # if os.path.exists(config.Global.key_file) and config.Global.secure:
# config.SessionFactory.exec_key = config.Global.key_file
if os.path.exists(config.Global.url_file):
with open(config.Global.url_file) as f:
d = json.loads(f.read())
for k,v in d.iteritems():
if isinstance(v, unicode):
d[k] = v.encode()
if d['exec_key']:
config.SessionFactory.exec_key = d['exec_key']
d['url'] = disambiguate_url(d['url'], d['location'])
config.RegistrationFactory.url=d['url']
config.EngineFactory.location = d['location']
MinRK
Refactor newparallel to use Config system...
r3604
config.Kernel.exec_lines = config.Global.exec_lines
self.start_mpi()
# Create the underlying shell class and EngineService
# shell_class = import_item(self.master_config.Global.shell_class)
try:
MinRK
rework logging connections
r3610 self.engine = EngineFactory(config=config, logname=self.log.name)
MinRK
Refactor newparallel to use Config system...
r3604 except:
self.log.error("Couldn't start the Engine", exc_info=True)
self.exit(1)
self.start_logging()
# Create the service hierarchy
# self.main_service = service.MultiService()
# self.engine_service.setServiceParent(self.main_service)
# self.tub_service = Tub()
# self.tub_service.setServiceParent(self.main_service)
# # This needs to be called before the connection is initiated
# self.main_service.startService()
# This initiates the connection to the controller and calls
# register_engine to tell the controller we are ready to do work
# self.engine_connector = EngineConnector(self.tub_service)
# self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
# reactor.callWhenRunning(self.call_connect)
def start_logging(self):
super(IPEngineApp, self).start_logging()
if self.master_config.Global.log_url:
context = self.engine.context
lsock = context.socket(zmq.PUB)
lsock.connect(self.master_config.Global.log_url)
handler = EnginePUBHandler(self.engine, lsock)
handler.setLevel(self.log_level)
self.log.addHandler(handler)
def start_mpi(self):
global mpi
mpikey = self.master_config.MPI.use
mpi_import_statement = self.master_config.MPI.get(mpikey, None)
if mpi_import_statement is not None:
try:
self.log.info("Initializing MPI:")
self.log.info(mpi_import_statement)
exec mpi_import_statement in globals()
except:
mpi = None
else:
mpi = None
def start_app(self):
self.engine.start()
try:
self.engine.loop.start()
except KeyboardInterrupt:
self.log.critical("Engine Interrupted, shutting down...\n")
def launch_new_instance():
"""Create and run the IPython controller"""
app = IPEngineApp()
app.start()
if __name__ == '__main__':
launch_new_instance()