ipcontrollerapp.py
402 lines
| 14.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
# encoding: utf-8 | ||||
""" | ||||
The IPython controller application. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
from __future__ import with_statement | ||||
import os | ||||
MinRK
|
r3614 | import socket | ||
MinRK
|
r3631 | import stat | ||
import sys | ||||
MinRK
|
r3614 | import uuid | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | from multiprocessing import Process | ||
MinRK
|
r3604 | import zmq | ||
MinRK
|
r3985 | from zmq.devices import ProcessMonitoredQueue | ||
MinRK
|
r3604 | from zmq.log.handlers import PUBHandler | ||
MinRK
|
r3614 | from zmq.utils import jsonapi as json | ||
MinRK
|
r3604 | |||
MinRK
|
r4000 | from IPython.config.application import boolean_flag | ||
MinRK
|
r3992 | from IPython.core.newapplication import ProfileDir | ||
MinRK
|
r3688 | |||
MinRK
|
r3993 | from IPython.parallel.apps.baseapp import ( | ||
MinRK
|
r3992 | BaseParallelApplication, | ||
MinRK
|
r3985 | base_flags | ||
MinRK
|
r3604 | ) | ||
MinRK
|
r3985 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3988 | from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict | ||
MinRK
|
r3985 | |||
# from IPython.parallel.controller.controller import ControllerFactory | ||||
from IPython.parallel.streamsession import StreamSession | ||||
from IPython.parallel.controller.heartmonitor import HeartMonitor | ||||
MinRK
|
r3992 | from IPython.parallel.controller.hub import HubFactory | ||
MinRK
|
r3985 | from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler | ||
from IPython.parallel.controller.sqlitedb import SQLiteDB | ||||
MinRK
|
r3992 | from IPython.parallel.util import signal_children, split_url | ||
MinRK
|
r3604 | |||
MinRK
|
# Conditional import of the MongoDB backend class.
# `maybe_mongo` is a list holding the MongoDB class when its module can be
# imported, and is empty otherwise; it is appended to the application's
# `classes` list further down in this file.
try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    maybe_mongo = []
else:
    maybe_mongo = [MongoDB]
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
# Module level variables | ||||
#----------------------------------------------------------------------------- | ||||
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'

# Help text shown as the application's description (assigned to
# IPControllerApp.description below).
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the `profile`
and `profile_dir` options for details.
"""
#----------------------------------------------------------------------------- | ||||
# The main application | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
# Command-line flags for the controller application.
# Starts from the shared `base_flags`, then adds controller-specific flags.
# Each entry maps a flag name to a (config-dict, help-string) pair.
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files')
})
# Adds the flag entries produced by `boolean_flag` for IPControllerApp.secure
# (presumably a `--secure` / `--no-secure` pair -- confirm against
# IPython.config.application.boolean_flag).
flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))
MinRK
|
r3985 | |||
MinRK
|
class IPControllerApp(BaseParallelApplication):
    """Application that configures and runs the IPython parallel controller.

    `initialize` builds the Hub (via `HubFactory`) and the scheduler/relay
    children; `start` launches them and runs the Hub's event loop.
    """

    name = u'ipcontroller'
    description = _description
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    secure = Bool(True, config=True,
        help='Whether to use HMAC digests for extra message authentication.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup.  Necessary in some environments"
    )
    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal
    # devices / processes created by init_schedulers and launched by start
    children = List()
    # dotted name of the MonitoredQueue class; kept in sync with use_threads
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        """Select the Thread- or Process-based MonitoredQueue class name
        whenever `use_threads` changes."""
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    aliases = Dict(dict(
        log_level = 'IPControllerApp.log_level',
        log_url = 'IPControllerApp.log_url',
        reuse_files = 'IPControllerApp.reuse_files',
        secure = 'IPControllerApp.secure',
        ssh = 'IPControllerApp.ssh_server',
        use_threads = 'IPControllerApp.use_threads',
        import_statements = 'IPControllerApp.import_statements',
        location = 'IPControllerApp.location',

        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',

        url = 'HubFactory.url',
        ip = 'HubFactory.ip',
        transport = 'HubFactory.transport',
        port = 'HubFactory.regport',

        ping = 'HeartMonitor.period',

        scheme = 'TaskScheduler.scheme_name',
        hwm = 'TaskScheduler.hwm',

        profile = "BaseIPythonApplication.profile",
        profile_dir = 'ProfileDir.location',
    ))
    flags = Dict(flags)

    def save_connection_dict(self, fname, cdict):
        """Save a connection dict to a json file in the security dir.

        `fname` is a file name relative to `self.profile_dir.security_dir`.
        `cdict` must contain 'url' and 'location' keys.  If 'location' is
        empty and the url is accepted by `split_url`, this machine's address
        is looked up and stored back into `cdict['location']` (the caller's
        dict is updated in place).

        The file is restricted to owner read/write because the dict may
        carry the session's exec_key.
        """
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                # called only for validation: an AssertionError means the url
                # is not usable for deriving a location
                split_url(url)
            except AssertionError:
                pass
            else:
                try:
                    location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
                except (socket.error, IndexError):
                    # Same fallback as an unparsable url: leave the location
                    # empty rather than abort controller startup.
                    self.log.warn("Could not determine this machine's IP;"
                        " leaving 'location' empty in %s" % fname)
            cdict['location'] = location
        fname = os.path.join(self.profile_dir.security_dir, fname)
        owner_rw = stat.S_IRUSR|stat.S_IWUSR
        # Create the file with restrictive permissions from the start, so the
        # key is never on disk in a file with default (umask) permissions.
        fd = os.open(fname, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, owner_rw)
        with os.fdopen(fd, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        # os.open ignores the mode for a file that already existed
        os.chmod(fname, owner_rw)

    def load_config_from_json(self):
        """Load config from existing json connector files.

        Reads 'ipcontroller-engine.json' and 'ipcontroller-client.json' from
        the security dir and applies their key, transports, ips, registration
        port, location and ssh server to the config / this application.

        Both files are read and cross-checked before anything is applied, so
        a failure leaves the configuration untouched.

        Raises IOError if a file cannot be opened, AssertionError if the two
        files disagree on exec_key or registration port, and
        KeyError/ValueError if a file is malformed.
        """
        c = self.config
        sec_dir = self.profile_dir.security_dir

        with open(os.path.join(sec_dir, 'ipcontroller-engine.json')) as f:
            engine_cfg = json.loads(f.read())
        with open(os.path.join(sec_dir, 'ipcontroller-client.json')) as f:
            client_cfg = json.loads(f.read())

        # Explicit raises (rather than `assert`) so the checks survive -O;
        # the exception type is the one init_hub catches.
        key = engine_cfg['exec_key']
        if key != client_cfg['exec_key']:
            raise AssertionError("exec_key mismatch between engine and client keys")

        engine_xport, engine_addr = engine_cfg['url'].split('://')
        engine_ip, engine_port = engine_addr.split(':')
        client_xport, client_addr = client_cfg['url'].split('://')
        client_ip, client_port = client_addr.split(':')
        regport = int(engine_port)
        if int(client_port) != regport:
            raise AssertionError("regport mismatch")

        location = engine_cfg['location']
        ssh_server = client_cfg['ssh']

        # everything validated: apply
        c.StreamSession.key = key
        c.HubFactory.engine_transport = engine_xport
        c.HubFactory.engine_ip = engine_ip
        c.HubFactory.regport = regport
        c.HubFactory.client_transport = client_xport
        c.HubFactory.client_ip = client_ip
        self.location = location
        self.ssh_server = ssh_server

    def init_hub(self):
        """Construct the Hub and, unless reusing, write new connection files.

        Runs the configured import statements, optionally reloads connection
        info from existing json files, otherwise picks a session key (a fresh
        uuid4 when `secure`, else empty), builds `self.factory`, and saves
        'ipcontroller-client.json' / 'ipcontroller-engine.json'.
        """
        c = self.config

        self.do_import_statements()
        reusing = self.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError, KeyError, ValueError):
                # missing, inconsistent or malformed files: start fresh
                self.log.warn("Could not reuse existing connection files;"
                    " generating new ones", exc_info=True)
                reusing = False

        # check again, because reusing may have failed:
        if not reusing:
            key = str(uuid.uuid4()) if self.secure else ''
            c.StreamSession.key = key

        try:
            self.factory = HubFactory(config=c, log=self.log)
            self.factory.init_hub()
        except Exception:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : self.ssh_server,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : self.location
                    }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # Engines get their own copy with the *engine* endpoint, matching
            # what load_config_from_json and save_urls expect for engine files.
            edict = dict(cdict)
            edict['url'] = "%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)

    def init_schedulers(self):
        """Create (but do not start) the relay queues and the task scheduler.

        Each device is appended to `self.children`; `start` launches them.
        The queue class is imported from `self.mq_class`, so the queues are
        Process- or Thread-based depending on `use_threads`.
        """
        children = self.children
        mq = import_item(str(self.mq_class))

        hub = self.factory

        # IOPub relay
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
        q.bind_in(hub.client_info['iopub'])
        q.bind_out(hub.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, '')
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(hub.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, 'mux')
        q.bind_out(hub.engine_info['mux'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(hub.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, 'control')
        q.bind_out(hub.engine_info['control'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # fall back to the TaskScheduler default when no scheme is configured
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            scheme = TaskScheduler.scheme_name.get_default_value()

        # Task Queue
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            q.bind_in(hub.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, 'task')
            q.bind_out(hub.engine_info['task'])
            q.connect_mon(hub.monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")
        else:
            # Python scheduler, always run in a separate Process
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                                hub.monitor_url, hub.client_info['notification'])
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon=True
            children.append(q)

    def save_urls(self):
        """Save the engine and client registration urls to files in the
        security dir ('ipcontroller-engine.url', 'ipcontroller-client.url')."""
        sec_dir = self.profile_dir.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))

    def do_import_statements(self):
        """Execute each configured `import_statements` entry.

        A failing statement is logged with its traceback and skipped; the
        remaining statements still run.
        """
        # NOTE: this executes arbitrary code taken from the configuration;
        # import_statements must only come from trusted config.
        for s in self.import_statements:
            self.log.info("Executing statement: '%s'" % s)
            try:
                # tuple form is valid in both Python 2 and Python 3
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def forward_logging(self):
        """If `log_url` is set, replace the current log handler with a
        PUBHandler that publishes records to that url over zmq."""
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            self.log.removeHandler(self._log_handler)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler

    def initialize(self, argv=None):
        """Run base initialization, then set up log forwarding, the Hub and
        the schedulers (in that order)."""
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        """Start the Hub and all children, then run the Hub's event loop
        until it stops or the user interrupts."""
        # Start the subprocesses:
        self.factory.start()
        child_procs = []
        for child in self.children:
            child.start()
            # collect the underlying process objects (thread-based devices
            # contribute nothing here)
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            # presumably arranges for the children to be terminated along
            # with this process -- confirm in IPython.parallel.util
            signal_children(child_procs)

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
MinRK
|
r3986 | |||
MinRK
|
r3604 | |||
def launch_new_instance():
    """Create and run the IPython controller"""
    app = IPControllerApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()