ipengineapp.py
344 lines
| 11.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The IPython engine application | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Brian Granger | ||||
* MinRK | ||||
MinRK
|
r3604 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4018 | # Copyright (C) 2008-2011 The IPython Development Team | ||
MinRK
|
r3604 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import json | ||
MinRK
|
r3604 | import os | ||
import sys | ||||
MinRK
|
r4120 | import time | ||
MinRK
|
r3604 | |||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
MinRK
|
r4024 | from IPython.core.profiledir import ProfileDir | ||
MinRK
|
r4115 | from IPython.parallel.apps.baseapp import ( | ||
BaseParallelApplication, | ||||
base_aliases, | ||||
base_flags, | ||||
) | ||||
MinRK
|
r3604 | from IPython.zmq.log import EnginePUBHandler | ||
MinRK
|
r4962 | from IPython.zmq.session import ( | ||
Session, session_aliases, session_flags | ||||
) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | from IPython.config.configurable import Configurable | ||
MinRK
|
r4962 | |||
MinRK
|
r3673 | from IPython.parallel.engine.engine import EngineFactory | ||
from IPython.parallel.engine.streamkernel import Kernel | ||||
MinRK
|
r4161 | from IPython.parallel.util import disambiguate_url, asbytes | ||
MinRK
|
r3673 | |||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r4120 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float | ||
MinRK
|
r3604 | |||
MinRK
|
r3614 | |||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application | ||||
MinRK
|
default_config_file_name = u'ipengine_config.py'

# Long description shown in the application's help output.
_description = """Start an IPython engine for parallel computing.

IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "profile_name".
See the `profile` and `profile-dir` options for details.
"""

# Example command lines shown in the application's help output.
_examples = """
ipengine --ip=192.168.0.1 --port=1000     # connect to hub at ip and port
ipengine --log-to-file --log-level=DEBUG  # log to a file with DEBUG verbosity
"""
MinRK
|
r3985 | |||
#----------------------------------------------------------------------------- | ||||
# MPI configuration | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3604 | |||
# Source snippets executed by IPEngineApp.init_mpi() to bind a module-level
# ``mpi`` object.  They are stored as text and only run when selected through
# the MPI configurable, so the optional packages are not imported eagerly.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""


# The ``pass`` line must stay indented inside the snippet: the text is run
# through exec, and an unindented class body is an IndentationError.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
    pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
MinRK
|
class MPI(Configurable):
    """Configurable for MPI initialization.

    ``use`` names an MPI backend; ``init_script`` holds the code that will be
    executed to initialize it.  When ``use`` changes and no ``init_script``
    has been set, the script is looked up in ``default_inits``.
    """

    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
        )

    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    # Maps a backend name (a value of ``use``) to its default init script.
    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)

    def _use_changed(self, name, old, new):
        """Fill in ``init_script`` from ``default_inits`` when ``use`` changes.

        NOTE(review): presumably called by the traitlets machinery through
        its ``_<trait>_changed`` naming convention -- nothing in this file
        calls it directly.
        """
        # load default init script if it's not set; an explicitly configured
        # init_script always wins, and unknown backends map to ''.
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
MinRK
|
# Command-line aliases: short option name -> 'Class.trait' it configures.
aliases = dict(
    file = 'IPEngineApp.url_file',
    c = 'IPEngineApp.startup_command',
    s = 'IPEngineApp.startup_script',

    url = 'EngineFactory.url',
    ssh = 'EngineFactory.sshserver',
    sshkey = 'EngineFactory.sshkey',
    ip = 'EngineFactory.ip',
    transport = 'EngineFactory.transport',
    # note: ``--port`` configures the trait named ``regport``
    port = 'EngineFactory.regport',
    location = 'EngineFactory.location',

    timeout = 'EngineFactory.timeout',

    mpi = 'MPI.use',
)
# Merged last, so on a name clash the shared base/session aliases replace
# the engine-specific entries above.
aliases.update(base_aliases)
aliases.update(session_aliases)

