##// END OF EJS Templates
add retries flag to LoadBalancedView...
add retries flag to LoadBalancedView also add some lbv tests, and related fixes closes gh-412

File last commit:

r3785:f216d709
r3873:6549d091
Show More
ipcontrollerapp.py
433 lines | 17.3 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import copy
import os
import logging
import socket
import stat
import sys
import uuid
import zmq
from zmq.log.handlers import PUBHandler
from zmq.utils import jsonapi as json
from IPython.config.loader import Config
from IPython.parallel import factory
from IPython.parallel.apps.clusterdir import (
ApplicationWithClusterDir,
ClusterDirConfigLoader
)
from IPython.parallel.util import disambiguate_ip_address, split_url
# from IPython.kernel.fcutil import FCServiceFactory, FURLError
from IPython.utils.traitlets import Instance, Unicode
from IPython.parallel.controller.controller import ControllerFactory
#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'
_description = """Start the IPython controller for parallel computing.
The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""
#-----------------------------------------------------------------------------
# Default interfaces
#-----------------------------------------------------------------------------
# The default client interfaces for FCClientServiceFactory.interfaces
default_client_interfaces = Config()
default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
# Make this a dict we can pass to Config.__init__ for the default
default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
# The default engine interfaces for FCEngineServiceFactory.interfaces
default_engine_interfaces = Config()
default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
# Make this a dict we can pass to Config.__init__ for the default
default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
#-----------------------------------------------------------------------------
# Service factories
#-----------------------------------------------------------------------------
#
# class FCClientServiceFactory(FCServiceFactory):
# """A Foolscap implementation of the client services."""
#
# cert_file = Unicode(u'ipcontroller-client.pem', config=True)
# interfaces = Instance(klass=Config, kw=default_client_interfaces,
# allow_none=False, config=True)
#
#
# class FCEngineServiceFactory(FCServiceFactory):
# """A Foolscap implementation of the engine services."""
#
# cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
# interfaces = Instance(klass=dict, kw=default_engine_interfaces,
# allow_none=False, config=True)
#
#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------
class IPControllerAppConfigLoader(ClusterDirConfigLoader):
def _add_arguments(self):
super(IPControllerAppConfigLoader, self)._add_arguments()
paa = self.parser.add_argument
## Hub Config:
paa('--mongodb',
dest='HubFactory.db_class', action='store_const',
const='IPython.parallel.controller.mongodb.MongoDB',
help='Use MongoDB for task storage [default: in-memory]')
paa('--sqlite',
dest='HubFactory.db_class', action='store_const',
const='IPython.parallel.controller.sqlitedb.SQLiteDB',
help='Use SQLite3 for DB task storage [default: in-memory]')
paa('--hb',
type=int, dest='HubFactory.hb', nargs=2,
help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
'connections [default: random]',
metavar='Hub.hb_ports')
paa('--ping',
type=int, dest='HubFactory.ping',
help='The frequency at which the Hub pings the engines for heartbeats '
' (in ms) [default: 100]',
metavar='Hub.ping')
# Client config
paa('--client-ip',
type=str, dest='HubFactory.client_ip',
help='The IP address or hostname the Hub will listen on for '
'client connections. Both engine-ip and client-ip can be set simultaneously '
'via --ip [default: loopback]',
metavar='Hub.client_ip')
paa('--client-transport',
type=str, dest='HubFactory.client_transport',
help='The ZeroMQ transport the Hub will use for '
'client connections. Both engine-transport and client-transport can be set simultaneously '
'via --transport [default: tcp]',
metavar='Hub.client_transport')
paa('--query',
type=int, dest='HubFactory.query_port',
help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
metavar='Hub.query_port')
paa('--notifier',
type=int, dest='HubFactory.notifier_port',
help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
metavar='Hub.notifier_port')
# Engine config
paa('--engine-ip',
type=str, dest='HubFactory.engine_ip',
help='The IP address or hostname the Hub will listen on for '
'engine connections. This applies to the Hub and its schedulers'
'engine-ip and client-ip can be set simultaneously '
'via --ip [default: loopback]',
metavar='Hub.engine_ip')
paa('--engine-transport',
type=str, dest='HubFactory.engine_transport',
help='The ZeroMQ transport the Hub will use for '
'client connections. Both engine-transport and client-transport can be set simultaneously '
'via --transport [default: tcp]',
metavar='Hub.engine_transport')
# Scheduler config
paa('--mux',
type=int, dest='ControllerFactory.mux', nargs=2,
help='The (2) ports the MUX scheduler will listen on for client,engine '
'connections, respectively [default: random]',
metavar='Scheduler.mux_ports')
paa('--task',
type=int, dest='ControllerFactory.task', nargs=2,
help='The (2) ports the Task scheduler will listen on for client,engine '
'connections, respectively [default: random]',
metavar='Scheduler.task_ports')
paa('--control',
type=int, dest='ControllerFactory.control', nargs=2,
help='The (2) ports the Control scheduler will listen on for client,engine '
'connections, respectively [default: random]',
metavar='Scheduler.control_ports')
paa('--iopub',
type=int, dest='ControllerFactory.iopub', nargs=2,
help='The (2) ports the IOPub scheduler will listen on for client,engine '
'connections, respectively [default: random]',
metavar='Scheduler.iopub_ports')
paa('--scheme',
type=str, dest='HubFactory.scheme',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
help='select the task scheduler scheme [default: Python LRU]',
metavar='Scheduler.scheme')
paa('--usethreads',
dest='ControllerFactory.usethreads', action="store_true",
help='Use threads instead of processes for the schedulers',
)
paa('--hwm',
dest='TaskScheduler.hwm', type=int,
help='specify the High Water Mark (HWM) '
'in the Python scheduler. This is the maximum number '
'of allowed outstanding tasks on each engine.',
)
## Global config
paa('--log-to-file',
action='store_true', dest='Global.log_to_file',
help='Log to a file in the log directory (default is stdout)')
paa('--log-url',
type=str, dest='Global.log_url',
help='Broadcast logs to an iploggerz process [default: disabled]')
paa('-r','--reuse-files',
action='store_true', dest='Global.reuse_files',
help='Try to reuse existing json connection files.')
paa('--no-secure',
action='store_false', dest='Global.secure',
help='Turn off execution keys (default).')
paa('--secure',
action='store_true', dest='Global.secure',
help='Turn on execution keys.')
paa('--execkey',
type=str, dest='Global.exec_key',
help='path to a file containing an execution key.',
metavar='keyfile')
paa('--ssh',
type=str, dest='Global.sshserver',
help='ssh url for clients to use when connecting to the Controller '
'processes. It should be of the form: [user@]server[:port]. The '
'Controller\'s listening addresses must be accessible from the ssh server',
metavar='Global.sshserver')
paa('--location',
type=str, dest='Global.location',
help="The external IP or domain name of this machine, used for disambiguating "
"engine and client connections.",
metavar='Global.location')
factory.add_session_arguments(self.parser)
factory.add_registration_arguments(self.parser)
#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------
class IPControllerApp(ApplicationWithClusterDir):
name = u'ipcontroller'
description = _description
command_line_loader = IPControllerAppConfigLoader
default_config_file_name = default_config_file_name
auto_create_cluster_dir = True
def create_default_config(self):
super(IPControllerApp, self).create_default_config()
# Don't set defaults for Global.secure or Global.reuse_furls
# as those are set in a component.
self.default_config.Global.import_statements = []
self.default_config.Global.clean_logs = True
self.default_config.Global.secure = True
self.default_config.Global.reuse_files = False
self.default_config.Global.exec_key = "exec_key.key"
self.default_config.Global.sshserver = None
self.default_config.Global.location = None
def pre_construct(self):
super(IPControllerApp, self).pre_construct()
c = self.master_config
# The defaults for these are set in FCClientServiceFactory and
# FCEngineServiceFactory, so we only set them here if the global
# options have be set to override the class level defaults.
# if hasattr(c.Global, 'reuse_furls'):
# c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
# c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
# del c.Global.reuse_furls
# if hasattr(c.Global, 'secure'):
# c.FCClientServiceFactory.secure = c.Global.secure
# c.FCEngineServiceFactory.secure = c.Global.secure
# del c.Global.secure
def save_connection_dict(self, fname, cdict):
"""save a connection dict to json file."""
c = self.master_config
url = cdict['url']
location = cdict['location']
if not location:
try:
proto,ip,port = split_url(url)
except AssertionError:
pass
else:
location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
cdict['location'] = location
fname = os.path.join(c.Global.security_dir, fname)
with open(fname, 'w') as f:
f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
def load_config_from_json(self):
"""load config from existing json connector files."""
c = self.master_config
# load from engine config
with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
cfg = json.loads(f.read())
key = c.SessionFactory.exec_key = cfg['exec_key']
xport,addr = cfg['url'].split('://')
c.HubFactory.engine_transport = xport
ip,ports = addr.split(':')
c.HubFactory.engine_ip = ip
c.HubFactory.regport = int(ports)
c.Global.location = cfg['location']
# load client config
with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
c.HubFactory.client_transport = xport
ip,ports = addr.split(':')
c.HubFactory.client_ip = ip
c.Global.sshserver = cfg['ssh']
assert int(ports) == c.HubFactory.regport, "regport mismatch"
def construct(self):
# This is the working dir by now.
sys.path.insert(0, '')
c = self.master_config
self.import_statements()
reusing = c.Global.reuse_files
if reusing:
try:
self.load_config_from_json()
except (AssertionError,IOError):
reusing=False
# check again, because reusing may have failed:
if reusing:
pass
elif c.Global.secure:
keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
key = str(uuid.uuid4())
with open(keyfile, 'w') as f:
f.write(key)
os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
c.SessionFactory.exec_key = key
else:
c.SessionFactory.exec_key = ''
key = None
try:
self.factory = ControllerFactory(config=c, logname=self.log.name)
self.start_logging()
self.factory.construct()
except:
self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
if not reusing:
# save to new json config files
f = self.factory
cdict = {'exec_key' : key,
'ssh' : c.Global.sshserver,
'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
'location' : c.Global.location
}
self.save_connection_dict('ipcontroller-client.json', cdict)
edict = cdict
edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
self.save_connection_dict('ipcontroller-engine.json', edict)
def save_urls(self):
"""save the registration urls to files."""
c = self.master_config
sec_dir = c.Global.security_dir
cf = self.factory
with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
def import_statements(self):
statements = self.master_config.Global.import_statements
for s in statements:
try:
self.log.msg("Executing statement: '%s'" % s)
exec s in globals(), locals()
except:
self.log.msg("Error running statement: %s" % s)
def start_logging(self):
super(IPControllerApp, self).start_logging()
if self.master_config.Global.log_url:
context = self.factory.context
lsock = context.socket(zmq.PUB)
lsock.connect(self.master_config.Global.log_url)
handler = PUBHandler(lsock)
handler.root_topic = 'controller'
handler.setLevel(self.log_level)
self.log.addHandler(handler)
#
def start_app(self):
# Start the subprocesses:
self.factory.start()
self.write_pid_file(overwrite=True)
try:
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
def launch_new_instance():
"""Create and run the IPython controller"""
app = IPControllerApp()
app.start()
if __name__ == '__main__':
launch_new_instance()