ipengineapp.py
270 lines
| 9.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython engine application
"""
#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2009  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
|
r3631 | import json | ||
MinRK
|
r3604 | import os | ||
import sys | ||||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
MinRK
|
r3992 | from IPython.core.newapplication import ProfileDir | ||
MinRK
|
r3993 | from IPython.parallel.apps.baseapp import BaseParallelApplication | ||
MinRK
|
r3604 | from IPython.zmq.log import EnginePUBHandler | ||
MinRK
|
r3985 | from IPython.config.configurable import Configurable | ||
from IPython.parallel.streamsession import StreamSession | ||||
MinRK
|
r3673 | from IPython.parallel.engine.engine import EngineFactory | ||
from IPython.parallel.engine.streamkernel import Kernel | ||||
MinRK
|
r3666 | from IPython.parallel.util import disambiguate_url | ||
MinRK
|
r3673 | |||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3989 | from IPython.utils.traitlets import Bool, Unicode, Dict, List | ||
MinRK
|
r3604 | |||
MinRK
|
r3614 | |||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'
MinRK
|
r3604 | |||
MinRK
|
r3990 | _description = """Start an IPython engine for parallel computing. | ||
MinRK
|
r3985 | |||
IPython engines run in parallel and perform computations on behalf of a client | ||||
and controller. A controller needs to be started before the engines. The | ||||
engine can be configured using command line options or using a cluster | ||||
directory. Cluster directories contain config, log and security files and are | ||||
usually located in your ipython directory and named as "cluster_<profile>". | ||||
MinRK
|
r3992 | See the `profile` and `profile_dir` options for details. | ||
MinRK
|
r3985 | """ | ||
#----------------------------------------------------------------------------- | ||||
# MPI configuration | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3604 | |||
# Init script run (via exec) when MPI.use == 'mpi4py': binds `mpi` and
# attaches the communicator's size and rank to it.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""
# Init script run (via exec) when MPI.use == 'pytrilinos': binds `mpi` to a
# plain attribute holder with rank and size both set to 0.
# The class body must be indented inside the string, otherwise exec'ing the
# script raises IndentationError and MPI setup is silently skipped.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
    pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
MinRK
|
class MPI(Configurable):
    """Configurable for MPI initialization

    `use` names the MPI binding to enable; `init_script` is the Python code
    that IPEngineApp.init_mpi executes.  When `use` changes and no
    `init_script` has been set, the matching entry of `default_inits` is
    used as the script.
    """

    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
        )

    def _use_changed(self, name, old, new):
        """Change hook for `use`.

        traitlets only auto-invokes handlers named ``_<trait>_changed``, so
        this is the entry point that actually fires; it forwards to
        `_on_use_changed`, which is kept for backward compatibility.
        """
        self._on_use_changed(old, new)

    def _on_use_changed(self, old, new):
        """Load the default init script for `new` unless one is already set.

        An unknown `new` value (including the empty string) yields an empty
        script, i.e. MPI stays disabled.
        """
        # load default init script if it's not set
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')

    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    # Maps each supported value of `use` to its default init script.
    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
MinRK
|
class IPEngineApp(BaseParallelApplication):
    """Application that configures and runs one IPython parallel engine.

    `initialize` sets up MPI, reads the controller's connection file,
    builds an `EngineFactory` from the resulting config and optionally
    forwards logging to a remote logger; `start` then runs the engine's
    event loop until interrupted.
    """

    app_name = Unicode(u'ipengine')
    description = Unicode(_description)
    config_file_name = Unicode(default_config_file_name)
    classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])

    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
        help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory. This location is
        resolved using the `profile` or `profile_dir` options.""",
        )

    # Base name of the connection file looked up in the security directory
    # when `url_file` is not given.
    url_file_name = Unicode(u'ipcontroller-engine.json')
    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    # Short command-line names mapped to the configurable traits they set.
    aliases = Dict(dict(
        file = 'IPEngineApp.url_file',
        c = 'IPEngineApp.startup_command',
        s = 'IPEngineApp.startup_script',

        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',

        url = 'EngineFactory.url',
        ip = 'EngineFactory.ip',
        transport = 'EngineFactory.transport',
        port = 'EngineFactory.regport',
        location = 'EngineFactory.location',
        timeout = 'EngineFactory.timeout',

        profile = "IPEngineApp.profile",
        profile_dir = 'ProfileDir.location',

        mpi = 'MPI.use',
        log_level = 'IPEngineApp.log_level',
        log_url = 'IPEngineApp.log_url'
    ))

    def find_url_file(self):
        """Set `url_file` to its default location if it was not given.

        The default is `url_file_name` inside the profile's security
        directory.  No check is made here that the file exists or is
        valid; `init_engine` tests for existence before reading it.
        """
        if not self.url_file:
            self.url_file = os.path.join(
                self.profile_dir.security_dir,
                self.url_file_name
            )

    def init_engine(self):
        """Merge connection info and startup code into the config, then
        create `self.engine`.

        If the connection file exists, its `exec_key`, `url` and `location`
        entries are copied into the StreamSession / EngineFactory config.
        `startup_script` and `startup_command` are appended to
        `Kernel.exec_lines`.  If the EngineFactory cannot be constructed the
        error is logged and the application exits with status 1.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.config
        self.find_url_file()

        if os.path.exists(self.url_file):
            with open(self.url_file) as f:
                d = json.load(f)
            # json produces unicode strings; encode them to byte strings
            # before they are placed in the config.
            for k,v in d.iteritems():
                if isinstance(v, unicode):
                    d[k] = v.encode()
            if d['exec_key']:
                config.StreamSession.key = d['exec_key']
            d['url'] = disambiguate_url(d['url'], d['location'])
            config.EngineFactory.url = d['url']
            config.EngineFactory.location = d['location']

        # Make sure Kernel.exec_lines exists so startup code can be appended.
        try:
            exec_lines = config.Kernel.exec_lines
        except AttributeError:
            config.Kernel.exec_lines = []
            exec_lines = config.Kernel.exec_lines

        if self.startup_script:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)"%self.startup_script.encode(enc)
            exec_lines.append(cmd)
        if self.startup_command:
            exec_lines.append(self.startup_command)

        # Create the underlying shell class and Engine
        try:
            self.engine = EngineFactory(config=config, log=self.log)
        except Exception:
            # Only real errors are reported as a failed start;
            # KeyboardInterrupt / SystemExit propagate untouched.
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

    def forward_logging(self):
        """Send log records to `log_url` over a zmq PUB socket.

        Does nothing when `log_url` is empty.  Otherwise the current log
        handler is replaced by an `EnginePUBHandler` bound to the engine.
        Requires `self.engine`, so it must run after `init_engine`.
        """
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            self.log.removeHandler(self._log_handler)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler

    def init_mpi(self):
        """Execute the configured MPI init script in this module's globals.

        The script is expected to bind the module-global name `mpi`.  When
        no script is configured, or the script fails, `mpi` is set to None;
        a failure is logged with its traceback instead of being swallowed.
        """
        global mpi
        self.mpi = MPI(config=self.config)

        mpi_import_statement = self.mpi.init_script
        if not mpi_import_statement:
            mpi = None
            return

        try:
            self.log.info("Initializing MPI:")
            self.log.info(mpi_import_statement)
            # Tuple form of exec: equivalent to `exec code in globals()`
            # in Python 2 and also valid syntax in Python 3.
            exec(mpi_import_statement, globals())
        except Exception:
            self.log.warning("MPI initialization failed, continuing without MPI",
                exc_info=True)
            mpi = None

    def initialize(self, argv=None):
        """Run base initialization, then set up MPI, the engine and log
        forwarding, in that order."""
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()

    def start(self):
        """Start the engine and run its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
def launch_new_instance():
    """Create and run the IPython engine"""
    engine_app = IPEngineApp.instance()
    engine_app.initialize()
    engine_app.start()


# Run the engine when this module is executed as a script.
if __name__ == '__main__':
    launch_new_instance()