From d34f19346d534aa7544ac0db6cd3cfa2b428ab9a 2011-04-08 00:38:20
From: MinRK
Date: 2011-04-08 00:38:20
Subject: [PATCH] add default ipz_config files

---

diff --git a/IPython/config/default/ipclusterz_config.py b/IPython/config/default/ipclusterz_config.py
index 36aa453..ff4a6ee 100644
--- a/IPython/config/default/ipclusterz_config.py
+++ b/IPython/config/default/ipclusterz_config.py
@@ -12,7 +12,7 @@ c = get_config()
 # - Start using mpiexec.
 # - Start using the Windows HPC Server 2008 scheduler
 # - Start using PBS
-# - Start using SSH (currently broken)
+# - Start using SSH
 # The selected launchers can be configured below.
@@ -22,14 +22,14 @@ c = get_config()
 # - MPIExecControllerLauncher
 # - PBSControllerLauncher
 # - WindowsHPCControllerLauncher
-# c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
+# c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
 # Options are:
 # - LocalEngineSetLauncher
 # - MPIExecEngineSetLauncher
 # - PBSEngineSetLauncher
 # - WindowsHPCEngineSetLauncher
-# c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
+# c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
 #-----------------------------------------------------------------------------
 # Global configuration
diff --git a/IPython/config/default/ipcontrollerz_config.py b/IPython/config/default/ipcontrollerz_config.py
index c1d0bce..58a10a8 100644
--- a/IPython/config/default/ipcontrollerz_config.py
+++ b/IPython/config/default/ipcontrollerz_config.py
@@ -25,112 +25,112 @@ c = get_config()
 # be imported in the controller for pickling to work.
 # c.Global.import_statements = ['import math']
-# Reuse the controller's FURL files. If False, FURL files are regenerated
+# Reuse the controller's JSON files. If False, JSON files are regenerated
 # each time the controller is run. If True, they will be reused, *but*, you
 # also must set the network ports by hand. If set, this will override the
 # values set for the client and engine connections below.
-# c.Global.reuse_furls = True
+# c.Global.reuse_files = True
-# Enable SSL encryption on all connections to the controller. If set, this
-# will override the values set for the client and engine connections below.
+# Enable exec_key authentication on all messages. Default is True
 # c.Global.secure = True
 # The working directory for the process. The application will use os.chdir
 # to change to this directory before starting.
 # c.Global.work_dir = os.getcwd()
-#-----------------------------------------------------------------------------
-# Configure the client services
-#-----------------------------------------------------------------------------
+# The log url for logging to an `iploggerz` application. This will override
+# log-to-file.
+# c.Global.log_url = 'tcp://127.0.0.1:20202'
-# Basic client service config attributes
+# The specific external IP that is used to disambiguate multi-interface URLs.
+# The default behavior is to guess from external IPs gleaned from `socket`.
+# c.Global.location = '192.168.1.123'
-# The network interface the controller will listen on for client connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCClientServiceFactory.ip = ''
+# The ssh server remote clients should use to connect to this controller.
+# It must be a machine that can see the interface specified in client_ip.
+# The default for client_ip is localhost, in which case the sshserver must
+# be an external IP of the controller machine.
+# c.Global.sshserver = 'controller.example.com'
+
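The Global options above cover connection-file reuse, authentication, and how remote parties reach the controller. Purely as a sketch (the IP and host names below are illustrative placeholders, not values from this patch), a cluster's ipcontrollerz_config.py might combine them like this:

    c = get_config()
    c.Global.reuse_files = True                # keep the same JSON connection files across restarts
    c.Global.secure = True                     # require the exec_key on every message
    c.Global.location = '10.0.0.5'             # external IP used to disambiguate multi-interface URLs
    c.Global.sshserver = 'login.example.com'   # remote clients tunnel in through this host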
+# the url to use for registration. If set, this overrides engine-ip,
+# engine-transport, client-ip, client-transport, and regport.
+# c.RegistrationFactory.url = 'tcp://*:12345'
-# The TCP/IP port the controller will listen on for client connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCClientServiceFactory.port = 0
+# the port to use for registration. Clients and Engines both use this
+# port for registration.
+# c.RegistrationFactory.regport = 10101
-# The client learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCClientServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the Task Scheduler
+#-----------------------------------------------------------------------------
-# Use SSL encryption for the client connection.
-# c.FCClientServiceFactory.secure = True
+# The routing scheme. 'pure' will use the pure-ZMQ scheduler. Any other
+# value will use a Python scheduler with various routing schemes.
+# python schemes are: lru, weighted, random, twobin. Default is 'weighted'.
+# Note that the pure ZMQ scheduler does not support many features, such as
+# dying engines, dependencies, or engine-subset load-balancing.
+# c.ControllerFactory.scheme = 'pure'
-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCClientServiceFactory.reuse_furls = False
+# The pure ZMQ scheduler can limit the number of outstanding tasks per engine
+# by using the ZMQ HWM option. This allows engines with long-running tasks
+# to not steal too many tasks from other engines. The default is 0, which
+# means aggressively distribute messages, never waiting for them to finish.
+# c.ControllerFactory.hwm = 1
+
+# Whether to use Threads or Processes to start the Schedulers. Threads will
+# use less resources, but potentially reduce throughput. Default is to
+# use processes. Note that a Python scheduler will always be in a Process.
+# c.ControllerFactory.usethreads
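To make the trade-off concrete, a short illustrative fragment (not part of the patch itself): opt into the pure-ZMQ scheduler but cap how many tasks each engine may have outstanding, so one long-running job cannot hoard the queue.

    # pure-ZMQ routing, at most two unfinished tasks queued on any one engine
    c.ControllerFactory.scheme = 'pure'
    c.ControllerFactory.hwm = 2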
 #-----------------------------------------------------------------------------
-# Configure the engine services
+# Configure the Hub
 #-----------------------------------------------------------------------------
-# Basic config attributes for the engine services.
+# Which class to use for the db backend. Currently supported are DictDB (the
+# default), and MongoDB. Uncomment this line to enable MongoDB, which will
+# slow-down the Hub's responsiveness, but also reduce its memory footprint.
+# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
-# The network interface the controller will listen on for engine connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCEngineServiceFactory.ip = ''
+# The heartbeat ping frequency. This is the frequency (in ms) at which the
+# Hub pings engines for heartbeats. This determines how quickly the Hub
+# will react to engines coming and going. A lower number means faster response
+# time, but more network activity. The default is 100ms
+# c.HubFactory.ping = 100
-# The TCP/IP port the controller will listen on for engine connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCEngineServiceFactory.port = 0
+# HubFactory queue port pairs, to set by name: mux, iopub, control, task. Set
+# each as a tuple of length 2 of ints. The default is to find random
+# available ports
+# c.HubFactory.mux = (10102,10112)
-# The engine learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCEngineServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the client connections
+#-----------------------------------------------------------------------------
-# Use SSL encryption for the engine connection.
-# c.FCEngineServiceFactory.secure = True
+# Basic client connection config attributes
+
+# The network interface the controller will listen on for client connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.client_ip = '*'
+# c.HubFactory.client_transport = 'tcp'
-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCEngineServiceFactory.reuse_furls = False
+# individual client ports to configure by name: query_port, notifier_port
+# c.HubFactory.query_port = 12345
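Since several of these addresses and ports interact, a brief illustrative fragment may help (the port numbers are placeholders rather than defaults from the patch): it pins the registration port and exposes the client-facing sockets on every interface over TCP.

    c.RegistrationFactory.regport = 10101   # engines and clients both register here
    c.HubFactory.client_ip = '*'            # listen on all interfaces for clients
    c.HubFactory.client_transport = 'tcp'
    c.HubFactory.mux = (10102, 10112)       # (client-side, engine-side) ports for the MUX queue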
 #-----------------------------------------------------------------------------
-# Developer level configuration attributes
+# Configure the engine connections
 #-----------------------------------------------------------------------------
-# You shouldn't have to modify anything in this section. These attributes
-# are more for developers who want to change the behavior of the controller
-# at a fundamental level.
-
-# c.FCClientServiceFactory.cert_file = u'ipcontroller-client.pem'
-
-# default_client_interfaces = Config()
-# default_client_interfaces.Task.interface_chain = [
-# 'IPython.kernel.task.ITaskController',
-# 'IPython.kernel.taskfc.IFCTaskController'
-# ]
-#
-# default_client_interfaces.Task.furl_file = u'ipcontroller-tc.furl'
-#
-# default_client_interfaces.MultiEngine.interface_chain = [
-# 'IPython.kernel.multiengine.IMultiEngine',
-# 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
-# ]
-#
-# default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_client_interfaces
-
-# c.FCEngineServiceFactory.cert_file = u'ipcontroller-engine.pem'
-
-# default_engine_interfaces = Config()
-# default_engine_interfaces.Default.interface_chain = [
-# 'IPython.kernel.enginefc.IFCControllerBase'
-# ]
-#
-# default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_engine_interfaces
+# Basic config attributes for the engine connections.
+
+# The network interface the controller will listen on for engine connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.engine_ip = '*'
+# c.HubFactory.engine_transport = 'tcp'
+
+# set the engine heartbeat ports to use:
+# c.HubFactory.hb = (10303,10313)
+
diff --git a/IPython/config/default/ipenginez_config.py b/IPython/config/default/ipenginez_config.py
index 42483ed..402f7fd 100644
--- a/IPython/config/default/ipenginez_config.py
+++ b/IPython/config/default/ipenginez_config.py
@@ -29,10 +29,10 @@ c = get_config()
 # c.Global.connect_delay = 0.1
 # c.Global.connect_max_tries = 15
-# By default, the engine will look for the controller's FURL file in its own
-# cluster directory. Sometimes, the FURL file will be elsewhere and this
-# attribute can be set to the full path of the FURL file.
-# c.Global.furl_file = u''
+# By default, the engine will look for the controller's JSON file in its own
+# cluster directory. Sometimes, the JSON file will be elsewhere and this
+# attribute can be set to the full path of the JSON file.
+# c.Global.url_file = u'/path/to/my/ipcontroller-engine.json'
 # The working directory for the process. The application will use os.chdir
 # to change to this directory before starting.
@@ -78,12 +78,7 @@ c = get_config()
 # You should not have to change these attributes.
-# c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
-
-# c.Global.furl_file_name = u'ipcontroller-engine.furl'
-
-
-
+# c.Global.url_file_name = u'ipcontroller-engine.furl'
diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py
index 18ae81a..a3fc89e 100755
--- a/IPython/zmq/parallel/controller.py
+++ b/IPython/zmq/parallel/controller.py
@@ -18,7 +18,7 @@ import logging
 from multiprocessing import Process
 import zmq
-
+from zmq.devices import ProcessMonitoredQueue
 # internal:
 from IPython.utils.importstring import import_item
 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
@@ -38,6 +38,8 @@ class ControllerFactory(HubFactory):
     """Configurable for setting up a Hub and Schedulers."""
     usethreads = Bool(False, config=True)
+    # pure-zmq downstream HWM
+    hwm = Int(0, config=True)
     # internal
     children = List()
@@ -52,22 +54,28 @@ class ControllerFactory(HubFactory):
     def start(self):
         super(ControllerFactory, self).start()
+        child_procs = []
         for child in self.children:
             child.start()
-        if not self.usethreads:
-            signal_children([ getattr(c, 'launcher', c) for c in self.children ])
+            if isinstance(child, ProcessMonitoredQueue):
+                child_procs.append(child.launcher)
+            elif isinstance(child, Process):
+                child_procs.append(child)
+        if child_procs:
+            signal_children(child_procs)
     def construct_schedulers(self):
         children = self.children
         mq = import_item(self.mq_class)
+        maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
         q.bind_in(self.client_info['iopub'])
         q.bind_out(self.engine_info['iopub'])
         q.setsockopt_out(zmq.SUBSCRIBE, '')
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
         q.daemon=True
         children.append(q)
@@ -75,7 +83,7 @@ class ControllerFactory(HubFactory):
         q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
         q.bind_in(self.client_info['mux'])
         q.bind_out(self.engine_info['mux'])
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
         q.daemon=True
         children.append(q)
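The schedulers built here are all MonitoredQueue devices, which relay traffic between clients and engines while mirroring every message to the Hub's monitor socket. As a rough, stand-alone illustration of that wiring only (the endpoints and HWM value below are made up; this mirrors the calls the patch uses rather than reproducing any IPython code):

    import zmq
    from zmq.devices import ProcessMonitoredQueue

    # XREP facing clients, XREQ fanning work out to engines, PUB copying traffic to the monitor
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
    q.setsockopt_out(zmq.HWM, 2)             # cap outstanding tasks per engine
    q.bind_in('tcp://*:10201')               # client-facing socket
    q.bind_out('tcp://*:10202')              # engine-facing socket
    q.connect_mon('tcp://127.0.0.1:10203')   # monitor endpoint the Hub subscribes to
    q.daemon = True
    q.start()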
@@ -83,16 +91,17 @@ class ControllerFactory(HubFactory):
         q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
         q.bind_in(self.client_info['control'])
         q.bind_out(self.engine_info['control'])
-        q.connect_mon(self.monitor_url)
+        q.connect_mon(maybe_inproc)
         q.daemon=True
         children.append(q)
         # Task Queue (in a Process)
         if self.scheme == 'pure':
             self.log.warn("task::using pure XREQ Task scheduler")
             q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
+            q.setsockopt_out(zmq.HWM, self.hwm)
             q.bind_in(self.client_info['task'][1])
             q.bind_out(self.engine_info['task'])
-            q.connect_mon(self.monitor_url)
+            q.connect_mon(maybe_inproc)
             q.daemon=True
             children.append(q)
         elif self.scheme == 'none':
diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py
index 5fc35d4..a9b5179 100755
--- a/IPython/zmq/parallel/hub.py
+++ b/IPython/zmq/parallel/hub.py
@@ -26,7 +26,7 @@ from zmq.eventloop.zmqstream import ZMQStream
 # internal:
 from IPython.config.configurable import Configurable
-from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
+from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
 from IPython.utils.importstring import import_item
 from entry_point import select_random_ports
@@ -138,18 +138,18 @@ class HubFactory(RegistrationFactory):
     ping = Int(1000, config=True) # ping frequency
-    engine_ip = Str('127.0.0.1', config=True)
-    engine_transport = Str('tcp', config=True)
+    engine_ip = CStr('127.0.0.1', config=True)
+    engine_transport = CStr('tcp', config=True)
-    client_ip = Str('127.0.0.1', config=True)
-    client_transport = Str('tcp', config=True)
+    client_ip = CStr('127.0.0.1', config=True)
+    client_transport = CStr('tcp', config=True)
-    monitor_ip = Str('127.0.0.1', config=True)
-    monitor_transport = Str('tcp', config=True)
+    monitor_ip = CStr('127.0.0.1', config=True)
+    monitor_transport = CStr('tcp', config=True)
-    monitor_url = Str('')
+    monitor_url = CStr('')
-    db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
+    db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
     # not configurable
     db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
@@ -234,6 +234,7 @@ class HubFactory(RegistrationFactory):
         sub = ctx.socket(zmq.SUB)
         sub.setsockopt(zmq.SUBSCRIBE, "")
         sub.bind(self.monitor_url)
+        sub.bind('inproc://monitor')
         sub = ZMQStream(sub, loop)
         # connect the db
diff --git a/IPython/zmq/parallel/ipclusterapp.py b/IPython/zmq/parallel/ipclusterapp.py
index cbeed95..658c9bc 100755
--- a/IPython/zmq/parallel/ipclusterapp.py
+++ b/IPython/zmq/parallel/ipclusterapp.py
@@ -38,7 +38,7 @@ from IPython.zmq.parallel.clusterdir import (
 #-----------------------------------------------------------------------------
-default_config_file_name = u'ipcluster_config.py'
+default_config_file_name = u'ipclusterz_config.py'
 _description = """\
diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py
index e5c6334..4ce8eb3 100755
--- a/IPython/zmq/parallel/ipcontrollerapp.py
+++ b/IPython/zmq/parallel/ipcontrollerapp.py
@@ -49,7 +49,7 @@ from util import disambiguate_ip_address, split_url
 #: The default config file name for this application
-default_config_file_name = u'ipcontroller_config.py'
+default_config_file_name = u'ipcontrollerz_config.py'
 _description = """Start the IPython controller for parallel computing.
@@ -58,7 +58,7 @@
 The IPython controller provides a gateway between the IPython engines and
 clients. The controller needs to be started before the engines and can be
 configured using command line options or using a cluster directory.
 Cluster directories contain config, log and security files and are usually located in
-your .ipython directory and named as "cluster_<profile>". See the --profile
+your ipython directory and named as "cluster_<profile>". See the --profile
 and --cluster-dir options for details.
 """
@@ -189,6 +189,7 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader):
             help='The (2) ports the IOPub scheduler will listen on for client,engine '
             'connections, respectively [default: random]',
             metavar='Scheduler.iopub_ports')
+
         paa('--scheme',
             type=str, dest='HubFactory.scheme',
             choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
@@ -198,6 +199,12 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader):
             dest='ControllerFactory.usethreads', action="store_true",
             help='Use threads instead of processes for the schedulers',
             )
+        paa('--hwm',
+            dest='ControllerFactory.hwm', type=int,
+            help='specify the High Water Mark (HWM) for the downstream '
+            'socket in the pure ZMQ scheduler. This is the maximum number '
+            'of allowed outstanding tasks on each engine.',
+            )
         ## Global config
         paa('--log-to-file',
@@ -206,9 +213,9 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader):
         paa('--log-url', type=str, dest='Global.log_url',
             help='Broadcast logs to an iploggerz process [default: disabled]')
-        paa('-r','--reuse-key',
-            action='store_true', dest='Global.reuse_key',
-            help='Try to reuse existing execution keys.')
+        paa('-r','--reuse-files',
+            action='store_true', dest='Global.reuse_files',
+            help='Try to reuse existing json connection files.')
         paa('--no-secure',
            action='store_false', dest='Global.secure',
            help='Turn off execution keys (default).')
@@ -255,7 +262,7 @@ class IPControllerApp(ApplicationWithClusterDir):
         self.default_config.Global.import_statements = []
         self.default_config.Global.clean_logs = True
         self.default_config.Global.secure = True
-        self.default_config.Global.reuse_key = False
+        self.default_config.Global.reuse_files = False
         self.default_config.Global.exec_key = "exec_key.key"
         self.default_config.Global.sshserver = None
         self.default_config.Global.location = None
@@ -293,24 +300,53 @@ class IPControllerApp(ApplicationWithClusterDir):
         with open(fname, 'w') as f:
             f.write(json.dumps(cdict, indent=2))
         os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
+
+
+    def load_config_from_json(self):
+        """load config from existing json connector files."""
+        c = self.master_config
+        # load from engine config
+        with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
+            cfg = json.loads(f.read())
+        key = c.SessionFactory.exec_key = cfg['exec_key']
+        xport,addr = cfg['url'].split('://')
+        c.HubFactory.engine_transport = xport
+        ip,ports = addr.split(':')
+        c.HubFactory.engine_ip = ip
+        c.HubFactory.regport = int(ports)
+        c.Global.location = cfg['location']
+        # load client config
+        with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
+            cfg = json.loads(f.read())
+        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
+        xport,addr = cfg['url'].split('://')
+        c.HubFactory.client_transport = xport
+        ip,ports = addr.split(':')
+        c.HubFactory.client_ip = ip
+        c.Global.sshserver = cfg['ssh']
+        assert int(ports) == c.HubFactory.regport, "regport mismatch"
+
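load_config_from_json implies a simple on-disk format: the security directory holds ipcontroller-client.json and ipcontroller-engine.json, each carrying the exec key, the registration URL, an optional ssh server, and the controller's location. Purely as an illustration (every value below is a made-up placeholder), such a file might look like:

    {
      "exec_key": "c0a1b2d3-...-e4f5",
      "ssh": "",
      "url": "tcp://10.0.0.5:10101",
      "location": "10.0.0.5"
    }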
     def construct(self):
         # This is the working dir by now.
         sys.path.insert(0, '')
         c = self.master_config
         self.import_statements()
-
-        if c.Global.secure:
+        reusing = c.Global.reuse_files
+        if reusing:
+            try:
+                self.load_config_from_json()
+            except (AssertionError,IOError):
+                reusing=False
+        # check again, because reusing may have failed:
+        if reusing:
+            pass
+        elif c.Global.secure:
             keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
-            if not c.Global.reuse_key or not os.path.exists(keyfile):
-                key = str(uuid.uuid4())
-                with open(keyfile, 'w') as f:
-                    f.write(key)
-                os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
-            else:
-                with open(keyfile) as f:
-                    key = f.read().strip()
+            key = str(uuid.uuid4())
+            with open(keyfile, 'w') as f:
+                f.write(key)
+            os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
             c.SessionFactory.exec_key = key
         else:
             c.SessionFactory.exec_key = ''
@@ -324,16 +360,18 @@ class IPControllerApp(ApplicationWithClusterDir):
             self.log.error("Couldn't construct the Controller", exc_info=True)
             self.exit(1)
-        f = self.factory
-        cdict = {'exec_key' : key,
-                 'ssh' : c.Global.sshserver,
-                 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
-                 'location' : c.Global.location
-                 }
-        self.save_connection_dict('ipcontroller-client.json', cdict)
-        edict = cdict
-        edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
-        self.save_connection_dict('ipcontroller-engine.json', edict)
+        if not reusing:
+            # save to new json config files
+            f = self.factory
+            cdict = {'exec_key' : key,
+                     'ssh' : c.Global.sshserver,
+                     'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
+                     'location' : c.Global.location
+                     }
+            self.save_connection_dict('ipcontroller-client.json', cdict)
+            edict = cdict
+            edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
+            self.save_connection_dict('ipcontroller-engine.json', edict)
     def save_urls(self):
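Taken together, the controller-side changes mean a restart no longer has to invalidate running engines and clients. Assuming the z-suffixed entry points these apps are installed as (the profile name below is only an example), the intended usage is roughly:

    # first run: writes ipcontroller-client.json / ipcontroller-engine.json
    ipcontrollerz --profile default
    # later restart: -r / --reuse-files loads the existing JSON files instead
    # of generating a fresh exec key and fresh ports
    ipcontrollerz -r --profile default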
diff --git a/IPython/zmq/parallel/ipengineapp.py b/IPython/zmq/parallel/ipengineapp.py
index 16773ff..9521dbb 100755
--- a/IPython/zmq/parallel/ipengineapp.py
+++ b/IPython/zmq/parallel/ipengineapp.py
@@ -40,7 +40,7 @@ from util import disambiguate_url
 #-----------------------------------------------------------------------------
 #: The default config file name for this application
-default_config_file_name = u'ipengine_config.py'
+default_config_file_name = u'ipenginez_config.py'
 mpi4py_init = """from mpi4py import MPI as mpi
@@ -64,7 +64,7 @@ IPython engines run in parallel and perform computations on behalf of a client
 and controller. A controller needs to be started before the engines. The
 engine can be configured using command line options or using a cluster
 directory. Cluster directories contain config, log and security files and are
-usually located in your .ipython directory and named as "cluster_<profile>".
+usually located in your ipython directory and named as "cluster_<profile>".
 See the --profile and --cluster-dir options for details.
 """
@@ -79,7 +79,7 @@ class IPEngineAppConfigLoader(ClusterDirConfigLoader):
         super(IPEngineAppConfigLoader, self)._add_arguments()
         paa = self.parser.add_argument
         # Controller config
-        paa('--file',
+        paa('--file', '-f',
            type=unicode, dest='Global.url_file',
            help='The full location of the file containing the connection information fo '
            'controller. If this is not given, the file must be in the '
diff --git a/IPython/zmq/parallel/iploggerapp.py b/IPython/zmq/parallel/iploggerapp.py
index e8d906d..104fa39 100755
--- a/IPython/zmq/parallel/iploggerapp.py
+++ b/IPython/zmq/parallel/iploggerapp.py
@@ -39,7 +39,7 @@ IPython controllers and engines (and your own processes) can broadcast log messages
 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
 logger can be configured using command line options or using a cluster
 directory. Cluster directories contain config, log and security files and are
-usually located in your .ipython directory and named as "cluster_<profile>".
+usually located in your ipython directory and named as "cluster_<profile>".
 See the --profile and --cluster-dir options for details.
 """
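On the engine side, the same JSON file closes the loop. A minimal, illustrative ipenginez_config.py (the path is a placeholder) simply points the engine at the controller's connection file, which is also what the new -f/--file flag does from the command line:

    c = get_config()
    # path to the connection file written by the controller
    c.Global.url_file = u'/path/to/security/ipcontroller-engine.json'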