#!/usr/bin/env python
# encoding: utf-8
"""
The IPython controller application.
"""

#-----------------------------------------------------------------------------
#  Copyright (C) 2008-2009  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

from __future__ import with_statement

import copy
import sys
import os
import logging
# from twisted.application import service
# from twisted.internet import reactor
# from twisted.python import log

import zmq
from zmq.log.handlers import PUBHandler

from IPython.config.loader import Config
from IPython.zmq.parallel import factory
from IPython.zmq.parallel.controller import ControllerFactory
from IPython.zmq.parallel.clusterdir import (
    ApplicationWithClusterDir,
    ClusterDirConfigLoader
)
# from IPython.kernel.fcutil import FCServiceFactory, FURLError
from IPython.utils.traitlets import Instance, Unicode

from entry_point import generate_exec_key


#-----------------------------------------------------------------------------
# Module level variables
#-----------------------------------------------------------------------------


#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your .ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""

#-----------------------------------------------------------------------------
# Default interfaces
#-----------------------------------------------------------------------------

# The default client interfaces for FCClientServiceFactory.interfaces
default_client_interfaces = Config()
default_client_interfaces.Default.url_file = 'ipcontroller-client.url'

# Make this a dict we can pass to Config.__init__ for the default
default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))


# The default engine interfaces for FCEngineServiceFactory.interfaces
default_engine_interfaces = Config()
default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'

# Make this a dict we can pass to Config.__init__ for the default
default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))


#-----------------------------------------------------------------------------
# Service factories
#-----------------------------------------------------------------------------

#
# class FCClientServiceFactory(FCServiceFactory):
#     """A Foolscap implementation of the client services."""
#
#     cert_file = Unicode(u'ipcontroller-client.pem', config=True)
#     interfaces = Instance(klass=Config, kw=default_client_interfaces,
#                           allow_none=False, config=True)
#
#
# class FCEngineServiceFactory(FCServiceFactory):
#     """A Foolscap implementation of the engine services."""
#
#     cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
#     interfaces = Instance(klass=dict, kw=default_engine_interfaces,
#                           allow_none=False, config=True)
#

#-----------------------------------------------------------------------------
# Command line options
#-----------------------------------------------------------------------------


class IPControllerAppConfigLoader(ClusterDirConfigLoader):
    """Command-line loader for the controller application.

    Extends the arguments registered by ``ClusterDirConfigLoader`` with the
    Hub, scheduler and global options defined in ``_add_arguments``.
    """

    def _add_arguments(self):
        """Register the controller's command-line options on ``self.parser``.

        Each option's ``dest`` is a dotted ``Section.name`` string
        (``HubFactory.*``, ``ControllerFactory.*`` or ``Global.*``).
        """
        super(IPControllerAppConfigLoader, self)._add_arguments()
        paa = self.parser.add_argument

        ## Hub Config:
        paa('--mongodb',
            dest='HubFactory.db_class', action='store_const',
            const='IPython.zmq.parallel.mongodb.MongoDB',
            help='Use MongoDB task storage [default: in-memory]')
        paa('--hb',
            type=int, dest='HubFactory.hb', nargs=2,
            help='The (2) ports the Hub\'s Heartmonitor will use for the '
                 'heartbeat connections [default: random]',
            metavar='Hub.hb_ports')
        paa('--ping',
            type=int, dest='HubFactory.ping',
            help='The frequency at which the Hub pings the engines for '
                 'heartbeats '
                 ' (in ms) [default: 100]',
            metavar='Hub.ping')

        # Client config
        paa('--client-ip',
            type=str, dest='HubFactory.client_ip',
            help='The IP address or hostname the Hub will listen on for '
                 'client connections. Both engine-ip and client-ip can be '
                 'set simultaneously '
                 'via --ip [default: loopback]',
            metavar='Hub.client_ip')
        paa('--client-transport',
            type=str, dest='HubFactory.client_transport',
            help='The ZeroMQ transport the Hub will use for '
                 'client connections. Both engine-transport and '
                 'client-transport can be set simultaneously '
                 'via --transport [default: tcp]',
            metavar='Hub.client_transport')
        paa('--query',
            type=int, dest='HubFactory.query_port',
            help='The port on which the Hub XREP socket will listen for '
                 'result queries from clients [default: random]',
            metavar='Hub.query_port')
        paa('--notifier',
            type=int, dest='HubFactory.notifier_port',
            help='The port on which the Hub PUB socket will listen for '
                 'notification connections [default: random]',
            metavar='Hub.notifier_port')

        # Engine config
        paa('--engine-ip',
            type=str, dest='HubFactory.engine_ip',
            # The original literals ran together as "schedulersengine-ip";
            # a sentence break is inserted between them.
            help='The IP address or hostname the Hub will listen on for '
                 'engine connections. This applies to the Hub and its '
                 'schedulers. '
                 'engine-ip and client-ip can be set simultaneously '
                 'via --ip [default: loopback]',
            metavar='Hub.engine_ip')
        paa('--engine-transport',
            type=str, dest='HubFactory.engine_transport',
            # This option configures the engine side; the help previously
            # said "client connections" (copied from --client-transport).
            help='The ZeroMQ transport the Hub will use for '
                 'engine connections. Both engine-transport and '
                 'client-transport can be set simultaneously '
                 'via --transport [default: tcp]',
            metavar='Hub.engine_transport')

        # Scheduler config
        # The four scheduler port-pair options differ only in their name and
        # in how that name is spelled in the help text.
        for flag, label in (('mux', 'MUX'), ('task', 'Task'),
                            ('control', 'Control'), ('iopub', 'IOPub')):
            paa('--%s' % flag,
                type=int, dest='ControllerFactory.%s' % flag, nargs=2,
                help='The (2) ports the %s scheduler will listen on for '
                     'client,engine connections, respectively '
                     '[default: random]' % label,
                metavar='Scheduler.%s_ports' % flag)
        paa('--scheme',
            type=str, dest='ControllerFactory.scheme',
            choices=['pure', 'lru', 'plainrandom', 'weighted', 'twobin',
                     'leastload'],
            help='select the task scheduler scheme [default: Python LRU]',
            metavar='Scheduler.scheme')
        paa('--usethreads',
            dest='ControllerFactory.usethreads', action="store_true",
            help='Use threads instead of processes for the schedulers')

        ## Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
        paa('--log-url',
            type=str, dest='Global.log_url',
            help='Broadcast logs to an iploggerz process [default: disabled]')
        paa('-r', '--reuse-key',
            action='store_true', dest='Global.reuse_key',
            help='Try to reuse existing execution keys.')
        paa('--no-secure',
            action='store_false', dest='Global.secure',
            help='Turn off execution keys (default).')
        paa('--secure',
            action='store_true', dest='Global.secure',
            help='Turn on execution keys.')
        paa('--execkey',
            type=str, dest='Global.exec_key',
            help='path to a file containing an execution key.',
            metavar='keyfile')

        factory.add_session_arguments(self.parser)
        factory.add_registration_arguments(self.parser)


