ipengineapp.py
301 lines
| 9.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The IPython engine application | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import json | ||
MinRK
|
r3604 | import os | ||
import sys | ||||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
MinRK
|
r3688 | from IPython.parallel.apps.clusterdir import ( | ||
MinRK
|
r3985 | ClusterDirApplication, | ||
ClusterDir, | ||||
base_aliases, | ||||
# ClusterDirConfigLoader | ||||
MinRK
|
r3604 | ) | ||
from IPython.zmq.log import EnginePUBHandler | ||||
MinRK
|
r3985 | from IPython.config.configurable import Configurable | ||
from IPython.parallel.streamsession import StreamSession | ||||
MinRK
|
r3673 | from IPython.parallel.engine.engine import EngineFactory | ||
from IPython.parallel.engine.streamkernel import Kernel | ||||
MinRK
|
r3666 | from IPython.parallel.util import disambiguate_url | ||
MinRK
|
r3673 | |||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3985 | from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr | ||
MinRK
|
r3604 | |||
MinRK
|
r3614 | |||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application | ||||
MinRK
|
r3672 | default_config_file_name = u'ipengine_config.py' | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | _description = """Start an IPython engine for parallel computing.\n\n | ||
IPython engines run in parallel and perform computations on behalf of a client | ||||
and controller. A controller needs to be started before the engines. The | ||||
engine can be configured using command line options or using a cluster | ||||
directory. Cluster directories contain config, log and security files and are | ||||
usually located in your ipython directory and named as "cluster_<profile>". | ||||
See the `profile` and `cluster_dir` options for details. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# MPI configuration | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3604 | |||
mpi4py_init = """from mpi4py import MPI as mpi | ||||
mpi.size = mpi.COMM_WORLD.Get_size() | ||||
mpi.rank = mpi.COMM_WORLD.Get_rank() | ||||
""" | ||||
pytrilinos_init = """from PyTrilinos import Epetra | ||||
class SimpleStruct: | ||||
pass | ||||
mpi = SimpleStruct() | ||||
mpi.rank = 0 | ||||
mpi.size = 0 | ||||
""" | ||||
MinRK
|
r3985 | class MPI(Configurable): | ||
"""Configurable for MPI initialization""" | ||||
use = Str('', config=True, | ||||
help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).' | ||||
) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | def _on_use_changed(self, old, new): | ||
# load default init script if it's not set | ||||
if not self.init_script: | ||||
self.init_script = self.default_inits.get(new, '') | ||||
init_script = Str('', config=True, | ||||
help="Initialization code for MPI") | ||||
default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init}, | ||||
config=True) | ||||
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
MinRK
|
r3985 | class IPEngineApp(ClusterDirApplication): | ||
app_name = Unicode(u'ipengine') | ||||
description = Unicode(_description) | ||||
default_config_file_name = default_config_file_name | ||||
classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI]) | ||||
startup_script = Unicode(u'', config=True, | ||||
help='specify a script to be run at startup') | ||||
startup_command = Str('', config=True, | ||||
MinRK
|
r3604 | help='specify a command to be run at startup') | ||
MinRK
|
r3985 | url_file = Unicode(u'', config=True, | ||
help="""The full location of the file containing the connection information for | ||||
the controller. If this is not given, the file must be in the | ||||
security directory of the cluster directory. This location is | ||||
resolved using the `profile` or `cluster_dir` options.""", | ||||
) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | url_file_name = Unicode(u'ipcontroller-engine.json') | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | aliases = Dict(dict( | ||
config = 'IPEngineApp.config_file', | ||||
file = 'IPEngineApp.url_file', | ||||
c = 'IPEngineApp.startup_command', | ||||
s = 'IPEngineApp.startup_script', | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | ident = 'StreamSession.session', | ||
user = 'StreamSession.username', | ||||
exec_key = 'StreamSession.keyfile', | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | url = 'EngineFactory.url', | ||
ip = 'EngineFactory.ip', | ||||
transport = 'EngineFactory.transport', | ||||
port = 'EngineFactory.regport', | ||||
location = 'EngineFactory.location', | ||||
timeout = 'EngineFactory.timeout', | ||||
profile = "ClusterDir.profile", | ||||
cluster_dir = 'ClusterDir.location', | ||||
mpi = 'MPI.use', | ||||
log_level = 'IPEngineApp.log_level', | ||||
)) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3614 | # def find_key_file(self): | ||
# """Set the key file. | ||||
# | ||||
# Here we don't try to actually see if it exists for is valid as that | ||||
# is hadled by the connection logic. | ||||
# """ | ||||
# config = self.master_config | ||||
# # Find the actual controller key file | ||||
# if not config.Global.key_file: | ||||
# try_this = os.path.join( | ||||
# config.Global.cluster_dir, | ||||
# config.Global.security_dir, | ||||
# config.Global.key_file_name | ||||
# ) | ||||
# config.Global.key_file = try_this | ||||
def find_url_file(self): | ||||
MinRK
|
r3604 | """Set the key file. | ||
Here we don't try to actually see if it exists for is valid as that | ||||
is hadled by the connection logic. | ||||
""" | ||||
MinRK
|
r3985 | config = self.config | ||
MinRK
|
r3604 | # Find the actual controller key file | ||
MinRK
|
r3985 | if not self.url_file: | ||
self.url_file = os.path.join( | ||||
self.cluster_dir.security_dir, | ||||
self.url_file_name | ||||
MinRK
|
r3604 | ) | ||
MinRK
|
r3985 | def init_engine(self): | ||
MinRK
|
r3604 | # This is the working dir by now. | ||
sys.path.insert(0, '') | ||||
MinRK
|
r3985 | config = self.config | ||
# print config | ||||
self.find_url_file() | ||||
MinRK
|
r3614 | # if os.path.exists(config.Global.key_file) and config.Global.secure: | ||
# config.SessionFactory.exec_key = config.Global.key_file | ||||
MinRK
|
r3985 | if os.path.exists(self.url_file): | ||
with open(self.url_file) as f: | ||||
MinRK
|
r3614 | d = json.loads(f.read()) | ||
for k,v in d.iteritems(): | ||||
if isinstance(v, unicode): | ||||
d[k] = v.encode() | ||||
if d['exec_key']: | ||||
MinRK
|
r3985 | config.StreamSession.key = d['exec_key'] | ||
MinRK
|
r3614 | d['url'] = disambiguate_url(d['url'], d['location']) | ||
MinRK
|
r3985 | config.EngineFactory.url = d['url'] | ||
MinRK
|
r3614 | config.EngineFactory.location = d['location'] | ||
MinRK
|
r3985 | try: | ||
exec_lines = config.Kernel.exec_lines | ||||
except AttributeError: | ||||
config.Kernel.exec_lines = [] | ||||
exec_lines = config.Kernel.exec_lines | ||||
MinRK
|
r3614 | |||
MinRK
|
r3985 | if self.startup_script: | ||
enc = sys.getfilesystemencoding() or 'utf8' | ||||
cmd="execfile(%r)"%self.startup_script.encode(enc) | ||||
exec_lines.append(cmd) | ||||
if self.startup_command: | ||||
exec_lines.append(self.startup_command) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | # Create the underlying shell class and Engine | ||
MinRK
|
r3604 | # shell_class = import_item(self.master_config.Global.shell_class) | ||
MinRK
|
r3985 | # print self.config | ||
MinRK
|
r3604 | try: | ||
MinRK
|
r3985 | self.engine = EngineFactory(config=config, log=self.log) | ||
MinRK
|
r3604 | except: | ||
self.log.error("Couldn't start the Engine", exc_info=True) | ||||
self.exit(1) | ||||
MinRK
|
r3985 | # self.start_logging() | ||
MinRK
|
r3604 | |||
# Create the service hierarchy | ||||
# self.main_service = service.MultiService() | ||||
# self.engine_service.setServiceParent(self.main_service) | ||||
# self.tub_service = Tub() | ||||
# self.tub_service.setServiceParent(self.main_service) | ||||
# # This needs to be called before the connection is initiated | ||||
# self.main_service.startService() | ||||
# This initiates the connection to the controller and calls | ||||
# register_engine to tell the controller we are ready to do work | ||||
# self.engine_connector = EngineConnector(self.tub_service) | ||||
# self.log.info("Using furl file: %s" % self.master_config.Global.furl_file) | ||||
# reactor.callWhenRunning(self.call_connect) | ||||
MinRK
|
r3985 | # def start_logging(self): | ||
# super(IPEngineApp, self).start_logging() | ||||
# if self.master_config.Global.log_url: | ||||
# context = self.engine.context | ||||
# lsock = context.socket(zmq.PUB) | ||||
# lsock.connect(self.master_config.Global.log_url) | ||||
# handler = EnginePUBHandler(self.engine, lsock) | ||||
# handler.setLevel(self.log_level) | ||||
# self.log.addHandler(handler) | ||||
# | ||||
def init_mpi(self): | ||||
MinRK
|
r3604 | global mpi | ||
MinRK
|
r3985 | self.mpi = MPI(config=self.config) | ||
mpi_import_statement = self.mpi.init_script | ||||
if mpi_import_statement: | ||||
MinRK
|
r3604 | try: | ||
self.log.info("Initializing MPI:") | ||||
self.log.info(mpi_import_statement) | ||||
exec mpi_import_statement in globals() | ||||
except: | ||||
mpi = None | ||||
else: | ||||
mpi = None | ||||
MinRK
|
r3985 | def start(self): | ||
MinRK
|
r3604 | self.engine.start() | ||
try: | ||||
self.engine.loop.start() | ||||
except KeyboardInterrupt: | ||||
self.log.critical("Engine Interrupted, shutting down...\n") | ||||
def launch_new_instance(): | ||||
MinRK
|
r3985 | """Create and run the IPython engine""" | ||
MinRK
|
r3604 | app = IPEngineApp() | ||
MinRK
|
r3985 | app.parse_command_line() | ||
cl_config = app.config | ||||
app.init_clusterdir() | ||||
# app.load_config_file() | ||||
# print app.config | ||||
if app.config_file: | ||||
app.load_config_file(app.config_file) | ||||
else: | ||||
app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) | ||||
# command-line should *override* config file, but command-line is necessary | ||||
# to determine clusterdir, etc. | ||||
app.update_config(cl_config) | ||||
# print app.config | ||||
app.to_work_dir() | ||||
app.init_mpi() | ||||
app.init_engine() | ||||
print app.config | ||||
MinRK
|
r3604 | app.start() | ||
if __name__ == '__main__': | ||||
launch_new_instance() | ||||