ipengineapp.py
391 lines
| 12.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython engine application

Authors:

* Brian Granger
* MinRK

"""

#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3631 | import json | ||
MinRK
|
r3604 | import os | ||
import sys | ||||
MinRK
|
r4120 | import time | ||
MinRK
|
r3604 | |||
import zmq | ||||
from zmq.eventloop import ioloop | ||||
MinRK
|
r4024 | from IPython.core.profiledir import ProfileDir | ||
MinRK
|
r4115 | from IPython.parallel.apps.baseapp import ( | ||
BaseParallelApplication, | ||||
base_aliases, | ||||
base_flags, | ||||
MinRK
|
r5214 | catch_config_error, | ||
MinRK
|
r4115 | ) | ||
MinRK
|
r3604 | from IPython.zmq.log import EnginePUBHandler | ||
MinRK
|
r6887 | from IPython.zmq.ipkernel import Kernel, IPKernelApp | ||
MinRK
|
r4962 | from IPython.zmq.session import ( | ||
Session, session_aliases, session_flags | ||||
) | ||||
MinRK
|
r3604 | |||
MinRK
|
r3985 | from IPython.config.configurable import Configurable | ||
MinRK
|
r4962 | |||
MinRK
|
r3673 | from IPython.parallel.engine.engine import EngineFactory | ||
MinRK
|
r7890 | from IPython.parallel.util import disambiguate_ip_address | ||
MinRK
|
r3673 | |||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r6813 | from IPython.utils.py3compat import cast_bytes | ||
MinRK
|
r6887 | from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float, Instance | ||
MinRK
|
r3604 | |||
MinRK
|
r3614 | |||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'

# Long description of the application; assigned to IPEngineApp.description.
_description = """Start an IPython engine for parallel computing.

IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "profile_name".
See the `profile` and `profile-dir` options for details.
"""

# Example command lines; assigned to IPEngineApp.examples.
_examples = """
ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
"""
MinRK
|
r3985 | |||
#----------------------------------------------------------------------------- | ||||
# MPI configuration | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3604 | |||
# Source snippets that are exec'd in this module's globals to initialise MPI
# (see IPEngineApp.init_mpi).  Each one must leave a module-level name `mpi`
# exposing `rank` and `size` attributes.

# mpi4py: take rank/size from the world communicator.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""

# PyTrilinos: import Epetra and expose a placeholder `mpi` object.
# The indentation of `pass` matters: this text is compiled as Python source.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
    pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
MinRK
|
class MPI(Configurable):
    """Configurable holding the settings used to initialize MPI.

    `use` names a backend; `init_script` holds the code that is run to
    initialize it.  When `use` changes and no `init_script` has been set,
    the script is looked up in `default_inits`.
    """

    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
        )
    init_script = Unicode('', config=True,
        help="Initialization code for MPI")
    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)

    def _use_changed(self, name, old, new):
        """Fill in the default init script for the newly selected backend."""
        # an explicitly configured script always wins
        if self.init_script:
            return
        self.init_script = self.default_inits.get(new, '')
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3985 | # Main application | ||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
MinRK
|
# Command-line aliases: short option name -> 'Class.trait' it configures.
aliases = {
    'file': 'IPEngineApp.url_file',
    'c': 'IPEngineApp.startup_command',
    's': 'IPEngineApp.startup_script',

    'url': 'EngineFactory.url',
    'ssh': 'EngineFactory.sshserver',
    'sshkey': 'EngineFactory.sshkey',
    'ip': 'EngineFactory.ip',
    'transport': 'EngineFactory.transport',
    'port': 'EngineFactory.regport',
    'location': 'EngineFactory.location',

    'timeout': 'EngineFactory.timeout',

    'mpi': 'MPI.use',
}
# Merge order matters: on a key clash the later update wins.
aliases.update(base_aliases)
aliases.update(session_aliases)

# The engine defines no flags of its own; it only combines the shared ones.
flags = dict(base_flags)
flags.update(session_flags)
Brian Granger
|
r4216 | |||
MinRK
|
class IPEngineApp(BaseParallelApplication):
    """Application that configures and runs one IPython parallel engine.

    `initialize` sets up MPI (optional), builds the `EngineFactory` from the
    merged configuration and the controller's JSON connection file, and
    optionally forwards logging to a remote logger.  `start` then starts the
    engine and runs its event loop.
    """

    name = 'ipengine'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])

    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
        help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory. This location is
        resolved using the `profile` or `profile_dir` options.""",
        )
    wait_for_url_file = Float(5, config=True,
        help="""The maximum number of seconds to wait for url_file to exist.
        This is useful for batch-systems and shared-filesystems where the
        controller and engine are started at the same time and it
        may take a moment for the controller to write the connector files.""")

    url_file_name = Unicode(u'ipcontroller-engine.json', config=True)

    def _cluster_id_changed(self, name, old, new):
        """Keep `url_file_name` in step with the cluster id."""
        if new:
            base = 'ipcontroller-%s' % new
        else:
            base = 'ipcontroller'
        self.url_file_name = "%s-engine.json" % base

    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    # an IPKernelApp instance, used to setup listening for shell frontends
    kernel_app = Instance(IPKernelApp)

    aliases = Dict(aliases)
    flags = Dict(flags)

    @property
    def kernel(self):
        """allow access to the Kernel object, so I look like IPKernelApp"""
        return self.engine.kernel

    def find_url_file(self):
        """Set the url file.

        Here we don't try to actually see if it exists or is valid, as that
        is handled by the connection logic.
        """
        # Find the actual controller key file
        if not self.url_file:
            self.url_file = os.path.join(
                self.profile_dir.security_dir,
                self.url_file_name
            )

    def load_connector_file(self):
        """load config from a JSON connector file,
        at a *lower* priority than command-line/config files.

        Only `location` and `sshserver` can be overridden by existing
        config; the registration URL, the session key and the
        packer/unpacker always come from the file.  The parsed dict is kept
        on `self.connection_info`.
        """
        self.log.info("Loading url_file %r", self.url_file)
        config = self.config

        with open(self.url_file) as f:
            d = json.load(f)

        # allow hand-override of location for disambiguation
        # and ssh-server
        try:
            config.EngineFactory.location
        except AttributeError:
            config.EngineFactory.location = d['location']

        try:
            config.EngineFactory.sshserver
        except AttributeError:
            config.EngineFactory.sshserver = d.get('ssh')

        location = config.EngineFactory.location

        # Resolve the interface address using the (possibly overridden)
        # location, so the override above actually takes effect.
        proto, ip = d['interface'].split('://', 1)
        ip = disambiguate_ip_address(ip, location)
        d['interface'] = '%s://%s' % (proto, ip)

        # DO NOT allow override of basic URLs, serialization, or exec_key
        # JSON file takes top priority there
        config.Session.key = cast_bytes(d['exec_key'])

        config.EngineFactory.url = d['interface'] + ':%i' % d['registration']

        config.Session.packer = d['pack']
        config.Session.unpacker = d['unpack']

        self.log.debug("Config changed:")
        self.log.debug("%r", config)
        self.connection_info = d

    def bind_kernel(self, **kwargs):
        """Promote engine to listening kernel, accessible to frontends.

        Does nothing if a kernel app has already been created.  Keyword
        arguments are passed to `IPKernelApp`; `config`, `log`,
        `profile_dir` and `session` default to this application's / the
        engine's own.
        """
        if self.kernel_app is not None:
            return

        self.log.info("Opening ports for direct connections as an IPython kernel")

        kernel = self.kernel

        kwargs.setdefault('config', self.config)
        kwargs.setdefault('log', self.log)
        kwargs.setdefault('profile_dir', self.profile_dir)
        kwargs.setdefault('session', self.engine.session)

        app = self.kernel_app = IPKernelApp(**kwargs)

        # allow IPKernelApp.instance():
        IPKernelApp._instance = app

        app.init_connection_file()
        # relevant contents of init_sockets:

        app.shell_port = app._bind_socket(kernel.shell_streams[0], app.shell_port)
        app.log.debug("shell ROUTER Channel on port: %i", app.shell_port)

        app.iopub_port = app._bind_socket(kernel.iopub_socket, app.iopub_port)
        app.log.debug("iopub PUB Channel on port: %i", app.iopub_port)

        # the stdin socket is created here, on the engine's zmq context
        kernel.stdin_socket = self.engine.context.socket(zmq.ROUTER)
        app.stdin_port = app._bind_socket(kernel.stdin_socket, app.stdin_port)
        app.log.debug("stdin ROUTER Channel on port: %i", app.stdin_port)

        # start the heartbeat, and log connection info:
        app.init_heartbeat()

        app.log_connection_info()
        app.write_connection_file()

    def init_engine(self):
        """Locate the connection info and construct the EngineFactory.

        Waits up to `wait_for_url_file` seconds for the url file unless
        connection details were given in the config, loads the file when it
        exists, queues the startup script/command as kernel exec_lines and
        finally builds `self.engine`.  Exits with status 1 if the url file
        is required but never appears, or if the engine cannot be created.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.config
        self.find_url_file()

        # was the url manually specified?
        # NOTE(review): the `port` alias maps to EngineFactory.regport, which
        # this check does not look for -- confirm whether 'regport' belongs
        # in the set below.
        keys = set(self.config.EngineFactory.keys())
        keys = keys.union(set(self.config.RegistrationFactory.keys()))

        if keys.intersection(set(['ip', 'url', 'port'])):
            # Connection info was specified, don't wait for the file
            url_specified = True
            self.wait_for_url_file = 0
        else:
            url_specified = False

        if self.wait_for_url_file and not os.path.exists(self.url_file):
            self.log.warn("url_file %r not found", self.url_file)
            self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
            tic = time.time()
            while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
                # wait for url_file to exist, or until time limit
                time.sleep(0.1)

        if os.path.exists(self.url_file):
            self.load_connector_file()
        elif not url_specified:
            self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
            self.exit(1)

        try:
            exec_lines = config.Kernel.exec_lines
        except AttributeError:
            config.Kernel.exec_lines = []
            exec_lines = config.Kernel.exec_lines

        if self.startup_script:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)" % self.startup_script.encode(enc)
            exec_lines.append(cmd)
        if self.startup_command:
            exec_lines.append(self.startup_command)

        # Create the underlying shell class and Engine
        # NOTE(review): self.connection_info is only assigned in
        # load_connector_file(); when the url was given by hand and no url
        # file exists this lookup presumably raises AttributeError, which
        # lands in the handler below -- confirm.
        try:
            self.engine = EngineFactory(config=config, log=self.log,
                            connection_info=self.connection_info,
                        )
        except Exception:
            # Exception rather than a bare except, so SystemExit and
            # KeyboardInterrupt are not reported as engine failures.
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

    def forward_logging(self):
        """Attach a zmq PUB log handler when `log_url` is set."""
        if self.log_url:
            self.log.info("Forwarding logging to %s", self.log_url)
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def init_mpi(self):
        """Run the configured MPI init script in this module's globals.

        The script is expected to bind the module-level name `mpi`; that
        name is set to None when no script is configured or the script
        fails.
        """
        global mpi
        self.mpi = MPI(config=self.config)

        mpi_import_statement = self.mpi.init_script
        if mpi_import_statement:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                # function-call form of exec: valid on Python 2 and 3
                exec(mpi_import_statement, globals())
            except Exception:
                # best-effort: keep running without MPI, but say why
                self.log.warn("MPI initialization failed", exc_info=True)
                mpi = None
        else:
            mpi = None

    @catch_config_error
    def initialize(self, argv=None):
        """Parse config, then set up MPI, the engine and log forwarding."""
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()

    def start(self):
        """Start the engine and run its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")


def launch_new_instance():
    """Create and run the IPython engine"""
    # instance() rather than the constructor, so the application object is
    # obtained through the class's instance accessor
    engine_app = IPEngineApp.instance()
    engine_app.initialize()
    engine_app.start()


if __name__ == '__main__':
    launch_new_instance()