#!/usr/bin/env python # encoding: utf-8 """ The IPython controller application. """ #----------------------------------------------------------------------------- # Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- from __future__ import with_statement import copy import os import logging import socket import stat import sys import uuid import zmq from zmq.log.handlers import PUBHandler from zmq.utils import jsonapi as json from IPython.config.loader import Config from IPython.parallel import factory from .clusterdir import ( ApplicationWithClusterDir, ClusterDirConfigLoader ) from IPython.parallel.util import disambiguate_ip_address, split_url # from IPython.kernel.fcutil import FCServiceFactory, FURLError from IPython.utils.traitlets import Instance, Unicode from IPython.parallel.controller.controller import ControllerFactory #----------------------------------------------------------------------------- # Module level variables #----------------------------------------------------------------------------- #: The default config file name for this application default_config_file_name = u'ipcontroller_config.py' _description = """Start the IPython controller for parallel computing. The IPython controller provides a gateway between the IPython engines and clients. The controller needs to be started before the engines and can be configured using command line options or using a cluster directory. Cluster directories contain config, log and security files and are usually located in your ipython directory and named as "cluster_". See the --profile and --cluster-dir options for details. """ #----------------------------------------------------------------------------- # Default interfaces #----------------------------------------------------------------------------- # The default client interfaces for FCClientServiceFactory.interfaces default_client_interfaces = Config() default_client_interfaces.Default.url_file = 'ipcontroller-client.url' # Make this a dict we can pass to Config.__init__ for the default default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items())) # The default engine interfaces for FCEngineServiceFactory.interfaces default_engine_interfaces = Config() default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url' # Make this a dict we can pass to Config.__init__ for the default default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items())) #----------------------------------------------------------------------------- # Service factories #----------------------------------------------------------------------------- # # class FCClientServiceFactory(FCServiceFactory): # """A Foolscap implementation of the client services.""" # # cert_file = Unicode(u'ipcontroller-client.pem', config=True) # interfaces = Instance(klass=Config, kw=default_client_interfaces, # allow_none=False, config=True) # # # class FCEngineServiceFactory(FCServiceFactory): # """A Foolscap implementation of the engine services.""" # # cert_file = Unicode(u'ipcontroller-engine.pem', config=True) # interfaces = Instance(klass=dict, kw=default_engine_interfaces, # allow_none=False, config=True) # #----------------------------------------------------------------------------- # Command line options #----------------------------------------------------------------------------- class IPControllerAppConfigLoader(ClusterDirConfigLoader): def _add_arguments(self): super(IPControllerAppConfigLoader, self)._add_arguments() paa = self.parser.add_argument ## Hub Config: paa('--mongodb', dest='HubFactory.db_class', action='store_const', const='IPython.parallel.controller.mongodb.MongoDB', help='Use MongoDB for task storage [default: in-memory]') paa('--sqlite', dest='HubFactory.db_class', action='store_const', const='IPython.parallel.controller.sqlitedb.SQLiteDB', help='Use SQLite3 for DB task storage [default: in-memory]') paa('--hb', type=int, dest='HubFactory.hb', nargs=2, help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' 'connections [default: random]', metavar='Hub.hb_ports') paa('--ping', type=int, dest='HubFactory.ping', help='The frequency at which the Hub pings the engines for heartbeats ' ' (in ms) [default: 100]', metavar='Hub.ping') # Client config paa('--client-ip', type=str, dest='HubFactory.client_ip', help='The IP address or hostname the Hub will listen on for ' 'client connections. Both engine-ip and client-ip can be set simultaneously ' 'via --ip [default: loopback]', metavar='Hub.client_ip') paa('--client-transport', type=str, dest='HubFactory.client_transport', help='The ZeroMQ transport the Hub will use for ' 'client connections. Both engine-transport and client-transport can be set simultaneously ' 'via --transport [default: tcp]', metavar='Hub.client_transport') paa('--query', type=int, dest='HubFactory.query_port', help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]', metavar='Hub.query_port') paa('--notifier', type=int, dest='HubFactory.notifier_port', help='The port on which the Hub PUB socket will listen for notification connections [default: random]', metavar='Hub.notifier_port') # Engine config paa('--engine-ip', type=str, dest='HubFactory.engine_ip', help='The IP address or hostname the Hub will listen on for ' 'engine connections. This applies to the Hub and its schedulers' 'engine-ip and client-ip can be set simultaneously ' 'via --ip [default: loopback]', metavar='Hub.engine_ip') paa('--engine-transport', type=str, dest='HubFactory.engine_transport', help='The ZeroMQ transport the Hub will use for ' 'client connections. Both engine-transport and client-transport can be set simultaneously ' 'via --transport [default: tcp]', metavar='Hub.engine_transport') # Scheduler config paa('--mux', type=int, dest='ControllerFactory.mux', nargs=2, help='The (2) ports the MUX scheduler will listen on for client,engine ' 'connections, respectively [default: random]', metavar='Scheduler.mux_ports') paa('--task', type=int, dest='ControllerFactory.task', nargs=2, help='The (2) ports the Task scheduler will listen on for client,engine ' 'connections, respectively [default: random]', metavar='Scheduler.task_ports') paa('--control', type=int, dest='ControllerFactory.control', nargs=2, help='The (2) ports the Control scheduler will listen on for client,engine ' 'connections, respectively [default: random]', metavar='Scheduler.control_ports') paa('--iopub', type=int, dest='ControllerFactory.iopub', nargs=2, help='The (2) ports the IOPub scheduler will listen on for client,engine ' 'connections, respectively [default: random]', metavar='Scheduler.iopub_ports') paa('--scheme', type=str, dest='HubFactory.scheme', choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], help='select the task scheduler scheme [default: Python LRU]', metavar='Scheduler.scheme') paa('--usethreads', dest='ControllerFactory.usethreads', action="store_true", help='Use threads instead of processes for the schedulers', ) paa('--hwm', dest='ControllerFactory.hwm', type=int, help='specify the High Water Mark (HWM) for the downstream ' 'socket in the pure ZMQ scheduler. This is the maximum number ' 'of allowed outstanding tasks on each engine.', ) ## Global config paa('--log-to-file', action='store_true', dest='Global.log_to_file', help='Log to a file in the log directory (default is stdout)') paa('--log-url', type=str, dest='Global.log_url', help='Broadcast logs to an iploggerz process [default: disabled]') paa('-r','--reuse-files', action='store_true', dest='Global.reuse_files', help='Try to reuse existing json connection files.') paa('--no-secure', action='store_false', dest='Global.secure', help='Turn off execution keys (default).') paa('--secure', action='store_true', dest='Global.secure', help='Turn on execution keys.') paa('--execkey', type=str, dest='Global.exec_key', help='path to a file containing an execution key.', metavar='keyfile') paa('--ssh', type=str, dest='Global.sshserver', help='ssh url for clients to use when connecting to the Controller ' 'processes. It should be of the form: [user@]server[:port]. The ' 'Controller\'s listening addresses must be accessible from the ssh server', metavar='Global.sshserver') paa('--location', type=str, dest='Global.location', help="The external IP or domain name of this machine, used for disambiguating " "engine and client connections.", metavar='Global.location') factory.add_session_arguments(self.parser) factory.add_registration_arguments(self.parser) #----------------------------------------------------------------------------- # The main application #----------------------------------------------------------------------------- class IPControllerApp(ApplicationWithClusterDir): name = u'ipcontroller' description = _description command_line_loader = IPControllerAppConfigLoader default_config_file_name = default_config_file_name auto_create_cluster_dir = True def create_default_config(self): super(IPControllerApp, self).create_default_config() # Don't set defaults for Global.secure or Global.reuse_furls # as those are set in a component. self.default_config.Global.import_statements = [] self.default_config.Global.clean_logs = True self.default_config.Global.secure = True self.default_config.Global.reuse_files = False self.default_config.Global.exec_key = "exec_key.key" self.default_config.Global.sshserver = None self.default_config.Global.location = None def pre_construct(self): super(IPControllerApp, self).pre_construct() c = self.master_config # The defaults for these are set in FCClientServiceFactory and # FCEngineServiceFactory, so we only set them here if the global # options have be set to override the class level defaults. # if hasattr(c.Global, 'reuse_furls'): # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls # del c.Global.reuse_furls # if hasattr(c.Global, 'secure'): # c.FCClientServiceFactory.secure = c.Global.secure # c.FCEngineServiceFactory.secure = c.Global.secure # del c.Global.secure def save_connection_dict(self, fname, cdict): """save a connection dict to json file.""" c = self.master_config url = cdict['url'] location = cdict['location'] if not location: try: proto,ip,port = split_url(url) except AssertionError: pass else: location = socket.gethostbyname_ex(socket.gethostname())[2][-1] cdict['location'] = location fname = os.path.join(c.Global.security_dir, fname) with open(fname, 'w') as f: f.write(json.dumps(cdict, indent=2)) os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR) def load_config_from_json(self): """load config from existing json connector files.""" c = self.master_config # load from engine config with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f: cfg = json.loads(f.read()) key = c.SessionFactory.exec_key = cfg['exec_key'] xport,addr = cfg['url'].split('://') c.HubFactory.engine_transport = xport ip,ports = addr.split(':') c.HubFactory.engine_ip = ip c.HubFactory.regport = int(ports) c.Global.location = cfg['location'] # load client config with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f: cfg = json.loads(f.read()) assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" xport,addr = cfg['url'].split('://') c.HubFactory.client_transport = xport ip,ports = addr.split(':') c.HubFactory.client_ip = ip c.Global.sshserver = cfg['ssh'] assert int(ports) == c.HubFactory.regport, "regport mismatch" def construct(self): # This is the working dir by now. sys.path.insert(0, '') c = self.master_config self.import_statements() reusing = c.Global.reuse_files if reusing: try: self.load_config_from_json() except (AssertionError,IOError): reusing=False # check again, because reusing may have failed: if reusing: pass elif c.Global.secure: keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key) key = str(uuid.uuid4()) with open(keyfile, 'w') as f: f.write(key) os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) c.SessionFactory.exec_key = key else: c.SessionFactory.exec_key = '' key = None try: self.factory = ControllerFactory(config=c, logname=self.log.name) self.start_logging() self.factory.construct() except: self.log.error("Couldn't construct the Controller", exc_info=True) self.exit(1) if not reusing: # save to new json config files f = self.factory cdict = {'exec_key' : key, 'ssh' : c.Global.sshserver, 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), 'location' : c.Global.location } self.save_connection_dict('ipcontroller-client.json', cdict) edict = cdict edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) self.save_connection_dict('ipcontroller-engine.json', edict) def save_urls(self): """save the registration urls to files.""" c = self.master_config sec_dir = c.Global.security_dir cf = self.factory with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport)) with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f: f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) def import_statements(self): statements = self.master_config.Global.import_statements for s in statements: try: self.log.msg("Executing statement: '%s'" % s) exec s in globals(), locals() except: self.log.msg("Error running statement: %s" % s) def start_logging(self): super(IPControllerApp, self).start_logging() if self.master_config.Global.log_url: context = self.factory.context lsock = context.socket(zmq.PUB) lsock.connect(self.master_config.Global.log_url) handler = PUBHandler(lsock) handler.root_topic = 'controller' handler.setLevel(self.log_level) self.log.addHandler(handler) # def start_app(self): # Start the subprocesses: self.factory.start() self.write_pid_file(overwrite=True) try: self.factory.loop.start() except KeyboardInterrupt: self.log.critical("Interrupted, Exiting...\n") def launch_new_instance(): """Create and run the IPython controller""" app = IPControllerApp() app.start() if __name__ == '__main__': launch_new_instance()