#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------


class IPControllerApp(ApplicationWithClusterDir):
    """Application object that builds and runs a ``ControllerFactory``."""

    name = u'ipcontrollerz'
    description = _description
    command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True

    def create_default_config(self):
        """Extend the parent's default config with controller defaults.

        Sets ``Global.import_statements``, ``clean_logs``, ``secure``,
        ``reuse_key`` and ``exec_key``; execution keys are off by default.
        """
        super(IPControllerApp, self).create_default_config()
        defaults = self.default_config.Global
        defaults.import_statements = []
        defaults.clean_logs = True
        defaults.secure = False
        defaults.reuse_key = False
        defaults.exec_key = "exec_key.key"

    def pre_construct(self):
        """Run the parent's pre-construct step; nothing is added here."""
        super(IPControllerApp, self).pre_construct()
        # The defaults for these are set in FCClientServiceFactory and
        # FCEngineServiceFactory, so we only set them here if the global
        # options have be set to override the class level defaults.

        # if hasattr(c.Global, 'reuse_furls'):
        #     c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
        #     c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
        #     del c.Global.reuse_furls
        # if hasattr(c.Global, 'secure'):
        #     c.FCClientServiceFactory.secure = c.Global.secure
        #     c.FCEngineServiceFactory.secure = c.Global.secure
        #     del c.Global.secure

    def construct(self):
        """Prepare the execution key and build the controller factory.

        Runs the configured import statements, then creates or removes the
        key file under ``Global.security_dir`` depending on
        ``Global.secure``, and finally constructs the ``ControllerFactory``.
        Any failure while building the factory is logged and ends the
        application through ``self.exit(1)``.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        c = self.master_config

        self.import_statements()

        keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
        if c.Global.secure:
            # Generate a new key unless reuse was requested and one exists.
            if not c.Global.reuse_key or not os.path.exists(keyfile):
                generate_exec_key(keyfile)
            c.SessionFactory.exec_key = keyfile
        else:
            # Running without keys: remove any key file left in place.
            if os.path.exists(keyfile):
                os.remove(keyfile)
            c.SessionFactory.exec_key = ''

        try:
            self.factory = ControllerFactory(config=c)
            # start_logging() reads self.factory.context, so it has to run
            # after the factory has been created.
            self.start_logging()
            self.factory.construct()
        except:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

    def save_urls(self):
        """save the registration urls to files."""
        c = self.master_config

        sec_dir = c.Global.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.engine_transport, cf.engine_ip,
                                    cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s" % (cf.client_transport, cf.client_ip,
                                    cf.regport))

    def import_statements(self):
        """Execute each statement listed in ``Global.import_statements``.

        This is best-effort: a statement that raises is logged with its
        traceback and the remaining statements are still executed.
        """
        statements = self.master_config.Global.import_statements
        for s in statements:
            try:
                # self.log is used as a logging.Logger elsewhere in this
                # class; it has no msg() method (a Twisted leftover), so
                # info()/error() are used instead.
                self.log.info("Executing statement: '%s'", s)
                # Tuple form of exec: equivalent to
                # ``exec s in globals(), locals()`` and also valid syntax on
                # Python 3.
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s", s,
                               exc_info=True)

    def start_logging(self):
        """Start the parent's logging and optionally publish logs over ZMQ.

        When ``Global.log_url`` is set, a PUB socket from the factory's
        context is connected to that URL and attached to ``self.log`` as a
        ``PUBHandler`` with root topic ``'controller'``.
        """
        super(IPControllerApp, self).start_logging()
        if self.master_config.Global.log_url:
            context = self.factory.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

    def start_app(self):
        """Start the factory, write the pid file and run the event loop.

        A ``KeyboardInterrupt`` raised by the loop is logged rather than
        propagated.
        """
        # Start the subprocesses:
        self.factory.start()
        self.write_pid_file(overwrite=True)
        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")


def launch_new_instance():
    """Create and run the IPython controller"""
    app = IPControllerApp()
    app.start()


if __name__ == '__main__':
    launch_new_instance()