##// END OF EJS Templates
don't pass profile_dir as kwarg in ipclusterapp...
don't pass profile_dir as kwarg in ipclusterapp also fix PBS->SGE typo from copy/paste

File last commit:

r4000:59bfd5de
r4001:f33ae765
Show More
ipcontrollerapp.py
402 lines | 14.3 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 #!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
MinRK
persist connection data to disk as json
r3614 import socket
MinRK
resort imports in a cleaner order
r3631 import stat
import sys
MinRK
persist connection data to disk as json
r3614 import uuid
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from multiprocessing import Process
MinRK
Refactor newparallel to use Config system...
r3604 import zmq
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from zmq.devices import ProcessMonitoredQueue
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.log.handlers import PUBHandler
MinRK
persist connection data to disk as json
r3614 from zmq.utils import jsonapi as json
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 from IPython.config.application import boolean_flag
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.core.newapplication import ProfileDir
MinRK
fix residual import issues with IPython.parallel reorganization
r3688
MinRK
rename clusterdir to more descriptive baseapp...
r3993 from IPython.parallel.apps.baseapp import (
MinRK
update parallel apps to use ProfileDir
r3992 BaseParallelApplication,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 base_flags
MinRK
Refactor newparallel to use Config system...
r3604 )
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.utils.importstring import import_item
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
# from IPython.parallel.controller.controller import ControllerFactory
from IPython.parallel.streamsession import StreamSession
from IPython.parallel.controller.heartmonitor import HeartMonitor
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.parallel.controller.hub import HubFactory
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB
MinRK
update parallel apps to use ProfileDir
r3992 from IPython.parallel.util import signal_children, split_url
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # conditional import of MongoDB backend class
try:
from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
maybe_mongo = []
else:
maybe_mongo = [MongoDB]
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 default_config_file_name = u'ipcontroller_config.py'
MinRK
Refactor newparallel to use Config system...
r3604
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
MinRK
parallel docs, tests, default config updated to newconfig
r3990 your ipython directory and named as "cluster_<profile>". See the `profile`
MinRK
update parallel apps to use ProfileDir
r3992 and `profile_dir` options for details.
MinRK
Refactor newparallel to use Config system...
r3604 """
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 flags = {}
flags.update(base_flags)
flags.update({
MinRK
parallel docs, tests, default config updated to newconfig
r3990 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'Use threads instead of processes for the schedulers'),
MinRK
remove uneccesary Config objects from flags.
r3994 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the SQLiteDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the MongoDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'use the in-memory DictDB backend'),
MinRK
remove uneccesary Config objects from flags.
r3994 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
MinRK
parallel docs, tests, default config updated to newconfig
r3990 'reuse existing json connection files')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 })
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
"Use HMAC digests for authentication of messages.",
"Don't authenticate messages."
))
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
update parallel apps to use ProfileDir
r3992 class IPControllerApp(BaseParallelApplication):
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 name = u'ipcontroller'
MinRK
Refactor newparallel to use Config system...
r3604 description = _description
MinRK
use BaseIPythonApp.load_config, not Application.load_config
r3991 config_file_name = Unicode(default_config_file_name)
MinRK
update parallel apps to use ProfileDir
r3992 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
# change default to True
auto_create = Bool(True, config=True,
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 help="""Whether to create profile dir if it doesn't exist.""")
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 reuse_files = Bool(False, config=True,
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 help='Whether to reuse existing json connection files.'
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
secure = Bool(True, config=True,
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 help='Whether to use HMAC digests for extra message authentication.'
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
ssh_server = Unicode(u'', config=True,
help="""ssh url for clients to use when connecting to the Controller
processes. It should be of the form: [user@]server[:port]. The
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 Controller's listening addresses must be accessible from the ssh server""",
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
location = Unicode(u'', config=True,
help="""The external IP or domain name of the Controller, used for disambiguating
engine and client connections.""",
)
import_statements = List([], config=True,
help="import statements to be run at startup. Necessary in some environments"
)
MinRK
parallel docs, tests, default config updated to newconfig
r3990 use_threads = Bool(False, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help='Use threads instead of processes for the schedulers',
)
# internal
children = List()
MinRK
cleanup parallel traits...
r3988 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
parallel docs, tests, default config updated to newconfig
r3990 def _use_threads_changed(self, name, old, new):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
aliases = Dict(dict(
log_level = 'IPControllerApp.log_level',
MinRK
re-enable log forwarding and iplogger
r3989 log_url = 'IPControllerApp.log_url',
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 reuse_files = 'IPControllerApp.reuse_files',
secure = 'IPControllerApp.secure',
ssh = 'IPControllerApp.ssh_server',
MinRK
parallel docs, tests, default config updated to newconfig
r3990 use_threads = 'IPControllerApp.use_threads',
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 import_statements = 'IPControllerApp.import_statements',
location = 'IPControllerApp.location',
ident = 'StreamSession.session',
user = 'StreamSession.username',
exec_key = 'StreamSession.keyfile',
url = 'HubFactory.url',
ip = 'HubFactory.ip',
transport = 'HubFactory.transport',
port = 'HubFactory.regport',
ping = 'HeartMonitor.period',
scheme = 'TaskScheduler.scheme_name',
hwm = 'TaskScheduler.hwm',
MinRK
update parallel apps to use ProfileDir
r3992 profile = "BaseIPythonApplication.profile",
profile_dir = 'ProfileDir.location',
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 ))
flags = Dict(flags)
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
persist connection data to disk as json
r3614 def save_connection_dict(self, fname, cdict):
"""save a connection dict to json file."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
persist connection data to disk as json
r3614 url = cdict['url']
location = cdict['location']
if not location:
try:
proto,ip,port = split_url(url)
except AssertionError:
pass
else:
location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
cdict['location'] = location
MinRK
update parallel apps to use ProfileDir
r3992 fname = os.path.join(self.profile_dir.security_dir, fname)
MinRK
persist connection data to disk as json
r3614 with open(fname, 'w') as f:
f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
MinRK
add default ip<x>z_config files
r3630
def load_config_from_json(self):
"""load config from existing json connector files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
add default ip<x>z_config files
r3630 # load from engine config
MinRK
update parallel apps to use ProfileDir
r3992 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 key = c.StreamSession.key = cfg['exec_key']
MinRK
add default ip<x>z_config files
r3630 xport,addr = cfg['url'].split('://')
c.HubFactory.engine_transport = xport
ip,ports = addr.split(':')
c.HubFactory.engine_ip = ip
c.HubFactory.regport = int(ports)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.location = cfg['location']
MinRK
persist connection data to disk as json
r3614
MinRK
add default ip<x>z_config files
r3630 # load client config
MinRK
update parallel apps to use ProfileDir
r3992 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
MinRK
add default ip<x>z_config files
r3630 cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
c.HubFactory.client_transport = xport
ip,ports = addr.split(':')
c.HubFactory.client_ip = ip
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.ssh_server = cfg['ssh']
MinRK
add default ip<x>z_config files
r3630 assert int(ports) == c.HubFactory.regport, "regport mismatch"
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_hub(self):
c = self.config
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.do_import_statements()
reusing = self.reuse_files
MinRK
add default ip<x>z_config files
r3630 if reusing:
try:
self.load_config_from_json()
except (AssertionError,IOError):
reusing=False
# check again, because reusing may have failed:
if reusing:
pass
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 elif self.secure:
MinRK
add default ip<x>z_config files
r3630 key = str(uuid.uuid4())
MinRK
update parallel apps to use ProfileDir
r3992 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # with open(keyfile, 'w') as f:
# f.write(key)
# os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
c.StreamSession.key = key
MinRK
Refactor newparallel to use Config system...
r3604 else:
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 key = c.StreamSession.key = ''
MinRK
Refactor newparallel to use Config system...
r3604
try:
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.factory = HubFactory(config=c, log=self.log)
# self.start_logging()
self.factory.init_hub()
MinRK
Refactor newparallel to use Config system...
r3604 except:
self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
MinRK
persist connection data to disk as json
r3614
MinRK
add default ip<x>z_config files
r3630 if not reusing:
# save to new json config files
f = self.factory
cdict = {'exec_key' : key,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'ssh' : self.ssh_server,
MinRK
add default ip<x>z_config files
r3630 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'location' : self.location
MinRK
add default ip<x>z_config files
r3630 }
self.save_connection_dict('ipcontroller-client.json', cdict)
edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
self.save_connection_dict('ipcontroller-engine.json', edict)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
#
def init_schedulers(self):
children = self.children
MinRK
re-enable log forwarding and iplogger
r3989 mq = import_item(str(self.mq_class))
MinRK
persist connection data to disk as json
r3614
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 hub = self.factory
MinRK
parallel docs, tests, default config updated to newconfig
r3990 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # IOPub relay (in a Process)
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
q.bind_in(hub.client_info['iopub'])
q.bind_out(hub.engine_info['iopub'])
q.setsockopt_out(zmq.SUBSCRIBE, '')
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in(hub.client_info['mux'])
q.setsockopt_in(zmq.IDENTITY, 'mux')
q.bind_out(hub.engine_info['mux'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
q.bind_in(hub.client_info['control'])
q.setsockopt_in(zmq.IDENTITY, 'control')
q.bind_out(hub.engine_info['control'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
try:
scheme = self.config.TaskScheduler.scheme_name
except AttributeError:
scheme = TaskScheduler.scheme_name.get_default_value()
# Task Queue (in a Process)
if scheme == 'pure':
self.log.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
# q.setsockopt_out(zmq.HWM, hub.hwm)
q.bind_in(hub.client_info['task'][1])
q.setsockopt_in(zmq.IDENTITY, 'task')
q.bind_out(hub.engine_info['task'])
q.connect_mon(hub.monitor_url)
q.daemon=True
children.append(q)
elif scheme == 'none':
self.log.warn("task::using no Task scheduler")
else:
self.log.info("task::using Python %s Task scheduler"%scheme)
sargs = (hub.client_info['task'][1], hub.engine_info['task'],
hub.monitor_url, hub.client_info['notification'])
MinRK
re-enable log forwarding and iplogger
r3989 kwargs = dict(logname='scheduler', loglevel=self.log_level,
log_url = self.log_url, config=dict(self.config))
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
def save_urls(self):
"""save the registration urls to files."""
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 c = self.config
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605
MinRK
update parallel apps to use ProfileDir
r3992 sec_dir = self.profile_dir.security_dir
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 cf = self.factory
with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def do_import_statements(self):
statements = self.import_statements
MinRK
Refactor newparallel to use Config system...
r3604 for s in statements:
try:
self.log.msg("Executing statement: '%s'" % s)
exec s in globals(), locals()
except:
self.log.msg("Error running statement: %s" % s)
MinRK
re-enable log forwarding and iplogger
r3989 def forward_logging(self):
if self.log_url:
self.log.info("Forwarding logging to %s"%self.log_url)
context = zmq.Context.instance()
lsock = context.socket(zmq.PUB)
lsock.connect(self.log_url)
handler = PUBHandler(lsock)
self.log.removeHandler(self._log_handler)
handler.root_topic = 'controller'
handler.setLevel(self.log_level)
self.log.addHandler(handler)
self._log_handler = handler
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 # #
MinRK
ipcluster implemented with new subcommands
r3986
def initialize(self, argv=None):
super(IPControllerApp, self).initialize(argv)
MinRK
re-enable log forwarding and iplogger
r3989 self.forward_logging()
MinRK
ipcluster implemented with new subcommands
r3986 self.init_hub()
self.init_schedulers()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def start(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # Start the subprocesses:
MinRK
Refactor newparallel to use Config system...
r3604 self.factory.start()
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 child_procs = []
for child in self.children:
child.start()
if isinstance(child, ProcessMonitoredQueue):
child_procs.append(child.launcher)
elif isinstance(child, Process):
child_procs.append(child)
if child_procs:
signal_children(child_procs)
MinRK
Refactor newparallel to use Config system...
r3604 self.write_pid_file(overwrite=True)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985
MinRK
Refactor newparallel to use Config system...
r3604 try:
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
MinRK
ipcluster implemented with new subcommands
r3986
MinRK
Refactor newparallel to use Config system...
r3604
def launch_new_instance():
"""Create and run the IPython controller"""
MinRK
use App.instance() in launch_new_instance (parallel apps)...
r3999 app = IPControllerApp.instance()
MinRK
ipcluster implemented with new subcommands
r3986 app.initialize()
MinRK
Refactor newparallel to use Config system...
r3604 app.start()
if __name__ == '__main__':
launch_new_instance()