# The engine defines no flags of its own; it only inherits the shared ones.
flags = {}
flags.update(base_flags)
flags.update(session_flags)
Brian Granger
|
r4216 | |||
MinRK
|
class IPEngineApp(BaseParallelApplication):
    """Application that configures and runs one IPython parallel engine.

    ``initialize`` runs the optional MPI init script, resolves the
    controller's connection file, builds the ``EngineFactory`` and, when
    ``log_url`` is set, redirects logging to a zmq PUB socket.  ``start``
    then starts the engine and runs its event loop.
    """

    name = 'ipengine'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])

    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
        help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory.  This location is
        resolved using the `profile` or `profile_dir` options.""",
        )
    wait_for_url_file = Float(5, config=True,
        help="""The maximum number of seconds to wait for url_file to exist.
        This is useful for batch-systems and shared-filesystems where the
        controller and engine are started at the same time and it
        may take a moment for the controller to write the connector files.""")

    url_file_name = Unicode(u'ipcontroller-engine.json', config=True)

    def _cluster_id_changed(self, name, old, new):
        """Rebuild ``url_file_name`` so it embeds the new cluster id.

        An empty id yields 'ipcontroller-engine.json'; otherwise the id is
        inserted as 'ipcontroller-<id>-engine.json'.
        """
        if new:
            base = 'ipcontroller-%s' % new
        else:
            base = 'ipcontroller'
        self.url_file_name = "%s-engine.json" % base

    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    aliases = Dict(aliases)
    flags = Dict(flags)

    def find_url_file(self):
        """Set the url file.

        Here we don't try to actually see if it exists or is valid, as that
        is handled by the connection logic.
        """
        # An explicitly configured url_file is kept as-is; otherwise default
        # to url_file_name inside the profile's security directory.
        if not self.url_file:
            self.url_file = os.path.join(
                self.profile_dir.security_dir,
                self.url_file_name
            )

    def load_connector_file(self):
        """load config from a JSON connector file,
        at a *lower* priority than command-line/config files.

        ``location``, ``url`` and ``sshserver`` are only copied from the file
        when they are not already present in the config.  ``exec_key`` is the
        exception: when the file has one it always replaces ``Session.key``.

        Raises whatever ``open``/``json.load`` raise for an unreadable or
        malformed file, and ``KeyError`` when a needed entry ('location',
        'url', 'ssh') is missing from it.
        """
        self.log.info("Loading url_file %r"%self.url_file)
        config = self.config

        with open(self.url_file) as f:
            d = json.load(f)

        if 'exec_key' in d:
            config.Session.key = asbytes(d['exec_key'])

        # The bare attribute reads below are existence probes: an unset key
        # is expected to raise AttributeError, and only then is the value
        # from the file used.
        try:
            config.EngineFactory.location
        except AttributeError:
            config.EngineFactory.location = d['location']

        # Resolve the file's url against the (possibly overridden) location
        # before it is considered as the default.
        d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
        try:
            config.EngineFactory.url
        except AttributeError:
            config.EngineFactory.url = d['url']

        try:
            config.EngineFactory.sshserver
        except AttributeError:
            config.EngineFactory.sshserver = d['ssh']

    def init_engine(self):
        """Resolve connection info and create ``self.engine``.

        Waits up to ``wait_for_url_file`` seconds for the connection file
        unless connection info was given explicitly, loads the file if it
        exists, queues the startup script/command on ``Kernel.exec_lines``
        and builds the ``EngineFactory``.  Calls ``self.exit(1)`` when the
        file never arrives (and nothing was specified manually) or when the
        engine cannot be constructed.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.config
        self.find_url_file()

        # was the url manually specified?
        keys = set(self.config.EngineFactory.keys())
        keys = keys.union(set(self.config.RegistrationFactory.keys()))

        # 'regport' is checked as well as 'port': the ``port`` command-line
        # alias is mapped to EngineFactory.regport, so that is the key which
        # actually shows up in the config when --port is given.
        if keys.intersection(set(['ip', 'url', 'port', 'regport'])):
            # Connection info was specified, don't wait for the file
            url_specified = True
            self.wait_for_url_file = 0
        else:
            url_specified = False

        if self.wait_for_url_file and not os.path.exists(self.url_file):
            self.log.warn("url_file %r not found"%self.url_file)
            self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
            tic = time.time()
            while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
                # poll for url_file, for up to wait_for_url_file seconds
                time.sleep(0.1)

        if os.path.exists(self.url_file):
            self.load_connector_file()
        elif not url_specified:
            self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
            self.exit(1)

        try:
            exec_lines = config.Kernel.exec_lines
        except AttributeError:
            config.Kernel.exec_lines = []
            exec_lines = config.Kernel.exec_lines

        # The startup script is queued before the startup command.
        if self.startup_script:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)"%self.startup_script.encode(enc)
            exec_lines.append(cmd)
        if self.startup_command:
            exec_lines.append(self.startup_command)

        # Create the underlying shell class and Engine
        try:
            self.engine = EngineFactory(config=config, log=self.log)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not reported as an engine start-up failure.
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

    def forward_logging(self):
        """Send log records to ``log_url`` instead of the local handler.

        Does nothing when ``log_url`` is empty.  Otherwise connects a zmq PUB
        socket from the engine's context to ``log_url`` and swaps
        ``self._log_handler`` for an ``EnginePUBHandler`` on that socket.
        """
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            self.log.removeHandler(self._log_handler)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler

    def init_mpi(self):
        """Run the configured MPI init script, binding the global ``mpi``.

        The script is executed in this module's globals and is expected to
        define ``mpi`` there.  With no script, or when the script fails,
        ``mpi`` is set to None and start-up continues without MPI.
        """
        global mpi
        self.mpi = MPI(config=self.config)

        mpi_import_statement = self.mpi.init_script
        if mpi_import_statement:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                # NOTE(security): init_script comes from configuration and is
                # executed as code; it must only ever hold trusted input.
                # The call form works both as the Python 2 exec statement
                # (tuple form) and as the Python 3 builtin.
                exec(mpi_import_statement, globals())
            except Exception:
                # Best effort: keep running without MPI, but say why.
                self.log.warn("MPI initialization failed, continuing without MPI",
                              exc_info=True)
                mpi = None
        else:
            mpi = None

    def initialize(self, argv=None):
        """Initialize the base application, then MPI, engine and logging.

        Log forwarding comes last because it needs ``self.engine``.
        """
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()

    def start(self):
        """Start the engine and run its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")


def launch_new_instance():
    """Create and run the IPython engine"""
    # instance() is used instead of the constructor so the application
    # object is obtained through the class's own accessor.
    app = IPEngineApp.instance()
    app.initialize()
    app.start()


# Script entry point: only launch the engine when run directly.
if __name__ == '__main__':
    launch_new_instance()