# ipcontrollerapp.py -- 425 lines | 15.1 KiB | text/x-python | PythonLexer
# NOTE(review): this file was captured from a repository-viewer page; the
# viewer's commit annotations were interleaved with the source. Page header:
#   "Adding temp refs to msg_type to prevent nested dict gets."
#   File last commit: r4218:393ea007 / r4236:ff292f9c
r3604 #!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
Refactor newparallel to use Config system...
r3604 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
MinRK
persist connection data to disk as json
r3614 import socket
MinRK
resort imports in a cleaner order
r3631 import stat
import sys
MinRK
persist connection data to disk as json
r3614 import uuid
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from multiprocessing import Process
MinRK
Refactor newparallel to use Config system...
r3604 import zmq
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from zmq.devices import ProcessMonitoredQueue
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.log.handlers import PUBHandler
MinRK
persist connection data to disk as json
r3614 from zmq.utils import jsonapi as json
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 from IPython.config.application import boolean_flag
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 from IPython.core.profiledir import ProfileDir
MinRK
fix residual import issues with IPython.parallel reorganization
r3688
MinRK
rename clusterdir to more descriptive baseapp...
r3993 from IPython.parallel.apps.baseapp import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseParallelApplication,
MinRK
cleanup aliases in parallel apps...
r4115 base_aliases,
base_flags,
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.utils.importstring import import_item
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# from IPython.parallel.controller.controller import ControllerFactory
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 from IPython.zmq.session import Session
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.heartmonitor import HeartMonitor
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.parallel.controller.hub import HubFactory
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB
MinRK
cleanup per review...
r4161 from IPython.parallel.util import signal_children, split_url, asbytes
MinRK
Refactor newparallel to use Config system...
r3604
# Conditional import of the MongoDB backend: the Mongo classes are only
# advertised to the config system when pymongo/IPython's mongodb module
# is importable.
maybe_mongo = []
try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    pass
else:
    maybe_mongo.append(MongoDB)
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 default_config_file_name = u'ipcontroller_config.py'
MinRK
Refactor newparallel to use Config system...
r3604
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
MinRK
move ipcluster create|list to `ipython profile create|list`...
r4024 your ipython directory and named as "profile_name". See the `profile`
Brian E. Granger
Finishing up help string work.
r4218 and `profile-dir` options for details.
MinRK
Refactor newparallel to use Config system...
r3604 """
Brian Granger
More work on adding examples to help strings.
r4216 _examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------

# command-line flags: name -> ({Class: {trait: value}}, help string)
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files'),
})

# --secure / --no-secure toggle HMAC message signing
flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))

# command-line aliases: name -> configurable trait path
aliases = dict(
    # application traits
    secure = 'IPControllerApp.secure',
    ssh = 'IPControllerApp.ssh_server',
    location = 'IPControllerApp.location',
    # session identity
    ident = 'Session.session',
    user = 'Session.username',
    keyfile = 'Session.keyfile',
    # hub networking
    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',
    # heartbeat / scheduler tuning
    ping = 'HeartMonitor.period',
    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
class IPControllerApp(BaseParallelApplication):
    """Application that starts the IPython controller (Hub and schedulers)."""

    name = u'ipcontroller'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    secure = Bool(True, config=True,
        help='Whether to use HMAC digests for extra message authentication.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )
    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal traits (not user-configurable)
    children = List()  # the queue/scheduler children started by this app
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
MinRK
parallel docs, tests, default config updated to newconfig
r3990 def _use_threads_changed(self, name, old, new):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
MinRK
cleanup aliases in parallel apps...
r4115 aliases = Dict(aliases)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = Dict(flags)
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
persist connection data to disk as json
r3614 def save_connection_dict(self, fname, cdict):
"""save a connection dict to json file."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
persist connection data to disk as json
r3614 url = cdict['url']
location = cdict['location']
if not location:
try:
proto,ip,port = split_url(url)
except AssertionError:
pass
else:
location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
cdict['location'] = location
MinRK
update parallel apps to use ProfileDir
r3992 fname = os.path.join(self.profile_dir.security_dir, fname)
Thomas Kluyver
Tweaks to improve automated conversion to Python 3 code.
r4110 with open(fname, 'wb') as f:
MinRK
persist connection data to disk as json
r3614 f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
MinRK
add default ip<x>z_config files
r3630
def load_config_from_json(self):
"""load config from existing json connector files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
add default ip<x>z_config files
r3630 # load from engine config
MinRK
update parallel apps to use ProfileDir
r3992 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
MinRK
cleanup per review...
r4161 key = c.Session.key = asbytes(cfg['exec_key'])
MinRK
add default ip<x>z_config files
r3630 xport,addr = cfg['url'].split('://')
c.HubFactory.engine_transport = xport
ip,ports = addr.split(':')
c.HubFactory.engine_ip = ip
c.HubFactory.regport = int(ports)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.location = cfg['location']
MinRK
add default ip<x>z_config files
r3630 # load client config
MinRK
update parallel apps to use ProfileDir
r3992 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
c.HubFactory.client_transport = xport
ip,ports = addr.split(':')
c.HubFactory.client_ip = ip
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.ssh_server = cfg['ssh']
MinRK
add default ip<x>z_config files
r3630 assert int(ports) == c.HubFactory.regport, "regport mismatch"
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_hub(self):
c = self.config
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.do_import_statements()
reusing = self.reuse_files
MinRK
add default ip<x>z_config files
r3630 if reusing:
try:
self.load_config_from_json()
except (AssertionError,IOError):
reusing=False
# check again, because reusing may have failed:
if reusing:
pass
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 elif self.secure:
MinRK
add default ip<x>z_config files
r3630 key = str(uuid.uuid4())
MinRK
update parallel apps to use ProfileDir
r3992 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # with open(keyfile, 'w') as f:
# f.write(key)
# os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
MinRK
cleanup per review...
r4161 c.Session.key = asbytes(key)
MinRK
Refactor newparallel to use Config system...
r3604 else:
MinRK
update parallel code for py3k...
r4155 key = c.Session.key = b''
MinRK
Refactor newparallel to use Config system...
r3604
try:
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.factory = HubFactory(config=c, log=self.log)
# self.start_logging()
self.factory.init_hub()
MinRK
Refactor newparallel to use Config system...
r3604 except:
self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
MinRK
persist connection data to disk as json
r3614
MinRK
add default ip<x>z_config files
r3630 if not reusing:
# save to new json config files
f = self.factory
cdict = {'exec_key' : key,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'ssh' : self.ssh_server,
MinRK
add default ip<x>z_config files
r3630 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'location' : self.location
MinRK
add default ip<x>z_config files
r3630 }
self.save_connection_dict('ipcontroller-client.json', cdict)
edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
self.save_connection_dict('ipcontroller-engine.json', edict)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
#
def init_schedulers(self):
children = self.children
MinRK
re-enable log forwarding and iplogger
r3989 mq = import_item(str(self.mq_class))
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 hub = self.factory
MinRK
parallel docs, tests, default config updated to newconfig
r3990 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # IOPub relay (in a Process)
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['iopub'])
q.bind_out(hub.engine_info['iopub'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_out(zmq.SUBSCRIBE, b'')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['mux'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'mux')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['mux'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_in(hub.client_info['control'])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'control')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['control'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
try:
scheme = self.config.TaskScheduler.scheme_name
except AttributeError:
scheme = TaskScheduler.scheme_name.get_default_value()
# Task Queue (in a Process)
if scheme == 'pure':
self.log.warn("task::using pure XREQ Task scheduler")
MinRK
update parallel code for py3k...
r4155 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # q.setsockopt_out(zmq.HWM, hub.hwm)
q.bind_in(hub.client_info['task'][1])
MinRK
update parallel code for py3k...
r4155 q.setsockopt_in(zmq.IDENTITY, b'task')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q.bind_out(hub.engine_info['task'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
elif scheme == 'none':
self.log.warn("task::using no Task scheduler")
else:
self.log.info("task::using Python %s Task scheduler"%scheme)
sargs = (hub.client_info['task'][1], hub.engine_info['task'],
hub.monitor_url, hub.client_info['notification'])
MinRK
re-enable log forwarding and iplogger
r3989 kwargs = dict(logname='scheduler', loglevel=self.log_level,
log_url = self.log_url, config=dict(self.config))
MinRK
allow true single-threaded Controller...
r4092 if 'Process' in self.mq_class:
# run the Python scheduler in a Process
q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
else:
# single-threaded Controller
kwargs['in_thread'] = True
launch_scheduler(*sargs, **kwargs)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def save_urls(self):
"""save the registration urls to files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
update parallel apps to use ProfileDir
r3992 sec_dir = self.profile_dir.security_dir
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 cf = self.factory
with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def do_import_statements(self):
statements = self.import_statements
MinRK
Refactor newparallel to use Config system...
r3604 for s in statements:
try:
self.log.msg("Executing statement: '%s'" % s)
exec s in globals(), locals()
except:
self.log.msg("Error running statement: %s" % s)
MinRK
re-enable log forwarding and iplogger
r3989 def forward_logging(self):
if self.log_url:
self.log.info("Forwarding logging to %s"%self.log_url)
context = zmq.Context.instance()
lsock = context.socket(zmq.PUB)
lsock.connect(self.log_url)
handler = PUBHandler(lsock)
self.log.removeHandler(self._log_handler)
handler.root_topic = 'controller'
handler.setLevel(self.log_level)
self.log.addHandler(handler)
self._log_handler = handler
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # #
MinRK
ipcluster implemented with new subcommands
r3986
def initialize(self, argv=None):
super(IPControllerApp, self).initialize(argv)
MinRK
re-enable log forwarding and iplogger
r3989 self.forward_logging()
MinRK
ipcluster implemented with new subcommands
r3986 self.init_hub()
self.init_schedulers()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def start(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Start the subprocesses:
MinRK
Refactor newparallel to use Config system...
r3604 self.factory.start()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 child_procs = []
for child in self.children:
child.start()
if isinstance(child, ProcessMonitoredQueue):
child_procs.append(child.launcher)
elif isinstance(child, Process):
child_procs.append(child)
if child_procs:
signal_children(child_procs)
MinRK
Refactor newparallel to use Config system...
r3604 self.write_pid_file(overwrite=True)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 try:
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
MinRK
ipcluster implemented with new subcommands
r3986
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # make sure we don't get called from a multiprocessing subprocess:
        # win32 has no real fork, so multiprocessing re-imports the module,
        # which can result in infinite Controllers being started. This only
        # comes up when IPython has been installed using vanilla setuptools,
        # and *not* distribute.
        import multiprocessing
        # the main process has name 'MainProcess';
        # subprocesses get names like 'Process-1'
        if multiprocessing.current_process().name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()