From ebe5cf593f6afd418406e269a8edb9479c53b4f8 2011-06-20 23:40:13 From: MinRK Date: 2011-06-20 23:40:13 Subject: [PATCH] all ipcluster scripts in some degree of working order with new config --- diff --git a/IPython/parallel/apps/clusterdir.py b/IPython/parallel/apps/clusterdir.py index a91a2cb..d7da231 100755 --- a/IPython/parallel/apps/clusterdir.py +++ b/IPython/parallel/apps/clusterdir.py @@ -25,16 +25,18 @@ import sys from subprocess import Popen, PIPE -from IPython.config.loader import PyFileConfigLoader +from IPython.config.loader import PyFileConfigLoader, Config from IPython.config.configurable import Configurable -from IPython.core.application import Application, BaseAppConfigLoader +from IPython.config.application import Application from IPython.core.crashhandler import CrashHandler +from IPython.core.newapplication import BaseIPythonApplication from IPython.core import release from IPython.utils.path import ( get_ipython_package_dir, + get_ipython_dir, expand_path ) -from IPython.utils.traitlets import Unicode +from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict #----------------------------------------------------------------------------- # Module errors #----------------------------------------------------------------------------- @@ -69,19 +71,45 @@ class ClusterDir(Configurable): security_dir = Unicode(u'') log_dir = Unicode(u'') pid_dir = Unicode(u'') - location = Unicode(u'') - def __init__(self, location=u''): - super(ClusterDir, self).__init__(location=location) + location = Unicode(u'', config=True, + help="""Set the cluster dir. This overrides the logic used by the + `profile` option.""", + ) + profile = Unicode(u'default', + help="""The string name of the profile to be used. This determines the name + of the cluster dir as: cluster_<profile>. The default profile is named + 'default'. The cluster directory is resolved this way if the + `cluster_dir` option is not used.""", config=True + ) + + _location_isset = Bool(False) # flag for detecting multiply set location + _new_dir = Bool(False) # flag for whether a new dir was created + + def __init__(self, **kwargs): + super(ClusterDir, self).__init__(**kwargs) + if not self.location: + self._profile_changed('profile', 'default', self.profile) def _location_changed(self, name, old, new): + if self._location_isset: + raise RuntimeError("Cannot set ClusterDir more than once.") + self._location_isset = True if not os.path.isdir(new): os.makedirs(new) + self._new_dir = True + # ensure config files exist: + self.copy_all_config_files(overwrite=False) self.security_dir = os.path.join(new, self.security_dir_name) self.log_dir = os.path.join(new, self.log_dir_name) self.pid_dir = os.path.join(new, self.pid_dir_name) self.check_dirs() + def _profile_changed(self, name, old, new): + if self._location_isset: + raise RuntimeError("ClusterDir already set. Cannot set by profile.") + self.location = os.path.join(get_ipython_dir(), 'cluster_'+new) + def _log_dir_changed(self, name, old, new): self.check_log_dir() @@ -110,18 +138,6 @@ class ClusterDir(Configurable): self.check_log_dir() self.check_pid_dir() - def load_config_file(self, filename): - """Load a config file from the top level of the cluster dir. - - Parameters - ---------- - filename : unicode or str - The filename only of the config file that must be located in - the top-level of the cluster directory. - """ - loader = PyFileConfigLoader(filename, self.location) - return loader.load_config() - def copy_config_file(self, config_file, path=None, overwrite=False): """Copy a default config file into the active cluster directory.
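The ClusterDir traits above rely on the 0.11-style traitlets machinery: a trait declared with config=True becomes settable through Config objects and the command line, and a method named _<trait>_changed fires on every assignment, which is how the one-time location guard and the profile-to-location derivation work. A minimal sketch of that mechanism, assuming the same IPython.config/IPython.utils.traitlets APIs imported in this hunk (the Dir class and the path are invented for illustration):

    from IPython.config.configurable import Configurable
    from IPython.config.loader import Config
    from IPython.utils.traitlets import Bool, Unicode

    class Dir(Configurable):
        # config=True exposes the trait to config files and the command line
        location = Unicode(u'', config=True)
        _isset = Bool(False)   # internal guard, deliberately not configurable

        def _location_changed(self, name, old, new):
            # static-name observer: runs on every assignment to `location`
            if self._isset:
                raise RuntimeError("location can only be set once")
            self._isset = True

    c = Config()
    c.Dir.location = u'/tmp/cluster_default'
    d = Dir(config=c)            # assignment via config fires the observer
    d.location = u'/elsewhere'   # second assignment raises, as in ClusterDir

The same firing order is why ClusterDir.__init__ only falls back to _profile_changed when no explicit location arrived via config.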
@@ -227,59 +243,6 @@ class ClusterDir(Configurable): #----------------------------------------------------------------------------- -# Command line options -#----------------------------------------------------------------------------- - -class ClusterDirConfigLoader(BaseAppConfigLoader): - - def _add_cluster_profile(self, parser): - paa = parser.add_argument - paa('-p', '--profile', - dest='Global.profile',type=unicode, - help= - """The string name of the profile to be used. This determines the name - of the cluster dir as: cluster_<profile>. The default profile is named - 'default'. The cluster directory is resolve this way if the - --cluster-dir option is not used.""", - metavar='Global.profile') - - def _add_cluster_dir(self, parser): - paa = parser.add_argument - paa('--cluster-dir', - dest='Global.cluster_dir',type=unicode, - help="""Set the cluster dir. This overrides the logic used by the - --profile option.""", - metavar='Global.cluster_dir') - - def _add_work_dir(self, parser): - paa = parser.add_argument - paa('--work-dir', - dest='Global.work_dir',type=unicode, - help='Set the working dir for the process.', - metavar='Global.work_dir') - - def _add_clean_logs(self, parser): - paa = parser.add_argument - paa('--clean-logs', - dest='Global.clean_logs', action='store_true', - help='Delete old log flies before starting.') - - def _add_no_clean_logs(self, parser): - paa = parser.add_argument - paa('--no-clean-logs', - dest='Global.clean_logs', action='store_false', - help="Don't Delete old log flies before starting.") - - def _add_arguments(self): - super(ClusterDirConfigLoader, self)._add_arguments() - self._add_cluster_profile(self.parser) - self._add_cluster_dir(self.parser) - self._add_work_dir(self.parser) - self._add_clean_logs(self.parser) - self._add_no_clean_logs(self.parser) - - -#----------------------------------------------------------------------------- # Crash handler for this application #----------------------------------------------------------------------------- @@ -312,8 +275,8 @@ class ClusterDirCrashHandler(CrashHandler): message_template = _message_template def __init__(self, app): - contact_name = release.authors['Brian'][0] - contact_email = release.authors['Brian'][1] + contact_name = release.authors['Min'][0] + contact_email = release.authors['Min'][1] bug_tracker = 'http://github.com/ipython/ipython/issues' super(ClusterDirCrashHandler,self).__init__( app, contact_name, contact_email, bug_tracker ) @@ -323,8 +286,25 @@ #----------------------------------------------------------------------------- # Main application #----------------------------------------------------------------------------- - -class ApplicationWithClusterDir(Application): +base_aliases = { + 'profile' : "ClusterDir.profile", + 'cluster_dir' : 'ClusterDir.location', + 'log_level' : 'Application.log_level', + 'work_dir' : 'ClusterDirApplication.work_dir', + 'log_to_file' : 'ClusterDirApplication.log_to_file', + 'clean_logs' : 'ClusterDirApplication.clean_logs', + 'log_url' : 'ClusterDirApplication.log_url', +} + +base_flags = { + 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"), + 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"), + 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file") +} +for k,v in base_flags.iteritems(): + base_flags[k] = (Config(v[0]),v[1]) + +class ClusterDirApplication(BaseIPythonApplication): """An application that puts
everything into a cluster directory. Instead of looking for things in the ipython_dir, this type of application @@ -343,22 +323,37 @@ class ApplicationWithClusterDir(Application): dir and named the value of the ``config_file_name`` class attribute. """ - command_line_loader = ClusterDirConfigLoader crash_handler_class = ClusterDirCrashHandler - auto_create_cluster_dir = True + auto_create_cluster_dir = Bool(True, config=True, + help="whether to create the cluster_dir if it doesn't exist") # temporarily override default_log_level to INFO default_log_level = logging.INFO + cluster_dir = Instance(ClusterDir) + + work_dir = Unicode(os.getcwdu(), config=True, + help='Set the working dir for the process.' + ) + def _work_dir_changed(self, name, old, new): + self.work_dir = unicode(expand_path(new)) + + log_to_file = Bool(config=True, + help="whether to log to a file") + + clean_logs = Bool(True, shortname='--clean-logs', config=True, + help="whether to cleanup old logfiles before starting") - def create_default_config(self): - super(ApplicationWithClusterDir, self).create_default_config() - self.default_config.Global.profile = u'default' - self.default_config.Global.cluster_dir = u'' - self.default_config.Global.work_dir = os.getcwd() - self.default_config.Global.log_to_file = False - self.default_config.Global.log_url = None - self.default_config.Global.clean_logs = False + log_url = CStr('', shortname='--log-url', config=True, + help="The ZMQ URL of the iplogger to aggregate logging.") - def find_resources(self): + config_file = Unicode(u'', config=True, + help="""Path to ipcontroller configuration file. The default is to use + <name>_config.py, as found by cluster-dir.""" + ) + + aliases = Dict(base_aliases) + flags = Dict(base_flags) + + def init_clusterdir(self): """This resolves the cluster directory. This tries to find the cluster directory and if successful, it will @@ -375,121 +370,53 @@ class ApplicationWithClusterDir(Application): ``True``, then create the new cluster dir in the IPython directory. 4. If all fails, then raise :class:`ClusterDirError`.
""" - - try: - cluster_dir = self.command_line_config.Global.cluster_dir - except AttributeError: - cluster_dir = self.default_config.Global.cluster_dir - cluster_dir = expand_path(cluster_dir) - try: - self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) - except ClusterDirError: - pass - else: - self.log.info('Using existing cluster dir: %s' % \ - self.cluster_dir_obj.location - ) - self.finish_cluster_dir() - return - - try: - self.profile = self.command_line_config.Global.profile - except AttributeError: - self.profile = self.default_config.Global.profile - try: - self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( - self.ipython_dir, self.profile) - except ClusterDirError: - pass - else: - self.log.info('Using existing cluster dir: %s' % \ - self.cluster_dir_obj.location - ) - self.finish_cluster_dir() - return - - if self.auto_create_cluster_dir: - self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile( - self.ipython_dir, self.profile - ) + self.cluster_dir = ClusterDir(config=self.config) + if self.cluster_dir._new_dir: self.log.info('Creating new cluster dir: %s' % \ - self.cluster_dir_obj.location - ) - self.finish_cluster_dir() + self.cluster_dir.location) else: - raise ClusterDirError('Could not find a valid cluster directory.') - - def finish_cluster_dir(self): - # Set the cluster directory - self.cluster_dir = self.cluster_dir_obj.location - - # These have to be set because they could be different from the one - # that we just computed. Because command line has the highest - # priority, this will always end up in the master_config. - self.default_config.Global.cluster_dir = self.cluster_dir - self.command_line_config.Global.cluster_dir = self.cluster_dir - - def find_config_file_name(self): - """Find the config file name for this application.""" - # For this type of Application it should be set as a class attribute. - if not hasattr(self, 'default_config_file_name'): - self.log.critical("No config filename found") - else: - self.config_file_name = self.default_config_file_name - - def find_config_file_paths(self): - # Set the search path to to the cluster directory. We should NOT - # include IPython.config.default here as the default config files - # are ALWAYS automatically moved to the cluster directory. - conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default') - self.config_file_paths = (self.cluster_dir,) - - def pre_construct(self): - # The log and security dirs were set earlier, but here we put them - # into the config and log them. - config = self.master_config - sdir = self.cluster_dir_obj.security_dir - self.security_dir = config.Global.security_dir = sdir - ldir = self.cluster_dir_obj.log_dir - self.log_dir = config.Global.log_dir = ldir - pdir = self.cluster_dir_obj.pid_dir - self.pid_dir = config.Global.pid_dir = pdir - self.log.info("Cluster directory set to: %s" % self.cluster_dir) - config.Global.work_dir = unicode(expand_path(config.Global.work_dir)) - # Change to the working directory. We do this just before construct - # is called so all the components there have the right working dir. 
- self.to_work_dir() + self.log.info('Using existing cluster dir: %s' % \ + self.cluster_dir.location) def to_work_dir(self): - wd = self.master_config.Global.work_dir - if unicode(wd) != unicode(os.getcwd()): + wd = self.work_dir + if unicode(wd) != os.getcwdu(): os.chdir(wd) self.log.info("Changing to working dir: %s" % wd) - def start_logging(self): - # Remove old log files - if self.master_config.Global.clean_logs: - log_dir = self.master_config.Global.log_dir - for f in os.listdir(log_dir): - if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): - # if f.startswith(self.name + u'-') and f.endswith('.log'): - os.remove(os.path.join(log_dir, f)) - # Start logging to the new log file - if self.master_config.Global.log_to_file: - log_filename = self.name + u'-' + str(os.getpid()) + u'.log' - logfile = os.path.join(self.log_dir, log_filename) - open_log_file = open(logfile, 'w') - elif self.master_config.Global.log_url: - open_log_file = None - else: - open_log_file = sys.stdout - if open_log_file is not None: - self.log.removeHandler(self._log_handler) - self._log_handler = logging.StreamHandler(open_log_file) - self._log_formatter = logging.Formatter("[%(name)s] %(message)s") - self._log_handler.setFormatter(self._log_formatter) - self.log.addHandler(self._log_handler) - # log.startLogging(open_log_file) + def load_config_file(self, filename, path=None): + """Load a .py based config file by filename and path.""" + return Application.load_config_file(self, filename, path=path) + # + # def load_default_config_file(self): + # """Load a .py based config file by filename and path.""" + # return BaseIPythonApplication.load_config_file(self) + + # disable URL-logging + # def init_logging(self): + # # Remove old log files + # if self.master_config.Global.clean_logs: + # log_dir = self.master_config.Global.log_dir + # for f in os.listdir(log_dir): + # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f): + # # if f.startswith(self.name + u'-') and f.endswith('.log'): + # os.remove(os.path.join(log_dir, f)) + # # Start logging to the new log file + # if self.master_config.Global.log_to_file: + # log_filename = self.name + u'-' + str(os.getpid()) + u'.log' + # logfile = os.path.join(self.log_dir, log_filename) + # open_log_file = open(logfile, 'w') + # elif self.master_config.Global.log_url: + # open_log_file = None + # else: + # open_log_file = sys.stdout + # if open_log_file is not None: + # self.log.removeHandler(self._log_handler) + # self._log_handler = logging.StreamHandler(open_log_file) + # self._log_formatter = logging.Formatter("[%(name)s] %(message)s") + # self._log_handler.setFormatter(self._log_formatter) + # self.log.addHandler(self._log_handler) + # # log.startLogging(open_log_file) def write_pid_file(self, overwrite=False): """Create a .pid file in the pid_dir with my pid. @@ -497,7 +424,7 @@ class ApplicationWithClusterDir(Application): This must be called after pre_construct, which sets `self.pid_dir`. This raises :exc:`PIDFileError` if the pid file exists already. """ - pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): pid = self.get_pid_from_file() if not overwrite: @@ -516,7 +443,7 @@ class ApplicationWithClusterDir(Application): :func:`reactor.addSystemEventTrigger`. This needs to return ``None``. 
""" - pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): try: self.log.info("Removing pid file: %s" % pid_file) @@ -529,7 +456,7 @@ class ApplicationWithClusterDir(Application): If the pid file doesn't exist a :exc:`PIDFileError` is raised. """ - pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid') if os.path.isfile(pid_file): with open(pid_file, 'r') as f: pid = int(f.read().strip()) @@ -563,4 +490,3 @@ class ApplicationWithClusterDir(Application): return True pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)) return pid in pids - \ No newline at end of file diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index 725853f..67ce5f1 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -25,12 +25,14 @@ from subprocess import check_call, CalledProcessError, PIPE import zmq from zmq.eventloop import ioloop -from IPython.external.argparse import ArgumentParser, SUPPRESS +from IPython.config.loader import Config from IPython.utils.importstring import import_item +from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List from IPython.parallel.apps.clusterdir import ( - ApplicationWithClusterDir, ClusterDirConfigLoader, - ClusterDirError, PIDFileError + ClusterDirApplication, ClusterDirError, + PIDFileError, + base_flags, ) @@ -49,9 +51,9 @@ An IPython cluster consists of 1 controller and 1 or more engines. This command automates the startup of these processes using a wide range of startup methods (SSH, local processes, PBS, mpiexec, Windows HPC Server 2008). To start a cluster with 4 engines on your -local host simply do 'ipcluster start -n 4'. For more complex usage -you will typically do 'ipcluster create -p mycluster', then edit -configuration files, followed by 'ipcluster start -p mycluster -n 4'. +local host simply do 'ipcluster start n=4'. For more complex usage +you will typically do 'ipcluster --create profile=mycluster', then edit +configuration files, followed by 'ipcluster --start -p mycluster -n 4'. """ @@ -72,96 +74,9 @@ NO_CLUSTER = 12 #----------------------------------------------------------------------------- -# Command line options +# Main application #----------------------------------------------------------------------------- - - -class IPClusterAppConfigLoader(ClusterDirConfigLoader): - - def _add_arguments(self): - # Don't call ClusterDirConfigLoader._add_arguments as we don't want - # its defaults on self.parser. Instead, we will put those on - # default options on our subparsers. - - # This has all the common options that all subcommands use - parent_parser1 = ArgumentParser( - add_help=False, - argument_default=SUPPRESS - ) - self._add_ipython_dir(parent_parser1) - self._add_log_level(parent_parser1) - - # This has all the common options that other subcommands use - parent_parser2 = ArgumentParser( - add_help=False, - argument_default=SUPPRESS - ) - self._add_cluster_profile(parent_parser2) - self._add_cluster_dir(parent_parser2) - self._add_work_dir(parent_parser2) - paa = parent_parser2.add_argument - paa('--log-to-file', - action='store_true', dest='Global.log_to_file', - help='Log to a file in the log directory (default is stdout)') - - # Create the object used to create the subparsers. 
- - # Create the object used to create the subparsers. - subparsers = self.parser.add_subparsers( - dest='Global.subcommand', - title='ipcluster subcommands', - description= - """ipcluster has a variety of subcommands. The general way of - running ipcluster is 'ipcluster <cmd> [options]'. To get help - on a particular subcommand do 'ipcluster <cmd> -h'.""" - # help="For more help, type 'ipcluster <cmd> -h'", - ) - - # The "list" subcommand parser - parser_list = subparsers.add_parser( - 'list', - parents=[parent_parser1], - argument_default=SUPPRESS, - help="List all clusters in cwd and ipython_dir.", - description= - """List all available clusters, by cluster directory, that can - be found in the current working directly or in the ipython - directory. Cluster directories are named using the convention - 'cluster_<profile>'.""" - ) - - # The "create" subcommand parser - parser_create = subparsers.add_parser( - 'create', - parents=[parent_parser1, parent_parser2], - argument_default=SUPPRESS, - help="Create a new cluster directory.", - description= - """Create an ipython cluster directory by its profile name or - cluster directory path. Cluster directories contain - configuration, log and security related files and are named - using the convention 'cluster_<profile>'. By default they are - located in your ipython directory. Once created, you will - probably need to edit the configuration files in the cluster - directory to configure your cluster. Most users will create a - cluster directory by profile name, - 'ipcluster create -p mycluster', which will put the directory - in '<ipython_dir>/cluster_mycluster'. - """ - ) - paa = parser_create.add_argument - paa('--reset-config', - dest='Global.reset_config', action='store_true', - help= - """Recopy the default config files to the cluster directory. - You will loose any modifications you have made to these files.""") - - # The "start" subcommand parser - parser_start = subparsers.add_parser( - 'start', - parents=[parent_parser1, parent_parser2], - argument_default=SUPPRESS, - help="Start a cluster.", - description= - """Start an ipython cluster by its profile name or cluster +start_help = """Start an ipython cluster by its profile name or cluster directory. Cluster directories contain configuration, log and security related files and are named using the convention 'cluster_<profile>' and should be created using the 'create' @@ -170,121 +85,130 @@ class IPClusterAppConfigLoader(ClusterDirConfigLoader): using its profile name, 'ipcluster start -n 4 -p <profile>', otherwise use the '--cluster-dir' option. """ - ) - - paa = parser_start.add_argument - paa('-n', '--number', - type=int, dest='Global.n', - help='The number of engines to start.', - metavar='Global.n') - paa('--clean-logs', - dest='Global.clean_logs', action='store_true', - help='Delete old log flies before starting.') - paa('--no-clean-logs', - dest='Global.clean_logs', action='store_false', - help="Don't delete old log flies before starting.") - paa('--daemon', - dest='Global.daemonize', action='store_true', - help='Daemonize the ipcluster program.
This implies --log-to-file') - paa('--no-daemon', - dest='Global.daemonize', action='store_false', - help="Dont't daemonize the ipcluster program.") - paa('--delay', - type=float, dest='Global.delay', - help="Specify the delay (in seconds) between starting the controller and starting the engine(s).") - - # The "stop" subcommand parser - parser_stop = subparsers.add_parser( - 'stop', - parents=[parent_parser1, parent_parser2], - argument_default=SUPPRESS, - help="Stop a running cluster.", - description= - """Stop a running ipython cluster by its profile name or cluster +stop_help = """Stop a running ipython cluster by its profile name or cluster directory. Cluster directories are named using the convention 'cluster_<profile>'. If your cluster directory is in the cwd or the ipython directory, you can simply refer to it using its profile name, 'ipcluster stop -p <profile>', otherwise use the '--cluster-dir' option. """ - ) - paa = parser_stop.add_argument - paa('--signal', - dest='Global.signal', type=int, - help="The signal number to use in stopping the cluster (default=2).", - metavar="Global.signal") - - # the "engines" subcommand parser - parser_engines = subparsers.add_parser( - 'engines', - parents=[parent_parser1, parent_parser2], - argument_default=SUPPRESS, - help="Attach some engines to an existing controller or cluster.", - description= - """Start one or more engines to connect to an existing Cluster +engines_help = """Start one or more engines to connect to an existing Cluster by profile name or cluster directory. Cluster directories contain configuration, log and security related files and are named using the convention 'cluster_<profile>' and should be created using the 'create' subcommand of 'ipcluster'. If your cluster directory is in the cwd or the ipython directory, you can simply refer to it - using its profile name, 'ipcluster engines -n 4 -p <profile>', - otherwise use the '--cluster-dir' option. + using its profile name, 'ipcluster --engines n=4 profile=<profile>', + otherwise use the 'cluster_dir' option. """ - ) - paa = parser_engines.add_argument - paa('-n', '--number', - type=int, dest='Global.n', - help='The number of engines to start.', - metavar='Global.n') - paa('--daemon', - dest='Global.daemonize', action='store_true', - help='Daemonize the ipcluster program. This implies --log-to-file') - paa('--no-daemon', - dest='Global.daemonize', action='store_false', - help="Dont't daemonize the ipcluster program.") +create_help = """Create an ipython cluster directory by its profile name or + cluster directory path. Cluster directories contain + configuration, log and security related files and are named + using the convention 'cluster_<profile>'. By default they are + located in your ipython directory. Once created, you will + probably need to edit the configuration files in the cluster + directory to configure your cluster. Most users will create a + cluster directory by profile name, + 'ipcluster --create profile=mycluster', which will put the directory + in '<ipython_dir>/cluster_mycluster'. + """ +list_help = """List all available clusters, by cluster directory, that can + be found in the current working directory or in the ipython + directory.
Cluster directories are named using the convention + 'cluster_<profile>'.""" -#----------------------------------------------------------------------------- -# Main application -#----------------------------------------------------------------------------- +flags = {} +flags.update(base_flags) +flags.update({ + 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help), + 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help), + 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help), + 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help), + 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help), -class IPClusterApp(ApplicationWithClusterDir): +}) + +class IPClusterApp(ClusterDirApplication): name = u'ipcluster' description = _description usage = None - command_line_loader = IPClusterAppConfigLoader default_config_file_name = default_config_file_name default_log_level = logging.INFO auto_create_cluster_dir = False + classes = List() + def _classes_default(self): + from IPython.parallel.apps import launcher + return launcher.all_launchers + + n = Int(0, config=True, + help="The number of engines to start.") + signal = Int(signal.SIGINT, config=True, + help="signal to use for stopping. [default: SIGINT]") + delay = CFloat(1., config=True, + help="delay (in s) between starting the controller and the engines") + + subcommand = Str('', config=True, + help="""ipcluster has a variety of subcommands. The general way of + running ipcluster is 'ipcluster --<cmd> [options]'.""" + ) - def create_default_config(self): - super(IPClusterApp, self).create_default_config() - self.default_config.Global.controller_launcher = \ - 'IPython.parallel.apps.launcher.LocalControllerLauncher' - self.default_config.Global.engine_launcher = \ - 'IPython.parallel.apps.launcher.LocalEngineSetLauncher' - self.default_config.Global.n = 2 - self.default_config.Global.delay = 2 - self.default_config.Global.reset_config = False - self.default_config.Global.clean_logs = True - self.default_config.Global.signal = signal.SIGINT - self.default_config.Global.daemonize = False - - def find_resources(self): - subcommand = self.command_line_config.Global.subcommand + controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher', + config=True, + help="The class for launching a Controller." + ) + engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher', + config=True, + help="The class for launching Engines." + ) + reset = Bool(False, config=True, + help="Whether to reset config files as part of '--create'." + ) + daemonize = Bool(False, config=True, + help='Daemonize the ipcluster program. This implies --log-to-file') + + def _daemonize_changed(self, name, old, new): + if new: + self.log_to_file = True + + def _n_changed(self, name, old, new): + # propagate n all over the place... + # TODO make this clean + # ensure all classes are covered.
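+ # e.g. `ipcluster --start n=8` assigns self.n via the 'n' alias below,
+ # and this observer then copies the value onto every known launcher
+ # class, so whichever launcher is actually selected sees the same count.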
+ self.config.LocalEngineSetLauncher.n=new + self.config.MPIExecEngineSetLauncher.n=new + self.config.SSHEngineSetLauncher.n=new + self.config.PBSEngineSetLauncher.n=new + self.config.SGEEngineSetLauncher.n=new + self.config.WindowsHPCEngineSetLauncher.n=new + + aliases = Dict(dict( + n='IPClusterApp.n', + signal = 'IPClusterApp.signal', + delay = 'IPClusterApp.delay', + clauncher = 'IPClusterApp.controller_launcher_class', + elauncher = 'IPClusterApp.engine_launcher_class', + )) + flags = Dict(flags) + + def init_clusterdir(self): + subcommand = self.subcommand if subcommand=='list': self.list_cluster_dirs() - # Exit immediately because there is nothing left to do. - self.exit() - elif subcommand=='create': + self.exit(0) + if subcommand=='create': + reset = self.reset self.auto_create_cluster_dir = True - super(IPClusterApp, self).find_resources() + super(IPClusterApp, self).init_clusterdir() + self.log.info('Copying default config files to cluster directory ' + '[overwrite=%r]' % (reset,)) + self.cluster_dir.copy_all_config_files(overwrite=reset) elif subcommand=='start' or subcommand=='stop': self.auto_create_cluster_dir = True try: - super(IPClusterApp, self).find_resources() + super(IPClusterApp, self).init_clusterdir() except ClusterDirError: raise ClusterDirError( "Could not find a cluster directory. A cluster dir must " @@ -295,7 +219,7 @@ class IPClusterApp(ApplicationWithClusterDir): elif subcommand=='engines': self.auto_create_cluster_dir = False try: - super(IPClusterApp, self).find_resources() + super(IPClusterApp, self).init_clusterdir() except ClusterDirError: raise ClusterDirError( "Could not find a cluster directory. A cluster dir must " @@ -312,9 +236,9 @@ class IPClusterApp(ApplicationWithClusterDir): else: cluster_dir_paths = [] try: - ipython_dir = self.command_line_config.Global.ipython_dir + ipython_dir = self.ipython_dir except AttributeError: - ipython_dir = self.default_config.Global.ipython_dir + ipython_dir = self.ipython_dir paths = [os.getcwd(), ipython_dir] + \ cluster_dir_paths paths = list(set(paths)) @@ -326,32 +250,13 @@ class IPClusterApp(ApplicationWithClusterDir): full_path = os.path.join(path, f) if os.path.isdir(full_path) and f.startswith('cluster_'): profile = full_path.split('_')[-1] - start_cmd = 'ipcluster start -p %s -n 4' % profile + start_cmd = 'ipcluster --start profile=%s n=4' % profile print start_cmd + " ==> " + full_path - def pre_construct(self): - # IPClusterApp.pre_construct() is where we cd to the working directory. - super(IPClusterApp, self).pre_construct() - config = self.master_config - try: - daemon = config.Global.daemonize - if daemon: - config.Global.log_to_file = True - except AttributeError: - pass - - def construct(self): - config = self.master_config - subcmd = config.Global.subcommand - reset = config.Global.reset_config - if subcmd == 'list': - return - if subcmd == 'create': - self.log.info('Copying default config files to cluster directory ' - '[overwrite=%r]' % (reset,)) - self.cluster_dir_obj.copy_all_config_files(overwrite=reset) + def init_launchers(self): + config = self.config + subcmd = self.subcommand if subcmd =='start': - self.cluster_dir_obj.copy_all_config_files(overwrite=False) self.start_logging() self.loop = ioloop.IOLoop.instance() # reactor.callWhenRunning(self.start_launchers) @@ -366,16 +271,19 @@ class IPClusterApp(ApplicationWithClusterDir): dc.start() def start_launchers(self, controller=True): - config = self.master_config + config = self.config # Create the launchers.
In both cases, we set the work_dir of # the launcher to the cluster_dir. This is where the launcher's # subprocesses will be launched. It is not where the controller # and engine will be launched. if controller: - cl_class = import_item(config.Global.controller_launcher) + clsname = self.controller_launcher_class + if '.' not in clsname: + clsname = 'IPython.parallel.apps.launcher.'+clsname + cl_class = import_item(clsname) self.controller_launcher = cl_class( - work_dir=self.cluster_dir, config=config, + work_dir=self.cluster_dir.location, config=config, logname=self.log.name ) # Setup the observing of stopping. If the controller dies, shut @@ -391,9 +299,15 @@ class IPClusterApp(ApplicationWithClusterDir): else: self.controller_launcher = None - el_class = import_item(config.Global.engine_launcher) + clsname = self.engine_launcher_class + if '.' not in clsname: + # not a module, presume it's the raw name in apps.launcher + clsname = 'IPython.parallel.apps.launcher.'+clsname + el_class = import_item(clsname) + + self.engine_launcher = el_class( - work_dir=self.cluster_dir, config=config, logname=self.log.name + work_dir=self.cluster_dir.location, config=config, logname=self.log.name ) # Setup signals @@ -403,7 +317,7 @@ class IPClusterApp(ApplicationWithClusterDir): self._stopping = False # Make sure stop_launchers is not called 2x. if controller: self.start_controller() - dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop) + dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop) dc.start() self.startup_message() @@ -413,19 +327,19 @@ def start_controller(self, r=None): # self.log.info("In start_controller") - config = self.master_config + config = self.config d = self.controller_launcher.start( - cluster_dir=config.Global.cluster_dir + cluster_dir=self.cluster_dir.location ) return d def start_engines(self, r=None): # self.log.info("In start_engines") - config = self.master_config + config = self.config d = self.engine_launcher.start( - config.Global.n, - cluster_dir=config.Global.cluster_dir + self.n, + cluster_dir=self.cluster_dir.location ) return d @@ -469,18 +383,19 @@ class IPClusterApp(ApplicationWithClusterDir): def start_logging(self): # Remove old log files of the controller and engine - if self.master_config.Global.clean_logs: - log_dir = self.master_config.Global.log_dir + if self.clean_logs: + log_dir = self.cluster_dir.log_dir for f in os.listdir(log_dir): if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f): os.remove(os.path.join(log_dir, f)) # This will remove old log files for ipcluster itself - super(IPClusterApp, self).start_logging() + # super(IPClusterApp, self).start_logging() - def start_app(self): + def start(self): """Start the application, depending on what subcommand is used.""" - subcmd = self.master_config.Global.subcommand - if subcmd=='create' or subcmd=='list': + subcmd = self.subcommand + if subcmd=='create': + # the create action was already performed in init_clusterdir return elif subcmd=='start': self.start_app_start() @@ -488,10 +403,14 @@ self.start_app_stop() elif subcmd=='engines': self.start_app_engines() + else: + self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'" + " must be specified") + self.exit(-1) def start_app_start(self): """Start the app for the start subcommand.""" - config = self.master_config + config = self.config #
First see if the cluster is already running try: pid = self.get_pid_from_file() except PIDFileError: pass @@ -512,10 +431,10 @@ class IPClusterApp(ApplicationWithClusterDir): # Now log and daemonize self.log.info( - 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize + 'Starting ipcluster with [daemon=%r]' % self.daemonize ) # TODO: Get daemonize working on Windows or as a Windows Server. - if config.Global.daemonize: + if self.daemonize: if os.name=='posix': from twisted.scripts._twistd_unix import daemonize daemonize() @@ -536,15 +455,15 @@ class IPClusterApp(ApplicationWithClusterDir): def start_app_engines(self): """Start the app for the engines subcommand.""" - config = self.master_config + config = self.config # First see if the cluster is already running # Now log and daemonize self.log.info( - 'Starting engines with [daemon=%r]' % config.Global.daemonize + 'Starting engines with [daemon=%r]' % self.daemonize ) # TODO: Get daemonize working on Windows or as a Windows Server. - if config.Global.daemonize: + if self.daemonize: if os.name=='posix': from twisted.scripts._twistd_unix import daemonize daemonize() @@ -564,7 +483,7 @@ class IPClusterApp(ApplicationWithClusterDir): def start_app_stop(self): """Start the app for the stop subcommand.""" - config = self.master_config + config = self.config try: pid = self.get_pid_from_file() except PIDFileError: @@ -586,7 +505,7 @@ class IPClusterApp(ApplicationWithClusterDir): self.exit(ALREADY_STOPPED) elif os.name=='posix': - sig = config.Global.signal + sig = self.signal self.log.info( "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig) ) @@ -609,6 +528,20 @@ class IPClusterApp(ApplicationWithClusterDir): def launch_new_instance(): """Create and run the IPython cluster.""" app = IPClusterApp() + app.parse_command_line() + cl_config = app.config + app.init_clusterdir() + if app.config_file: + app.load_config_file(app.config_file) + else: + app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) + # command-line should *override* config file, but command-line is necessary + # to determine clusterdir, etc.
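+ # (parse the command line first to locate the cluster dir, load the
+ # file config next, then re-apply the parsed fragment so that values
+ # typed by the user win over values from the config file)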
+ app.update_config(cl_config) + + app.to_work_dir() + app.init_launchers() + app.start() diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 0cc69ac..c00e8f9 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -25,7 +25,10 @@ import stat import sys import uuid +from multiprocessing import Process + import zmq +from zmq.devices import ProcessMonitoredQueue from zmq.log.handlers import PUBHandler from zmq.utils import jsonapi as json @@ -34,14 +37,31 @@ from IPython.config.loader import Config from IPython.parallel import factory from IPython.parallel.apps.clusterdir import ( - ApplicationWithClusterDir, - ClusterDirConfigLoader + ClusterDir, + ClusterDirApplication, + base_flags + # ClusterDirConfigLoader ) -from IPython.parallel.util import disambiguate_ip_address, split_url -# from IPython.kernel.fcutil import FCServiceFactory, FURLError -from IPython.utils.traitlets import Instance, Unicode +from IPython.utils.importstring import import_item +from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict + +# from IPython.parallel.controller.controller import ControllerFactory +from IPython.parallel.streamsession import StreamSession +from IPython.parallel.controller.heartmonitor import HeartMonitor +from IPython.parallel.controller.hub import Hub, HubFactory +from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler +from IPython.parallel.controller.sqlitedb import SQLiteDB +from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url -from IPython.parallel.controller.controller import ControllerFactory +# conditional import of MongoDB backend class + +try: + from IPython.parallel.controller.mongodb import MongoDB +except ImportError: + maybe_mongo = [] +else: + maybe_mongo = [MongoDB] #----------------------------------------------------------------------------- @@ -63,234 +83,102 @@ your ipython directory and named as "cluster_<profile>". See the --profile and --cluster-dir options for details.
""" -#----------------------------------------------------------------------------- -# Default interfaces -#----------------------------------------------------------------------------- - -# The default client interfaces for FCClientServiceFactory.interfaces -default_client_interfaces = Config() -default_client_interfaces.Default.url_file = 'ipcontroller-client.url' - -# Make this a dict we can pass to Config.__init__ for the default -default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items())) - - -# The default engine interfaces for FCEngineServiceFactory.interfaces -default_engine_interfaces = Config() -default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url' - -# Make this a dict we can pass to Config.__init__ for the default -default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items())) - - -#----------------------------------------------------------------------------- -# Service factories -#----------------------------------------------------------------------------- - -# -# class FCClientServiceFactory(FCServiceFactory): -# """A Foolscap implementation of the client services.""" -# -# cert_file = Unicode(u'ipcontroller-client.pem', config=True) -# interfaces = Instance(klass=Config, kw=default_client_interfaces, -# allow_none=False, config=True) -# -# -# class FCEngineServiceFactory(FCServiceFactory): -# """A Foolscap implementation of the engine services.""" -# -# cert_file = Unicode(u'ipcontroller-engine.pem', config=True) -# interfaces = Instance(klass=dict, kw=default_engine_interfaces, -# allow_none=False, config=True) -# - -#----------------------------------------------------------------------------- -# Command line options -#----------------------------------------------------------------------------- - - -class IPControllerAppConfigLoader(ClusterDirConfigLoader): - - def _add_arguments(self): - super(IPControllerAppConfigLoader, self)._add_arguments() - paa = self.parser.add_argument - - ## Hub Config: - paa('--mongodb', - dest='HubFactory.db_class', action='store_const', - const='IPython.parallel.controller.mongodb.MongoDB', - help='Use MongoDB for task storage [default: in-memory]') - paa('--sqlite', - dest='HubFactory.db_class', action='store_const', - const='IPython.parallel.controller.sqlitedb.SQLiteDB', - help='Use SQLite3 for DB task storage [default: in-memory]') - paa('--hb', - type=int, dest='HubFactory.hb', nargs=2, - help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat ' - 'connections [default: random]', - metavar='Hub.hb_ports') - paa('--ping', - type=int, dest='HubFactory.ping', - help='The frequency at which the Hub pings the engines for heartbeats ' - ' (in ms) [default: 100]', - metavar='Hub.ping') - - # Client config - paa('--client-ip', - type=str, dest='HubFactory.client_ip', - help='The IP address or hostname the Hub will listen on for ' - 'client connections. Both engine-ip and client-ip can be set simultaneously ' - 'via --ip [default: loopback]', - metavar='Hub.client_ip') - paa('--client-transport', - type=str, dest='HubFactory.client_transport', - help='The ZeroMQ transport the Hub will use for ' - 'client connections. 
Both engine-transport and client-transport can be set simultaneously ' - 'via --transport [default: tcp]', - metavar='Hub.client_transport') - paa('--query', - type=int, dest='HubFactory.query_port', - help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]', - metavar='Hub.query_port') - paa('--notifier', - type=int, dest='HubFactory.notifier_port', - help='The port on which the Hub PUB socket will listen for notification connections [default: random]', - metavar='Hub.notifier_port') - - # Engine config - paa('--engine-ip', - type=str, dest='HubFactory.engine_ip', - help='The IP address or hostname the Hub will listen on for ' - 'engine connections. This applies to the Hub and its schedulers' - 'engine-ip and client-ip can be set simultaneously ' - 'via --ip [default: loopback]', - metavar='Hub.engine_ip') - paa('--engine-transport', - type=str, dest='HubFactory.engine_transport', - help='The ZeroMQ transport the Hub will use for ' - 'client connections. Both engine-transport and client-transport can be set simultaneously ' - 'via --transport [default: tcp]', - metavar='Hub.engine_transport') - - # Scheduler config - paa('--mux', - type=int, dest='ControllerFactory.mux', nargs=2, - help='The (2) ports the MUX scheduler will listen on for client,engine ' - 'connections, respectively [default: random]', - metavar='Scheduler.mux_ports') - paa('--task', - type=int, dest='ControllerFactory.task', nargs=2, - help='The (2) ports the Task scheduler will listen on for client,engine ' - 'connections, respectively [default: random]', - metavar='Scheduler.task_ports') - paa('--control', - type=int, dest='ControllerFactory.control', nargs=2, - help='The (2) ports the Control scheduler will listen on for client,engine ' - 'connections, respectively [default: random]', - metavar='Scheduler.control_ports') - paa('--iopub', - type=int, dest='ControllerFactory.iopub', nargs=2, - help='The (2) ports the IOPub scheduler will listen on for client,engine ' - 'connections, respectively [default: random]', - metavar='Scheduler.iopub_ports') - - paa('--scheme', - type=str, dest='HubFactory.scheme', - choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], - help='select the task scheduler scheme [default: Python LRU]', - metavar='Scheduler.scheme') - paa('--usethreads', - dest='ControllerFactory.usethreads', action="store_true", - help='Use threads instead of processes for the schedulers', - ) - paa('--hwm', - dest='TaskScheduler.hwm', type=int, - help='specify the High Water Mark (HWM) ' - 'in the Python scheduler. 
This is the maximum number ' - 'of allowed outstanding tasks on each engine.', - ) - - ## Global config - paa('--log-to-file', - action='store_true', dest='Global.log_to_file', - help='Log to a file in the log directory (default is stdout)') - paa('--log-url', - type=str, dest='Global.log_url', - help='Broadcast logs to an iploggerz process [default: disabled]') - paa('-r','--reuse-files', - action='store_true', dest='Global.reuse_files', - help='Try to reuse existing json connection files.') - paa('--no-secure', - action='store_false', dest='Global.secure', - help='Turn off execution keys (default).') - paa('--secure', - action='store_true', dest='Global.secure', - help='Turn on execution keys.') - paa('--execkey', - type=str, dest='Global.exec_key', - help='path to a file containing an execution key.', - metavar='keyfile') - paa('--ssh', - type=str, dest='Global.sshserver', - help='ssh url for clients to use when connecting to the Controller ' - 'processes. It should be of the form: [user@]server[:port]. The ' - 'Controller\'s listening addresses must be accessible from the ssh server', - metavar='Global.sshserver') - paa('--location', - type=str, dest='Global.location', - help="The external IP or domain name of this machine, used for disambiguating " - "engine and client connections.", - metavar='Global.location') - factory.add_session_arguments(self.parser) - factory.add_registration_arguments(self.parser) #----------------------------------------------------------------------------- # The main application #----------------------------------------------------------------------------- - - -class IPControllerApp(ApplicationWithClusterDir): +flags = {} +flags.update(base_flags) +flags.update({ + 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}}, + 'Use threads instead of processes for the schedulers'), + 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}}, + 'use the SQLiteDB backend'), + 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}}, + 'use the MongoDB backend'), + 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}}, + 'use the in-memory DictDB backend'), +}) + +flags.update() + +class IPControllerApp(ClusterDirApplication): name = u'ipcontroller' description = _description - command_line_loader = IPControllerAppConfigLoader + # command_line_loader = IPControllerAppConfigLoader default_config_file_name = default_config_file_name auto_create_cluster_dir = True + classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo - - def create_default_config(self): - super(IPControllerApp, self).create_default_config() - # Don't set defaults for Global.secure or Global.reuse_furls - # as those are set in a component. - self.default_config.Global.import_statements = [] - self.default_config.Global.clean_logs = True - self.default_config.Global.secure = True - self.default_config.Global.reuse_files = False - self.default_config.Global.exec_key = "exec_key.key" - self.default_config.Global.sshserver = None - self.default_config.Global.location = None - - def pre_construct(self): - super(IPControllerApp, self).pre_construct() - c = self.master_config - # The defaults for these are set in FCClientServiceFactory and - # FCEngineServiceFactory, so we only set them here if the global - # options have be set to override the class level defaults. 
+ reuse_files = Bool(False, config=True, + help='Whether to reuse existing json connection files [default: False]' + ) + secure = Bool(True, config=True, + help='Whether to use exec_keys for extra authentication [default: True]' + ) + ssh_server = Unicode(u'', config=True, + help="""ssh url for clients to use when connecting to the Controller + processes. It should be of the form: [user@]server[:port]. The + Controller\'s listening addresses must be accessible from the ssh server""", + ) + location = Unicode(u'', config=True, + help="""The external IP or domain name of the Controller, used for disambiguating + engine and client connections.""", + ) + import_statements = List([], config=True, + help="import statements to be run at startup. Necessary in some environments" + ) + + usethreads = Bool(False, config=True, + help='Use threads instead of processes for the schedulers', + ) + + # internal + children = List() + mq_class = CStr('zmq.devices.ProcessMonitoredQueue') + + def _usethreads_changed(self, name, old, new): + self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') + + aliases = Dict(dict( + config = 'IPControllerApp.config_file', + # file = 'IPControllerApp.url_file', + log_level = 'IPControllerApp.log_level', + reuse_files = 'IPControllerApp.reuse_files', + secure = 'IPControllerApp.secure', + ssh = 'IPControllerApp.ssh_server', + usethreads = 'IPControllerApp.usethreads', + import_statements = 'IPControllerApp.import_statements', + location = 'IPControllerApp.location', + + ident = 'StreamSession.session', + user = 'StreamSession.username', + exec_key = 'StreamSession.keyfile', + + url = 'HubFactory.url', + ip = 'HubFactory.ip', + transport = 'HubFactory.transport', + port = 'HubFactory.regport', + + ping = 'HeartMonitor.period', + + scheme = 'TaskScheduler.scheme_name', + hwm = 'TaskScheduler.hwm', + + + profile = "ClusterDir.profile", + cluster_dir = 'ClusterDir.location', - # if hasattr(c.Global, 'reuse_furls'): - # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls - # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls - # del c.Global.reuse_furls - # if hasattr(c.Global, 'secure'): - # c.FCClientServiceFactory.secure = c.Global.secure - # c.FCEngineServiceFactory.secure = c.Global.secure - # del c.Global.secure + )) + flags = Dict(flags) + def save_connection_dict(self, fname, cdict): """save a connection dict to json file.""" - c = self.master_config + c = self.config url = cdict['url'] location = cdict['location'] if not location: @@ -301,43 +189,43 @@ class IPControllerApp(ApplicationWithClusterDir): else: location = socket.gethostbyname_ex(socket.gethostname())[2][-1] cdict['location'] = location - fname = os.path.join(c.Global.security_dir, fname) + fname = os.path.join(self.cluster_dir.security_dir, fname) with open(fname, 'w') as f: f.write(json.dumps(cdict, indent=2)) os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR) def load_config_from_json(self): """load config from existing json connector files.""" - c = self.master_config + c = self.config # load from engine config - with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f: + with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f: cfg = json.loads(f.read()) - key = c.SessionFactory.exec_key = cfg['exec_key'] + key = c.StreamSession.key = cfg['exec_key'] xport,addr = cfg['url'].split('://') c.HubFactory.engine_transport = xport ip,ports = addr.split(':') c.HubFactory.engine_ip = ip c.HubFactory.regport = int(ports) - 
c.Global.location = cfg['location'] + self.location = cfg['location'] # load client config - with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f: + with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f: cfg = json.loads(f.read()) assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" xport,addr = cfg['url'].split('://') c.HubFactory.client_transport = xport ip,ports = addr.split(':') c.HubFactory.client_ip = ip - c.Global.sshserver = cfg['ssh'] + self.ssh_server = cfg['ssh'] assert int(ports) == c.HubFactory.regport, "regport mismatch" - def construct(self): + def init_hub(self): # This is the working dir by now. sys.path.insert(0, '') - c = self.master_config + c = self.config - self.import_statements() - reusing = c.Global.reuse_files + self.do_import_statements() + reusing = self.reuse_files if reusing: try: self.load_config_from_json() @@ -346,21 +234,20 @@ class IPControllerApp(ApplicationWithClusterDir): # check again, because reusing may have failed: if reusing: pass - elif c.Global.secure: - keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key) + elif self.secure: key = str(uuid.uuid4()) - with open(keyfile, 'w') as f: - f.write(key) - os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) - c.SessionFactory.exec_key = key + # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key) + # with open(keyfile, 'w') as f: + # f.write(key) + # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) + c.StreamSession.key = key else: - c.SessionFactory.exec_key = '' - key = None + key = c.StreamSession.key = '' try: - self.factory = ControllerFactory(config=c, logname=self.log.name) - self.start_logging() - self.factory.construct() + self.factory = HubFactory(config=c, log=self.log) + # self.start_logging() + self.factory.init_hub() except: self.log.error("Couldn't construct the Controller", exc_info=True) self.exit(1) @@ -369,21 +256,82 @@ class IPControllerApp(ApplicationWithClusterDir): # save to new json config files f = self.factory cdict = {'exec_key' : key, - 'ssh' : c.Global.sshserver, + 'ssh' : self.ssh_server, 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), - 'location' : c.Global.location + 'location' : self.location } self.save_connection_dict('ipcontroller-client.json', cdict) edict = cdict edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) self.save_connection_dict('ipcontroller-engine.json', edict) + + # + def init_schedulers(self): + children = self.children + mq = import_item(self.mq_class) + hub = self.factory + # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url + # IOPub relay (in a Process) + q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') + q.bind_in(hub.client_info['iopub']) + q.bind_out(hub.engine_info['iopub']) + q.setsockopt_out(zmq.SUBSCRIBE, '') + q.connect_mon(hub.monitor_url) + q.daemon=True + children.append(q) + + # Multiplexer Queue (in a Process) + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') + q.bind_in(hub.client_info['mux']) + q.setsockopt_in(zmq.IDENTITY, 'mux') + q.bind_out(hub.engine_info['mux']) + q.connect_mon(hub.monitor_url) + q.daemon=True + children.append(q) + + # Control Queue (in a Process) + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') + q.bind_in(hub.client_info['control']) + q.setsockopt_in(zmq.IDENTITY, 'control') + q.bind_out(hub.engine_info['control']) + q.connect_mon(hub.monitor_url) + q.daemon=True + children.append(q) + try: + scheme = 
self.config.TaskScheduler.scheme_name + except AttributeError: + scheme = TaskScheduler.scheme_name.get_default_value() + # Task Queue (in a Process) + if scheme == 'pure': + self.log.warn("task::using pure XREQ Task scheduler") + q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') + # q.setsockopt_out(zmq.HWM, hub.hwm) + q.bind_in(hub.client_info['task'][1]) + q.setsockopt_in(zmq.IDENTITY, 'task') + q.bind_out(hub.engine_info['task']) + q.connect_mon(hub.monitor_url) + q.daemon=True + children.append(q) + elif scheme == 'none': + self.log.warn("task::using no Task scheduler") + + else: + self.log.info("task::using Python %s Task scheduler"%scheme) + sargs = (hub.client_info['task'][1], hub.engine_info['task'], + hub.monitor_url, hub.client_info['notification']) + kwargs = dict(logname=self.log.name, loglevel=self.log_level, + config=dict(self.config)) + q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) + q.daemon=True + children.append(q) + def save_urls(self): """save the registration urls to files.""" - c = self.master_config + c = self.config - sec_dir = c.Global.security_dir + sec_dir = self.cluster_dir.security_dir cf = self.factory with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f: @@ -393,8 +341,8 @@ class IPControllerApp(ApplicationWithClusterDir): f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport)) - def import_statements(self): - statements = self.master_config.Global.import_statements + def do_import_statements(self): + statements = self.import_statements for s in statements: try: self.log.msg("Executing statement: '%s'" % s) @@ -402,21 +350,32 @@ class IPControllerApp(ApplicationWithClusterDir): except: self.log.msg("Error running statement: %s" % s) - def start_logging(self): - super(IPControllerApp, self).start_logging() - if self.master_config.Global.log_url: - context = self.factory.context - lsock = context.socket(zmq.PUB) - lsock.connect(self.master_config.Global.log_url) - handler = PUBHandler(lsock) - handler.root_topic = 'controller' - handler.setLevel(self.log_level) - self.log.addHandler(handler) - # - def start_app(self): + # def start_logging(self): + # super(IPControllerApp, self).start_logging() + # if self.config.Global.log_url: + # context = self.factory.context + # lsock = context.socket(zmq.PUB) + # lsock.connect(self.config.Global.log_url) + # handler = PUBHandler(lsock) + # handler.root_topic = 'controller' + # handler.setLevel(self.log_level) + # self.log.addHandler(handler) + # # + def start(self): # Start the subprocesses: self.factory.start() + child_procs = [] + for child in self.children: + child.start() + if isinstance(child, ProcessMonitoredQueue): + child_procs.append(child.launcher) + elif isinstance(child, Process): + child_procs.append(child) + if child_procs: + signal_children(child_procs) + self.write_pid_file(overwrite=True) + try: self.factory.loop.start() except KeyboardInterrupt: @@ -426,6 +385,22 @@ class IPControllerApp(ApplicationWithClusterDir): def launch_new_instance(): """Create and run the IPython controller""" app = IPControllerApp() + app.parse_command_line() + cl_config = app.config + # app.load_config_file() + app.init_clusterdir() + if app.config_file: + app.load_config_file(app.config_file) + else: + app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) + # command-line should *override* config file, but command-line is necessary + # to determine clusterdir, etc. 
+ app.update_config(cl_config) + + app.to_work_dir() + app.init_hub() + app.init_schedulers() + app.start() diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index 1d4abc0..e2261e0 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -23,17 +23,21 @@ import zmq from zmq.eventloop import ioloop from IPython.parallel.apps.clusterdir import ( - ApplicationWithClusterDir, - ClusterDirConfigLoader + ClusterDirApplication, + ClusterDir, + base_aliases, + # ClusterDirConfigLoader ) from IPython.zmq.log import EnginePUBHandler -from IPython.parallel import factory +from IPython.config.configurable import Configurable +from IPython.parallel.streamsession import StreamSession from IPython.parallel.engine.engine import EngineFactory from IPython.parallel.engine.streamkernel import Kernel from IPython.parallel.util import disambiguate_url from IPython.utils.importstring import import_item +from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr #----------------------------------------------------------------------------- @@ -43,6 +47,20 @@ from IPython.utils.importstring import import_item #: The default config file name for this application default_config_file_name = u'ipengine_config.py' +_description = """Start an IPython engine for parallel computing.\n\n + +IPython engines run in parallel and perform computations on behalf of a client +and controller. A controller needs to be started before the engines. The +engine can be configured using command line options or using a cluster +directory. Cluster directories contain config, log and security files and are +usually located in your ipython directory and named as "cluster_". +See the `profile` and `cluster_dir` options for details. +""" + + +#----------------------------------------------------------------------------- +# MPI configuration +#----------------------------------------------------------------------------- mpi4py_init = """from mpi4py import MPI as mpi mpi.size = mpi.COMM_WORLD.Get_size() @@ -58,123 +76,75 @@ mpi.rank = 0 mpi.size = 0 """ +class MPI(Configurable): + """Configurable for MPI initialization""" + use = Str('', config=True, + help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).' + ) -_description = """Start an IPython engine for parallel computing.\n\n + def _on_use_changed(self, old, new): + # load default init script if it's not set + if not self.init_script: + self.init_script = self.default_inits.get(new, '') + + init_script = Str('', config=True, + help="Initialization code for MPI") + + default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init}, + config=True) -IPython engines run in parallel and perform computations on behalf of a client -and controller. A controller needs to be started before the engines. The -engine can be configured using command line options or using a cluster -directory. Cluster directories contain config, log and security files and are -usually located in your ipython directory and named as "cluster_". -See the --profile and --cluster-dir options for details. 
-""" #----------------------------------------------------------------------------- -# Command line options +# Main application #----------------------------------------------------------------------------- -class IPEngineAppConfigLoader(ClusterDirConfigLoader): - - def _add_arguments(self): - super(IPEngineAppConfigLoader, self)._add_arguments() - paa = self.parser.add_argument - # Controller config - paa('--file', '-f', - type=unicode, dest='Global.url_file', - help='The full location of the file containing the connection information fo ' - 'controller. If this is not given, the file must be in the ' - 'security directory of the cluster directory. This location is ' - 'resolved using the --profile and --app-dir options.', - metavar='Global.url_file') - # MPI - paa('--mpi', - type=str, dest='MPI.use', - help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).', - metavar='MPI.use') - # Global config - paa('--log-to-file', - action='store_true', dest='Global.log_to_file', - help='Log to a file in the log directory (default is stdout)') - paa('--log-url', - dest='Global.log_url', - help="url of ZMQ logger, as started with iploggerz") - # paa('--execkey', - # type=str, dest='Global.exec_key', - # help='path to a file containing an execution key.', - # metavar='keyfile') - # paa('--no-secure', - # action='store_false', dest='Global.secure', - # help='Turn off execution keys.') - # paa('--secure', - # action='store_true', dest='Global.secure', - # help='Turn on execution keys (default).') - # init command - paa('-c', - type=str, dest='Global.extra_exec_lines', +class IPEngineApp(ClusterDirApplication): + + app_name = Unicode(u'ipengine') + description = Unicode(_description) + default_config_file_name = default_config_file_name + classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI]) + + startup_script = Unicode(u'', config=True, + help='specify a script to be run at startup') + startup_command = Str('', config=True, help='specify a command to be run at startup') - paa('-s', - type=unicode, dest='Global.extra_exec_file', - help='specify a script to be run at startup') - - factory.add_session_arguments(self.parser) - factory.add_registration_arguments(self.parser) + url_file = Unicode(u'', config=True, + help="""The full location of the file containing the connection information for + the controller. If this is not given, the file must be in the + security directory of the cluster directory. This location is + resolved using the `profile` or `cluster_dir` options.""", + ) -#----------------------------------------------------------------------------- -# Main application -#----------------------------------------------------------------------------- + url_file_name = Unicode(u'ipcontroller-engine.json') + aliases = Dict(dict( + config = 'IPEngineApp.config_file', + file = 'IPEngineApp.url_file', + c = 'IPEngineApp.startup_command', + s = 'IPEngineApp.startup_script', -class IPEngineApp(ApplicationWithClusterDir): + ident = 'StreamSession.session', + user = 'StreamSession.username', + exec_key = 'StreamSession.keyfile', - name = u'ipengine' - description = _description - command_line_loader = IPEngineAppConfigLoader - default_config_file_name = default_config_file_name - auto_create_cluster_dir = True - - def create_default_config(self): - super(IPEngineApp, self).create_default_config() - - # The engine should not clean logs as we don't want to remove the - # active log files of other running engines. 
- self.default_config.Global.clean_logs = False - self.default_config.Global.secure = True - - # Global config attributes - self.default_config.Global.exec_lines = [] - self.default_config.Global.extra_exec_lines = '' - self.default_config.Global.extra_exec_file = u'' - - # Configuration related to the controller - # This must match the filename (path not included) that the controller - # used for the FURL file. - self.default_config.Global.url_file = u'' - self.default_config.Global.url_file_name = u'ipcontroller-engine.json' - # If given, this is the actual location of the controller's FURL file. - # If not, this is computed using the profile, app_dir and furl_file_name - # self.default_config.Global.key_file_name = u'exec_key.key' - # self.default_config.Global.key_file = u'' - - # MPI related config attributes - self.default_config.MPI.use = '' - self.default_config.MPI.mpi4py = mpi4py_init - self.default_config.MPI.pytrilinos = pytrilinos_init - - def post_load_command_line_config(self): - pass - - def pre_construct(self): - super(IPEngineApp, self).pre_construct() - # self.find_cont_url_file() - self.find_url_file() - if self.master_config.Global.extra_exec_lines: - self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines) - if self.master_config.Global.extra_exec_file: - enc = sys.getfilesystemencoding() or 'utf8' - cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc) - self.master_config.Global.exec_lines.append(cmd) + url = 'EngineFactory.url', + ip = 'EngineFactory.ip', + transport = 'EngineFactory.transport', + port = 'EngineFactory.regport', + location = 'EngineFactory.location', + + timeout = 'EngineFactory.timeout', + + profile = "ClusterDir.profile", + cluster_dir = 'ClusterDir.location', + + mpi = 'MPI.use', + + log_level = 'IPEngineApp.log_level', + )) # def find_key_file(self): # """Set the key file. @@ -198,49 +168,58 @@ class IPEngineApp(ApplicationWithClusterDir): Here we don't try to actually see if it exists or is valid, as that is handled by the connection logic. """ - config = self.master_config + config = self.config # Find the actual controller key file - if not config.Global.url_file: - try_this = os.path.join( - config.Global.cluster_dir, - config.Global.security_dir, - config.Global.url_file_name + if not self.url_file: + self.url_file = os.path.join( + self.cluster_dir.security_dir, + self.url_file_name ) - config.Global.url_file = try_this - def construct(self): + def init_engine(self): # This is the working dir by now.
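Since every alias above simply names a class trait, command-line settings and config-file settings are interchangeable. An illustrative pair (address and profile name made up):

    # command line:  ipengine url=tcp://127.0.0.1:10101 profile=mycluster
    # ipengine_config.py equivalent:
    c = get_config()
    c.EngineFactory.url = 'tcp://127.0.0.1:10101'
    c.ClusterDir.profile = u'mycluster'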
sys.path.insert(0, '') - config = self.master_config + config = self.config + # print config + self.find_url_file() + # if os.path.exists(config.Global.key_file) and config.Global.secure: # config.SessionFactory.exec_key = config.Global.key_file - if os.path.exists(config.Global.url_file): - with open(config.Global.url_file) as f: + if os.path.exists(self.url_file): + with open(self.url_file) as f: d = json.loads(f.read()) for k,v in d.iteritems(): if isinstance(v, unicode): d[k] = v.encode() if d['exec_key']: - config.SessionFactory.exec_key = d['exec_key'] + config.StreamSession.key = d['exec_key'] d['url'] = disambiguate_url(d['url'], d['location']) - config.RegistrationFactory.url=d['url'] + config.EngineFactory.url = d['url'] config.EngineFactory.location = d['location'] + try: + exec_lines = config.Kernel.exec_lines + except AttributeError: + config.Kernel.exec_lines = [] + exec_lines = config.Kernel.exec_lines - - config.Kernel.exec_lines = config.Global.exec_lines - - self.start_mpi() + if self.startup_script: + enc = sys.getfilesystemencoding() or 'utf8' + cmd="execfile(%r)"%self.startup_script.encode(enc) + exec_lines.append(cmd) + if self.startup_command: + exec_lines.append(self.startup_command) - # Create the underlying shell class and EngineService + # Create the underlying shell class and Engine # shell_class = import_item(self.master_config.Global.shell_class) + # print self.config try: - self.engine = EngineFactory(config=config, logname=self.log.name) + self.engine = EngineFactory(config=config, log=self.log) except: self.log.error("Couldn't start the Engine", exc_info=True) self.exit(1) - self.start_logging() + # self.start_logging() # Create the service hierarchy # self.main_service = service.MultiService() @@ -258,22 +237,22 @@ class IPEngineApp(ApplicationWithClusterDir): # reactor.callWhenRunning(self.call_connect) - - def start_logging(self): - super(IPEngineApp, self).start_logging() - if self.master_config.Global.log_url: - context = self.engine.context - lsock = context.socket(zmq.PUB) - lsock.connect(self.master_config.Global.log_url) - handler = EnginePUBHandler(self.engine, lsock) - handler.setLevel(self.log_level) - self.log.addHandler(handler) - - def start_mpi(self): + # def start_logging(self): + # super(IPEngineApp, self).start_logging() + # if self.master_config.Global.log_url: + # context = self.engine.context + # lsock = context.socket(zmq.PUB) + # lsock.connect(self.master_config.Global.log_url) + # handler = EnginePUBHandler(self.engine, lsock) + # handler.setLevel(self.log_level) + # self.log.addHandler(handler) + # + def init_mpi(self): global mpi - mpikey = self.master_config.MPI.use - mpi_import_statement = self.master_config.MPI.get(mpikey, None) - if mpi_import_statement is not None: + self.mpi = MPI(config=self.config) + + mpi_import_statement = self.mpi.init_script + if mpi_import_statement: try: self.log.info("Initializing MPI:") self.log.info(mpi_import_statement) @@ -284,7 +263,7 @@ class IPEngineApp(ApplicationWithClusterDir): mpi = None - def start_app(self): + def start(self): self.engine.start() try: self.engine.loop.start() @@ -293,8 +272,27 @@ class IPEngineApp(ApplicationWithClusterDir): def launch_new_instance(): - """Create and run the IPython controller""" + """Create and run the IPython engine""" app = IPEngineApp() + app.parse_command_line() + cl_config = app.config + app.init_clusterdir() + # app.load_config_file() + # print app.config + if app.config_file: + app.load_config_file(app.config_file) + else: + 
app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location) + + # command-line should *override* config file, but command-line is necessary + # to determine clusterdir, etc. + app.update_config(cl_config) + + # print app.config + app.to_work_dir() + app.init_mpi() + app.init_engine() + print app.config app.start() diff --git a/IPython/parallel/apps/iploggerapp.py b/IPython/parallel/apps/iploggerapp.py index 25ac4c1..76792fb 100755 --- a/IPython/parallel/apps/iploggerapp.py +++ b/IPython/parallel/apps/iploggerapp.py @@ -21,7 +21,7 @@ import sys import zmq from IPython.parallel.apps.clusterdir import ( - ApplicationWithClusterDir, + ClusterDirApplication, ClusterDirConfigLoader ) from IPython.parallel.apps.logwatcher import LogWatcher @@ -74,7 +74,7 @@ class IPLoggerAppConfigLoader(ClusterDirConfigLoader): #----------------------------------------------------------------------------- -class IPLoggerApp(ApplicationWithClusterDir): +class IPLoggerApp(ClusterDirApplication): name = u'iploggerz' description = _description diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 171a248..5dce600 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -106,7 +106,7 @@ class BaseLauncher(LoggingFactory): # This should not be used to set the work_dir for the actual engine # and controller. Instead, use their own config files or the # controller_args, engine_args attributes of the launchers to add - # the --work-dir option. + # the work_dir option. work_dir = Unicode(u'.') loop = Instance('zmq.eventloop.ioloop.IOLoop') @@ -328,16 +328,18 @@ class LocalProcessLauncher(BaseLauncher): class LocalControllerLauncher(LocalProcessLauncher): """Launch a controller as a regular external process.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True) + controller_cmd = List(ipcontroller_cmd_argv, config=True, + help="""Popen command to launch ipcontroller.""") # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True) + controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="""command-line args to pass to ipcontroller""") def find_args(self): return self.controller_cmd + self.controller_args def start(self, cluster_dir): """Start the controller by cluster_dir.""" - self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.controller_args.extend(['cluster_dir=%s'%cluster_dir]) self.cluster_dir = unicode(cluster_dir) self.log.info("Starting LocalControllerLauncher: %r" % self.args) return super(LocalControllerLauncher, self).start() @@ -346,10 +348,11 @@ class LocalControllerLauncher(LocalProcessLauncher): class LocalEngineLauncher(LocalProcessLauncher): """Launch a single engine as a regular external process.""" - engine_cmd = List(ipengine_cmd_argv, config=True) + engine_cmd = List(ipengine_cmd_argv, config=True, + help="""command to launch the Engine.""") # Command line arguments for ipengine.
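Note the launcher args above switching from '--cluster-dir <dir>' to the trait-assignment form 'cluster_dir=<dir>'. A hedged sketch of the argv that LocalControllerLauncher.find_args() yields after start() (assuming ipcontroller_cmd_argv is just ['ipcontroller']; the directory is made up, and logging.INFO == 20):

    ['ipcontroller', '--log-to-file', 'log_level=20',
     'cluster_dir=/home/you/.ipython/cluster_default']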
- engine_args = List( - ['--log-to-file','--log-level', str(logging.INFO)], config=True + engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="command-line arguments to pass to ipengine" ) def find_args(self): @@ -357,7 +360,7 @@ class LocalEngineLauncher(LocalProcessLauncher): def start(self, cluster_dir): """Start the engine by cluster_dir.""" - self.engine_args.extend(['--cluster-dir', cluster_dir]) + self.engine_args.extend(['cluster_dir=%s'%cluster_dir]) self.cluster_dir = unicode(cluster_dir) return super(LocalEngineLauncher, self).start() @@ -367,7 +370,8 @@ class LocalEngineSetLauncher(BaseLauncher): # Command line arguments for ipengine. engine_args = List( - ['--log-to-file','--log-level', str(logging.INFO)], config=True + ['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="command-line arguments to pass to ipengine" ) # launcher class launcher_class = LocalEngineLauncher @@ -442,16 +446,18 @@ class LocalEngineSetLauncher(BaseLauncher): class MPIExecLauncher(LocalProcessLauncher): """Launch an external process using mpiexec.""" - # The mpiexec command to use in starting the process. - mpi_cmd = List(['mpiexec'], config=True) - # The command line arguments to pass to mpiexec. - mpi_args = List([], config=True) - # The program to start using mpiexec. - program = List(['date'], config=True) - # The command line argument to the program. - program_args = List([], config=True) - # The number of instances of the program to start. - n = Int(1, config=True) + mpi_cmd = List(['mpiexec'], config=True, + help="The mpiexec command to use in starting the process." + ) + mpi_args = List([], config=True, + help="The command line arguments to pass to mpiexec." + ) + program = List(['date'], config=True, + help="The program to start via mpiexec.") + program_args = List([], config=True, + help="The command line arguments to pass to the program." + ) + n = Int(1) def find_args(self): """Build self.args using all the fields.""" @@ -467,14 +473,17 @@ class MPIExecLauncher(LocalProcessLauncher): class MPIExecControllerLauncher(MPIExecLauncher): """Launch a controller using mpiexec.""" - controller_cmd = List(ipcontroller_cmd_argv, config=True) - # Command line arguments to ipcontroller. - controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True) - n = Int(1, config=False) + controller_cmd = List(ipcontroller_cmd_argv, config=True, + help="Popen command to launch the Controller" + ) + controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="Command line arguments to pass to ipcontroller." + ) + n = Int(1) def start(self, cluster_dir): """Start the controller by cluster_dir.""" - self.controller_args.extend(['--cluster-dir', cluster_dir]) + self.controller_args.extend(['cluster_dir=%s'%cluster_dir]) self.cluster_dir = unicode(cluster_dir) self.log.info("Starting MPIExecControllerLauncher: %r" % self.args) return super(MPIExecControllerLauncher, self).start(1) @@ -486,16 +495,18 @@ class MPIExecControllerLauncher(MPIExecLauncher): class MPIExecEngineSetLauncher(MPIExecLauncher): - program = List(ipengine_cmd_argv, config=True) - # Command line arguments for ipengine. + program = List(ipengine_cmd_argv, config=True, + help="Popen command for ipengine" + ) program_args = List( - ['--log-to-file','--log-level', str(logging.INFO)], config=True + ['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="Command line arguments for ipengine."
) - n = Int(1, config=True) + n = Int(1) def start(self, n, cluster_dir): """Start n engines by profile or cluster_dir.""" - self.program_args.extend(['--cluster-dir', cluster_dir]) + self.program_args.extend(['cluster_dir=%s'%cluster_dir]) self.cluster_dir = unicode(cluster_dir) self.n = n self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args) @@ -515,13 +526,20 @@ class SSHLauncher(LocalProcessLauncher): as well. """ - ssh_cmd = List(['ssh'], config=True) - ssh_args = List(['-tt'], config=True) - program = List(['date'], config=True) - program_args = List([], config=True) - hostname = CUnicode('', config=True) - user = CUnicode('', config=True) - location = CUnicode('') + ssh_cmd = List(['ssh'], config=True, + help="command for starting ssh") + ssh_args = List(['-tt'], config=True, + help="args to pass to ssh") + program = List(['date'], config=True, + help="Program to launch via ssh") + program_args = List([], config=True, + help="args to pass to remote program") + hostname = CUnicode('', config=True, + help="hostname on which to launch the program") + user = CUnicode('', config=True, + help="username for ssh") + location = CUnicode('', config=True, + help="user@hostname location for ssh in one setting") def _hostname_changed(self, name, old, new): if self.user: @@ -555,21 +573,26 @@ class SSHLauncher(LocalProcessLauncher): class SSHControllerLauncher(SSHLauncher): - program = List(ipcontroller_cmd_argv, config=True) - # Command line arguments to ipcontroller. - program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True) + program = List(ipcontroller_cmd_argv, config=True, + help="remote ipcontroller command.") + program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True, + help="Command line arguments to ipcontroller.") class SSHEngineLauncher(SSHLauncher): - program = List(ipengine_cmd_argv, config=True) + program = List(ipengine_cmd_argv, config=True, + help="remote ipengine command.") # Command line arguments for ipengine. program_args = List( - ['--log-to-file','--log-level', str(logging.INFO)], config=True + ['--log-to-file','log_level=%i'%logging.INFO], config=True, + help="Command line arguments to ipengine." ) class SSHEngineSetLauncher(LocalEngineSetLauncher): launcher_class = SSHEngineLauncher - engines = Dict(config=True) + engines = Dict(config=True, + help="""dict of engines to launch. This is a dict by hostname of ints, + corresponding to the number of engines to start on that host.""") def start(self, n, cluster_dir): """Start engines by profile or cluster_dir. @@ -624,17 +647,19 @@ def find_job_cmd(): class WindowsHPCLauncher(BaseLauncher): - # A regular expression used to get the job id from the output of the - # submit_command. - job_id_regexp = Str(r'\d+', config=True) - # The filename of the instantiated job script. - job_file_name = CUnicode(u'ipython_job.xml', config=True) + job_id_regexp = Str(r'\d+', config=True, + help="""A regular expression used to get the job id from the output of the + submit_command. """ + ) + job_file_name = CUnicode(u'ipython_job.xml', config=True, + help="The filename of the instantiated job script.") # The full path to the instantiated job script. This gets made dynamically # by combining the work_dir with the job_file_name. 
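SSHEngineSetLauncher (above) sizes the cluster from its engines dict, one entry per host. A hedged config sketch (hostnames made up):

    c = get_config()
    # two engines on host1, four on host2:
    c.SSHEngineSetLauncher.engines = {'host1.example.com' : 2,
                                      'host2.example.com' : 4}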
job_file = CUnicode(u'') - # The hostname of the scheduler to submit the job to - scheduler = CUnicode('', config=True) - job_cmd = CUnicode(find_job_cmd(), config=True) + scheduler = CUnicode('', config=True, + help="The hostname of the scheduler to submit the job to.") + job_cmd = CUnicode(find_job_cmd(), config=True, + help="The command for submitting jobs.") def __init__(self, work_dir=u'.', config=None, **kwargs): super(WindowsHPCLauncher, self).__init__( @@ -702,8 +727,10 @@ class WindowsHPCLauncher(BaseLauncher): class WindowsHPCControllerLauncher(WindowsHPCLauncher): - job_file_name = CUnicode(u'ipcontroller_job.xml', config=True) - extra_args = List([], config=False) + job_file_name = CUnicode(u'ipcontroller_job.xml', config=True, + help="WinHPC xml job file.") + extra_args = List([], config=False, + help="extra args to pass to ipcontroller") def write_job_file(self, n): job = IPControllerJob(config=self.config) @@ -713,7 +740,7 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): # the controller. It is used as the base path for the stdout/stderr # files that the scheduler redirects to. t.work_directory = self.cluster_dir - # Add the --cluster-dir and from self.start(). + # Add the cluster_dir argument from self.start(). t.controller_args.extend(self.extra_args) job.add_task(t) @@ -726,15 +753,17 @@ class WindowsHPCControllerLauncher(WindowsHPCLauncher): def start(self, cluster_dir): """Start the controller by cluster_dir.""" - self.extra_args = ['--cluster-dir', cluster_dir] + self.extra_args = ['cluster_dir=%s'%cluster_dir] self.cluster_dir = unicode(cluster_dir) return super(WindowsHPCControllerLauncher, self).start(1) class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): - job_file_name = CUnicode(u'ipengineset_job.xml', config=True) - extra_args = List([], config=False) + job_file_name = CUnicode(u'ipengineset_job.xml', config=True, + help="jobfile for ipengines job") + extra_args = List([], config=False, + help="extra args to pass to ipengine") def write_job_file(self, n): job = IPEngineSetJob(config=self.config) @@ -745,7 +774,7 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): # the engine. It is used as the base path for the stdout/stderr # files that the scheduler redirects to. t.work_directory = self.cluster_dir - # Add the --cluster-dir and from self.start(). + # Add the cluster_dir argument from self.start(). t.engine_args.extend(self.extra_args) job.add_task(t) @@ -758,7 +787,7 @@ class WindowsHPCEngineSetLauncher(WindowsHPCLauncher): def start(self, n, cluster_dir): """Start the controller by cluster_dir.""" - self.extra_args = ['--cluster-dir', cluster_dir] + self.extra_args = ['cluster_dir=%s'%cluster_dir] self.cluster_dir = unicode(cluster_dir) return super(WindowsHPCEngineSetLauncher, self).start(n) @@ -782,21 +811,21 @@ class BatchSystemLauncher(BaseLauncher): """ # Subclasses must fill these in. See PBSEngineSet - # The name of the command line program used to submit jobs. - submit_command = List([''], config=True) - # The name of the command line program used to delete jobs. - delete_command = List([''], config=True) - # A regular expression used to get the job id from the output of the - # submit_command. - job_id_regexp = CUnicode('', config=True) - # The string that is the batch script template itself. - batch_template = CUnicode('', config=True) - # The file that contains the batch template - batch_template_file = CUnicode(u'', config=True) - # The filename of the instantiated batch script.
- batch_file_name = CUnicode(u'batch_script', config=True) - # The PBS Queue - queue = CUnicode(u'', config=True) + submit_command = List([''], config=True, + help="The name of the command line program used to submit jobs.") + delete_command = List([''], config=True, + help="The name of the command line program used to delete jobs.") + job_id_regexp = CUnicode('', config=True, + help="""A regular expression used to get the job id from the output of the + submit_command.""") + batch_template = CUnicode('', config=True, + help="The string that is the batch script template itself.") + batch_template_file = CUnicode(u'', config=True, + help="The file that contains the batch template.") + batch_file_name = CUnicode(u'batch_script', config=True, + help="The filename of the instantiated batch script.") + queue = CUnicode(u'', config=True, + help="The PBS Queue.") # not configurable, override in subclasses # PBS Job Array regex @@ -891,9 +920,12 @@ class BatchSystemLauncher(BaseLauncher): class PBSLauncher(BatchSystemLauncher): """A BatchSystemLauncher subclass for PBS.""" - submit_command = List(['qsub'], config=True) - delete_command = List(['qdel'], config=True) - job_id_regexp = CUnicode(r'\d+', config=True) + submit_command = List(['qsub'], config=True, + help="The PBS submit command ['qsub']") + delete_command = List(['qdel'], config=True, + help="The PBS delete command ['qdel']") + job_id_regexp = CUnicode(r'\d+', config=True, + help="Regular expression for identifying the job ID [r'\d+']") batch_file = CUnicode(u'') job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+') @@ -905,11 +937,12 @@ class PBSLauncher(BatchSystemLauncher): class PBSControllerLauncher(PBSLauncher): """Launch a controller using PBS.""" - batch_file_name = CUnicode(u'pbs_controller', config=True) + batch_file_name = CUnicode(u'pbs_controller', config=True, + help="batch file name for the controller job.") default_template= CUnicode("""#!/bin/sh #PBS -V #PBS -N ipcontroller -%s --log-to-file --cluster-dir $cluster_dir +%s --log-to-file cluster_dir=$cluster_dir """%(' '.join(ipcontroller_cmd_argv))) def start(self, cluster_dir): @@ -920,11 +953,12 @@ class PBSControllerLauncher(PBSLauncher): class PBSEngineSetLauncher(PBSLauncher): """Launch Engines using PBS""" - batch_file_name = CUnicode(u'pbs_engines', config=True) + batch_file_name = CUnicode(u'pbs_engines', config=True, + help="batch file name for the engine(s) job.") default_template= CUnicode(u"""#!/bin/sh #PBS -V #PBS -N ipengine -%s --cluster-dir $cluster_dir +%s cluster_dir=$cluster_dir """%(' '.join(ipengine_cmd_argv))) def start(self, n, cluster_dir): @@ -944,11 +978,12 @@ class SGELauncher(PBSLauncher): class SGEControllerLauncher(SGELauncher): """Launch a controller using SGE.""" - batch_file_name = CUnicode(u'sge_controller', config=True) + batch_file_name = CUnicode(u'sge_controller', config=True, + help="batch file name for the ipcontroller job.") default_template= CUnicode(u"""#$$ -V #$$ -S /bin/sh #$$ -N ipcontroller -%s --log-to-file --cluster-dir $cluster_dir +%s --log-to-file cluster_dir=$cluster_dir """%(' '.join(ipcontroller_cmd_argv))) def start(self, cluster_dir): @@ -958,11 +993,12 @@ class SGEControllerLauncher(SGELauncher): class SGEEngineSetLauncher(SGELauncher): """Launch Engines with SGE""" - batch_file_name = CUnicode(u'sge_engines', config=True) + batch_file_name = CUnicode(u'sge_engines', config=True, + help="batch file name for the engine(s) job.") default_template = CUnicode("""#$$ -V #$$ -S /bin/sh #$$ -N ipengine -%s --cluster-dir
$cluster_dir +%s cluster_dir=$cluster_dir """%(' '.join(ipengine_cmd_argv))) def start(self, n, cluster_dir): @@ -979,18 +1015,56 @@ class SGEEngineSetLauncher(SGELauncher): class IPClusterLauncher(LocalProcessLauncher): """Launch the ipcluster program in an external process.""" - ipcluster_cmd = List(ipcluster_cmd_argv, config=True) - # Command line arguments to pass to ipcluster. + ipcluster_cmd = List(ipcluster_cmd_argv, config=True, + help="Popen command for ipcluster") ipcluster_args = List( - ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True) + ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True, + help="Command line arguments to pass to ipcluster.") ipcluster_subcommand = Str('start') ipcluster_n = Int(2) def find_args(self): - return self.ipcluster_cmd + [self.ipcluster_subcommand] + \ - ['-n', repr(self.ipcluster_n)] + self.ipcluster_args + return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \ + ['n=%i'%self.ipcluster_n] + self.ipcluster_args def start(self): self.log.info("Starting ipcluster: %r" % self.args) return super(IPClusterLauncher, self).start() +#----------------------------------------------------------------------------- +# Collections of launchers +#----------------------------------------------------------------------------- + +local_launchers = [ + LocalControllerLauncher, + LocalEngineLauncher, + LocalEngineSetLauncher, +] +mpi_launchers = [ + MPIExecLauncher, + MPIExecControllerLauncher, + MPIExecEngineSetLauncher, +] +ssh_launchers = [ + SSHLauncher, + SSHControllerLauncher, + SSHEngineLauncher, + SSHEngineSetLauncher, +] +winhpc_launchers = [ + WindowsHPCLauncher, + WindowsHPCControllerLauncher, + WindowsHPCEngineSetLauncher, +] +pbs_launchers = [ + PBSLauncher, + PBSControllerLauncher, + PBSEngineSetLauncher, +] +sge_launchers = [ + SGELauncher, + SGEControllerLauncher, + SGEEngineSetLauncher, +] +all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\ + + pbs_launchers + sge_launchers \ No newline at end of file diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index c17434f..329f535 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -296,7 +296,7 @@ class Client(HasTraits): if username is None: self.session = ss.StreamSession(**key_arg) else: - self.session = ss.StreamSession(username, **key_arg) + self.session = ss.StreamSession(username=username, **key_arg) self._query_socket = self._context.socket(zmq.XREQ) self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) if self._ssh: diff --git a/IPython/parallel/controller/controller.py b/IPython/parallel/controller/controller.py deleted file mode 100755 index e19ed97..0000000 --- a/IPython/parallel/controller/controller.py +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python -"""The IPython Controller with 0MQ -This is a collection of one Hub and several Schedulers. -""" -#----------------------------------------------------------------------------- -# Copyright (C) 2010 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. 
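For the batch launchers above, batch_template is a shell script template; judging by the $cluster_dir placeholders and the escaped #$$ in the default templates, it is substituted string.Template-style before submission. A hedged sketch of overriding the engine template (the bare 'ipengine' command line is illustrative):

    c = get_config()
    c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
    #PBS -V
    #PBS -N ipengine
    ipengine cluster_dir=$cluster_dir
    """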
-#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- -from __future__ import print_function - -from multiprocessing import Process - -import zmq -from zmq.devices import ProcessMonitoredQueue -# internal: -from IPython.utils.importstring import import_item -from IPython.utils.traitlets import Int, CStr, Instance, List, Bool - -from IPython.parallel.util import signal_children -from .hub import Hub, HubFactory -from .scheduler import launch_scheduler - -#----------------------------------------------------------------------------- -# Configurable -#----------------------------------------------------------------------------- - - -class ControllerFactory(HubFactory): - """Configurable for setting up a Hub and Schedulers.""" - - usethreads = Bool(False, config=True) - - # internal - children = List() - mq_class = CStr('zmq.devices.ProcessMonitoredQueue') - - def _usethreads_changed(self, name, old, new): - self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') - - def __init__(self, **kwargs): - super(ControllerFactory, self).__init__(**kwargs) - self.subconstructors.append(self.construct_schedulers) - - def start(self): - super(ControllerFactory, self).start() - child_procs = [] - for child in self.children: - child.start() - if isinstance(child, ProcessMonitoredQueue): - child_procs.append(child.launcher) - elif isinstance(child, Process): - child_procs.append(child) - if child_procs: - signal_children(child_procs) - - - def construct_schedulers(self): - children = self.children - mq = import_item(self.mq_class) - - # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url - # IOPub relay (in a Process) - q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') - q.bind_in(self.client_info['iopub']) - q.bind_out(self.engine_info['iopub']) - q.setsockopt_out(zmq.SUBSCRIBE, '') - q.connect_mon(self.monitor_url) - q.daemon=True - children.append(q) - - # Multiplexer Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') - q.bind_in(self.client_info['mux']) - q.setsockopt_in(zmq.IDENTITY, 'mux') - q.bind_out(self.engine_info['mux']) - q.connect_mon(self.monitor_url) - q.daemon=True - children.append(q) - - # Control Queue (in a Process) - q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') - q.bind_in(self.client_info['control']) - q.setsockopt_in(zmq.IDENTITY, 'control') - q.bind_out(self.engine_info['control']) - q.connect_mon(self.monitor_url) - q.daemon=True - children.append(q) - # Task Queue (in a Process) - if self.scheme == 'pure': - self.log.warn("task::using pure XREQ Task scheduler") - q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') - q.bind_in(self.client_info['task'][1]) - q.setsockopt_in(zmq.IDENTITY, 'task') - q.bind_out(self.engine_info['task']) - q.connect_mon(self.monitor_url) - q.daemon=True - children.append(q) - elif self.scheme == 'none': - self.log.warn("task::using no Task scheduler") - - else: - self.log.info("task::using Python %s Task scheduler"%self.scheme) - sargs = (self.client_info['task'][1], self.engine_info['task'], - self.monitor_url, self.client_info['notification']) - kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, - config=dict(self.config)) - q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs) - q.daemon=True - children.append(q) - diff --git 
a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index 31429ca..cee6467 100644 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -18,7 +18,7 @@ import zmq from zmq.devices import ProcessDevice, ThreadDevice from zmq.eventloop import ioloop, zmqstream -from IPython.utils.traitlets import Set, Instance, CFloat, Bool +from IPython.utils.traitlets import Set, Instance, CFloat, Bool, CStr from IPython.parallel.factory import LoggingFactory class Heart(object): @@ -53,14 +53,16 @@ class HeartMonitor(LoggingFactory): pongstream: an XREP stream period: the period of the heartbeat in milliseconds""" - period=CFloat(1000, config=True) # in milliseconds + period=CFloat(1000, config=True, + help='The period (in ms) at which the Hub pings the engines for heartbeats ' + '[default: 1000]', + ) pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') loop = Instance('zmq.eventloop.ioloop.IOLoop') def _loop_default(self): return ioloop.IOLoop.instance() - debug=Bool(False) # not settable: hearts=Set() diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index fc5ec77..1477da8 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -25,7 +25,9 @@ from zmq.eventloop.zmqstream import ZMQStream # internal: from IPython.utils.importstring import import_item -from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool +from IPython.utils.traitlets import ( + HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool, Tuple + ) from IPython.parallel import error, util from IPython.parallel.factory import RegistrationFactory, LoggingFactory @@ -112,59 +114,71 @@ class EngineConnector(HasTraits): class HubFactory(RegistrationFactory): """The Configurable for setting up a Hub.""" - # name of a scheduler scheme - scheme = Str('leastload', config=True) - # port-pairs for monitoredqueues: - hb = Instance(list, config=True) + hb = Tuple(Int,Int,config=True, + help="""XREQ/SUB Port pair for Engine heartbeats""") def _hb_default(self): - return util.select_random_ports(2) + return tuple(util.select_random_ports(2)) + + mux = Tuple(Int,Int,config=True, + help="""Engine/Client Port pair for MUX queue""") - mux = Instance(list, config=True) def _mux_default(self): - return util.select_random_ports(2) + return tuple(util.select_random_ports(2)) - task = Instance(list, config=True) + task = Tuple(Int,Int,config=True, + help="""Engine/Client Port pair for Task queue""") def _task_default(self): - return util.select_random_ports(2) + return tuple(util.select_random_ports(2)) + + control = Tuple(Int,Int,config=True, + help="""Engine/Client Port pair for Control queue""") - control = Instance(list, config=True) def _control_default(self): - return util.select_random_ports(2) + return tuple(util.select_random_ports(2)) + + iopub = Tuple(Int,Int,config=True, + help="""Engine/Client Port pair for IOPub relay""") - iopub = Instance(list, config=True) def _iopub_default(self): - return util.select_random_ports(2) + return tuple(util.select_random_ports(2)) # single ports: - mon_port = Instance(int, config=True) + mon_port = Int(config=True, + help="""Monitor (SUB) port for queue traffic""") + def _mon_port_default(self): return util.select_random_ports(1)[0] - notifier_port = Instance(int, config=True) + notifier_port = Int(config=True, + help="""PUB port for sending engine status
notifications""") + def _notifier_port_default(self): return util.select_random_ports(1)[0] - ping = Int(1000, config=True) # ping frequency + engine_ip = CStr('127.0.0.1', config=True, + help="IP on which to listen for engine connections. [default: loopback]") + engine_transport = CStr('tcp', config=True, + help="0MQ transport for engine connections. [default: tcp]") - engine_ip = CStr('127.0.0.1', config=True) - engine_transport = CStr('tcp', config=True) + client_ip = CStr('127.0.0.1', config=True, + help="IP on which to listen for client connections. [default: loopback]") + client_transport = CStr('tcp', config=True, + help="0MQ transport for client connections. [default : tcp]") - client_ip = CStr('127.0.0.1', config=True) - client_transport = CStr('tcp', config=True) - - monitor_ip = CStr('127.0.0.1', config=True) - monitor_transport = CStr('tcp', config=True) + monitor_ip = CStr('127.0.0.1', config=True, + help="IP on which to listen for monitor messages. [default: loopback]") + monitor_transport = CStr('tcp', config=True, + help="0MQ transport for monitor messages. [default : tcp]") monitor_url = CStr('') - db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True) + db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True, + help="""The class to use for the DB backend""") # not configurable db = Instance('IPython.parallel.controller.dictdb.BaseDB') heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor') - subconstructors = List() - _constructed = Bool(False) def _ip_changed(self, name, old, new): self.engine_ip = new @@ -186,24 +200,17 @@ class HubFactory(RegistrationFactory): self._update_monitor_url() # self.on_trait_change(self._sync_ips, 'ip') # self.on_trait_change(self._sync_transports, 'transport') - self.subconstructors.append(self.construct_hub) + # self.subconstructors.append(self.construct_hub) def construct(self): - assert not self._constructed, "already constructed!" - - for subc in self.subconstructors: - subc() - - self._constructed = True - + self.init_hub() def start(self): - assert self._constructed, "must be constructed by self.construct() first!" 
self.heartmonitor.start() self.log.info("Heartmonitor started") - def construct_hub(self): + def init_hub(self): """construct""" client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i" engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i" @@ -227,7 +234,7 @@ class HubFactory(RegistrationFactory): hrep = ctx.socket(zmq.XREP) hrep.bind(engine_iface % self.hb[1]) self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop), - period=self.ping, logname=self.log.name) + config=self.config) ### Client connections ### # Notifier socket @@ -248,7 +255,11 @@ class HubFactory(RegistrationFactory): # cdir = self.config.Global.cluster_dir self.db = import_item(self.db_class)(session=self.session.session, config=self.config) time.sleep(.25) - + try: + scheme = self.config.TaskScheduler.scheme_name + except AttributeError: + from .scheduler import TaskScheduler + scheme = TaskScheduler.scheme_name.get_default_value() # build connection dicts self.engine_info = { 'control' : engine_iface%self.control[1], @@ -262,7 +273,7 @@ class HubFactory(RegistrationFactory): self.client_info = { 'control' : client_iface%self.control[0], 'mux': client_iface%self.mux[0], - 'task' : (self.scheme, client_iface%self.task[0]), + 'task' : (scheme, client_iface%self.task[0]), 'iopub' : client_iface%self.iopub[0], 'notification': client_iface%self.notifier_port } diff --git a/IPython/parallel/controller/mongodb.py b/IPython/parallel/controller/mongodb.py index 71cf6b9..0ecc16c 100644 --- a/IPython/parallel/controller/mongodb.py +++ b/IPython/parallel/controller/mongodb.py @@ -20,9 +20,20 @@ from .dictdb import BaseDB class MongoDB(BaseDB): """MongoDB TaskRecord backend.""" - connection_args = List(config=True) # args passed to pymongo.Connection - connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection - database = CUnicode(config=True) # name of the mongodb database + connection_args = List(config=True, + help="""Positional arguments to be passed to pymongo.Connection. Only + necessary if the default mongodb configuration does not point to your + mongod instance.""") + connection_kwargs = Dict(config=True, + help="""Keyword arguments to be passed to pymongo.Connection. Only + necessary if the default mongodb configuration does not point to your + mongod instance.""" + ) + database = CUnicode(config=True, + help="""The MongoDB database name to use for storing tasks for this session. If unspecified, + a new database will be created with the Hub's IDENT. 
Specifying the database will result + in tasks from previous sessions being available via Clients' db_query and + get_result methods.""") _connection = Instance(Connection) # pymongo connection diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 147700c..5f0a615 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -35,7 +35,7 @@ from zmq.eventloop import ioloop, zmqstream # local imports from IPython.external.decorator import decorator from IPython.config.loader import Config -from IPython.utils.traitlets import Instance, Dict, List, Set, Int +from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum from IPython.parallel import error from IPython.parallel.factory import SessionFactory @@ -126,7 +126,19 @@ class TaskScheduler(SessionFactory): """ - hwm = Int(0, config=True) # limit number of outstanding tasks + hwm = Int(0, config=True, shortname='hwm', + help="""specify the High Water Mark (HWM) for the downstream + socket in the Task scheduler. This is the maximum number + of allowed outstanding tasks on each engine.""" + ) + scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'), + 'leastload', config=True, shortname='scheme', allow_none=False, + help="""select the task scheduler scheme [default: leastload] + Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'""" + ) + def _scheme_name_changed(self, old, new): + self.log.debug("Using scheme %r"%new) + self.scheme = globals()[new] # input arguments: scheme = Instance(FunctionType, default=leastload) # function for determining the destination @@ -622,7 +634,7 @@ class TaskScheduler(SessionFactory): def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', - log_addr=None, loglevel=logging.DEBUG, scheme='lru', + log_addr=None, loglevel=logging.DEBUG, identity=b'task'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream @@ -646,7 +658,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname= nots.setsockopt(zmq.SUBSCRIBE, '') nots.connect(not_addr) - scheme = globals().get(scheme, None) + # scheme = globals().get(scheme, None) # setup logging if log_addr: connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel) @@ -655,7 +667,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname= scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, - scheme=scheme, loop=loop, logname=logname, + loop=loop, logname=logname, config=config) scheduler.start() try: diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index e542d14..26f1ce8 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -83,9 +83,16 @@ def _convert_bufs(bs): class SQLiteDB(BaseDB): """SQLite3 TaskRecord backend.""" - filename = CUnicode('tasks.db', config=True) - location = CUnicode('', config=True) - table = CUnicode("", config=True) + filename = CUnicode('tasks.db', config=True, + help="""The filename of the sqlite task database. [default: 'tasks.db']""") + location = CUnicode('', config=True, + help="""The directory containing the sqlite task database. The default + is to use the cluster_dir location.""") + table = CUnicode("", config=True, + help="""The SQLite Table to use for storing tasks for this session.
If unspecified, + a new table will be created with the Hub's IDENT. Specifying the table will result + in tasks from previous sessions being available via Clients' db_query and + get_result methods.""") _db = Instance('sqlite3.Connection') _keys = List(['msg_id' , diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 9bbf7a0..c7dc6e2 100755 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -33,13 +33,22 @@ class EngineFactory(RegistrationFactory): """IPython engine""" # configurables: - user_ns=Dict(config=True) - out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True) - display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True) - location=Str(config=True) - timeout=CFloat(2,config=True) + out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True, + help="""The OutStream for handling stdout/err. + Typically 'IPython.zmq.iostream.OutStream'""") + display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True, + help="""The class for handling displayhook. + Typically 'IPython.zmq.displayhook.DisplayHook'""") + location=Str(config=True, + help="""The location (an IP address) of the controller. This is + used for disambiguating URLs, to determine whether + loopback should be used to connect or the public address.""") + timeout=CFloat(2,config=True, + help="""The time (in seconds) to wait for the Controller to respond + to registration requests before giving up.""") # not configurable: + user_ns=Dict() id=Int(allow_none=True) registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') kernel=Instance(Kernel) @@ -47,6 +56,7 @@ class EngineFactory(RegistrationFactory): def __init__(self, **kwargs): super(EngineFactory, self).__init__(**kwargs) + self.ident = self.session.session ctx = self.context reg = ctx.socket(zmq.XREQ) @@ -127,7 +137,7 @@ class EngineFactory(RegistrationFactory): self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, - loop=loop, user_ns = self.user_ns, logname=self.log.name) + loop=loop, user_ns = self.user_ns, log=self.log) self.kernel.start() hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ] heart = Heart(*map(str, hb_addrs), heart_id=identity) @@ -143,7 +153,7 @@ class EngineFactory(RegistrationFactory): def abort(self): - self.log.fatal("Registration timed out") + self.log.fatal("Registration timed out after %.1f seconds"%self.timeout) self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) time.sleep(1) sys.exit(255) diff --git a/IPython/parallel/engine/streamkernel.py b/IPython/parallel/engine/streamkernel.py index 6c03e06..668bba7 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -28,7 +28,7 @@ import zmq from zmq.eventloop import ioloop, zmqstream # Local imports. 
-from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str +from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str, CStr from IPython.zmq.completer import KernelCompleter from IPython.parallel.error import wrap_exception @@ -64,9 +64,11 @@ class Kernel(SessionFactory): #--------------------------------------------------------------------------- # kwargs: - int_id = Int(-1, config=True) - user_ns = Dict(config=True) - exec_lines = List(config=True) + exec_lines = List(CStr, config=True, + help="List of lines to execute") + + int_id = Int(-1) + user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""") control_stream = Instance(zmqstream.ZMQStream) task_stream = Instance(zmqstream.ZMQStream) diff --git a/IPython/parallel/factory.py b/IPython/parallel/factory.py index 4cd1ffc..593ff56 100644 --- a/IPython/parallel/factory.py +++ b/IPython/parallel/factory.py @@ -14,12 +14,10 @@ import logging import os -import uuid from zmq.eventloop.ioloop import IOLoop from IPython.config.configurable import Configurable -from IPython.utils.importstring import import_item from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr import IPython.parallel.streamsession as ss @@ -39,13 +37,6 @@ class LoggingFactory(Configurable): class SessionFactory(LoggingFactory): """The Base factory from which every factory in IPython.parallel inherits""" - packer = Str('',config=True) - unpacker = Str('',config=True) - ident = CStr('',config=True) - def _ident_default(self): - return str(uuid.uuid4()) - username = CUnicode(os.environ.get('USER','username'),config=True) - exec_key = CUnicode('',config=True) # not configurable: context = Instance('zmq.Context', (), {}) session = Instance('IPython.parallel.streamsession.StreamSession') @@ -56,33 +47,28 @@ class SessionFactory(LoggingFactory): def __init__(self, **kwargs): super(SessionFactory, self).__init__(**kwargs) - exec_key = self.exec_key or None - # set the packers: - if not self.packer: - packer_f = unpacker_f = None - elif self.packer.lower() == 'json': - packer_f = ss.json_packer - unpacker_f = ss.json_unpacker - elif self.packer.lower() == 'pickle': - packer_f = ss.pickle_packer - unpacker_f = ss.pickle_unpacker - else: - packer_f = import_item(self.packer) - unpacker_f = import_item(self.unpacker) # construct the session - self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, key=exec_key) + self.session = ss.StreamSession(**kwargs) class RegistrationFactory(SessionFactory): """The Base Configurable for objects that involve registration.""" - url = Str('', config=True) # url takes precedence over ip,regport,transport - transport = Str('tcp', config=True) - ip = Str('127.0.0.1', config=True) - regport = Instance(int, config=True) + url = Str('', config=True, + help="""The 0MQ url used for registration. This sets transport, ip, and port + in one variable. For example: url='tcp://127.0.0.1:12345' or + url='epgm://*:90210'""") # url takes precedence over ip,regport,transport + transport = Str('tcp', config=True, + help="""The 0MQ transport for communications. This will likely be + the default of 'tcp', but other values include 'ipc', 'epgm', 'inproc'.""") + ip = Str('127.0.0.1', config=True, + help="""The IP address for registration. This is generally either + '127.0.0.1' for loopback only or '*' for all interfaces. 
+ [default: '127.0.0.1']""") + regport = Int(config=True, + help="""The port on which the Hub listens for registration.""") def _regport_default(self): - # return 10101 return select_random_ports(1)[0] def __init__(self, **kwargs): @@ -107,46 +93,3 @@ class RegistrationFactory(SessionFactory): self.ip = iface[0] if iface[1]: self.regport = int(iface[1]) - -#----------------------------------------------------------------------------- -# argparse argument extenders -#----------------------------------------------------------------------------- - - -def add_session_arguments(parser): - paa = parser.add_argument - paa('--ident', - type=str, dest='SessionFactory.ident', - help='set the ZMQ and session identity [default: random uuid]', - metavar='identity') - # paa('--execkey', - # type=str, dest='SessionFactory.exec_key', - # help='path to a file containing an execution key.', - # metavar='execkey') - paa('--packer', - type=str, dest='SessionFactory.packer', - help='method to serialize messages: {json,pickle} [default: json]', - metavar='packer') - paa('--unpacker', - type=str, dest='SessionFactory.unpacker', - help='inverse function of `packer`. Only necessary when using something other than json|pickle', - metavar='packer') - -def add_registration_arguments(parser): - paa = parser.add_argument - paa('--ip', - type=str, dest='RegistrationFactory.ip', - help="The IP used for registration [default: localhost]", - metavar='ip') - paa('--transport', - type=str, dest='RegistrationFactory.transport', - help="The ZeroMQ transport used for registration [default: tcp]", - metavar='transport') - paa('--url', - type=str, dest='RegistrationFactory.url', - help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101', - metavar='url') - paa('--regport', - type=int, dest='RegistrationFactory.regport', - help="The port used for registration [default: 10101]", - metavar='ip') diff --git a/IPython/parallel/streamsession.py b/IPython/parallel/streamsession.py index 570981c..5c32313 100644 --- a/IPython/parallel/streamsession.py +++ b/IPython/parallel/streamsession.py @@ -25,8 +25,13 @@ import zmq from zmq.utils import jsonapi from zmq.eventloop.zmqstream import ZMQStream +from IPython.config.configurable import Configurable +from IPython.utils.importstring import import_item +from IPython.utils.traitlets import Str, CStr, CUnicode, Bool, Any + from .util import ISO8601 + def squash_unicode(obj): """coerce unicode back to bytestrings.""" if isinstance(obj,dict): @@ -113,50 +118,72 @@ def extract_header(msg_or_header): h = dict(h) return h -class StreamSession(object): +class StreamSession(Configurable): """tweaked version of IPython.zmq.session.Session, for development in Parallel""" - debug=False - key=None - - def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None): - if username is None: - username = os.environ.get('USER','username') - self.username = username - if session is None: - self.session = str(uuid.uuid4()) + debug=Bool(False, config=True, help="""Debug output in the StreamSession""") + packer = Str('json',config=True, + help="""The name of the packer for serializing messages. 
+            Should be one of 'json', 'pickle', or an import name
+            for a custom serializer.""")
+    def _packer_changed(self, name, old, new):
+        if new.lower() == 'json':
+            self.pack = json_packer
+            self.unpack = json_unpacker
+        elif new.lower() == 'pickle':
+            self.pack = pickle_packer
+            self.unpack = pickle_unpacker
         else:
-            self.session = session
-        self.msg_id = str(uuid.uuid4())
-        if packer is None:
-            self.pack = default_packer
+            self.pack = import_item(new)
+
+    unpacker = Str('json',config=True,
+        help="""The name of the unpacker for deserializing messages.
+            Only used with custom functions for `packer`.""")
+    def _unpacker_changed(self, name, old, new):
+        if new.lower() == 'json':
+            self.pack = json_packer
+            self.unpack = json_unpacker
+        elif new.lower() == 'pickle':
+            self.pack = pickle_packer
+            self.unpack = pickle_unpacker
         else:
-            if not callable(packer):
-                raise TypeError("packer must be callable, not %s"%type(packer))
-            self.pack = packer
+            self.unpack = import_item(new)
 
-        if unpacker is None:
-            self.unpack = default_unpacker
-        else:
-            if not callable(unpacker):
-                raise TypeError("unpacker must be callable, not %s"%type(unpacker))
-            self.unpack = unpacker
+    session = CStr('',config=True,
+        help="""The UUID identifying this session.""")
+    def _session_default(self):
+        return str(uuid.uuid4())
+    username = CUnicode(os.environ.get('USER','username'),config=True,
+        help="""Username for the Session. Default is your system username.""")
+    key = CStr('', config=True,
+        help="""Execution key, for extra authentication.""")
+
+    keyfile = CUnicode('', config=True,
+        help="""Path to a file containing the execution key.""")
+    def _keyfile_changed(self, name, old, new):
+        with open(new, 'rb') as f:
+            self.key = f.read().strip()
+
+    pack = Any(default_packer) # the actual packer function
+    def _pack_changed(self, name, old, new):
+        if not callable(new):
+            raise TypeError("packer must be callable, not %s"%type(new))
 
-        if key is not None and keyfile is not None:
-            raise TypeError("Must specify key OR keyfile, not both")
-        if keyfile is not None:
-            with open(keyfile) as f:
-                self.key = f.read().strip()
-        else:
-            self.key = key
-        if isinstance(self.key, unicode):
-            self.key = self.key.encode('utf8')
-        # print key, keyfile, self.key
+    unpack = Any(default_unpacker) # the actual unpacker function
+    def _unpack_changed(self, name, old, new):
+        if not callable(new):
+            raise TypeError("unpacker must be callable, not %s"%type(new))
+
+    def __init__(self, **kwargs):
+        super(StreamSession, self).__init__(**kwargs)
         self.none = self.pack({})
-
+
+    @property
+    def msg_id(self):
+        """always return a new uuid"""
+        return str(uuid.uuid4())
+
     def msg_header(self, msg_type):
-        h = msg_header(self.msg_id, msg_type, self.username, self.session)
-        self.msg_id = str(uuid.uuid4())
-        return h
+        return msg_header(self.msg_id, msg_type, self.username, self.session)
 
     def msg(self, msg_type, content=None, parent=None, subheader=None):
         msg = {}
@@ -171,10 +198,10 @@ class StreamSession(object):
 
     def check_key(self, msg_or_header):
         """Check that a message's header has the right key"""
-        if self.key is None:
+        if not self.key:
             return True
         header = extract_header(msg_or_header)
-        return header.get('key', None) == self.key
+        return header.get('key', '') == self.key
 
     def serialize(self, msg, ident=None):
@@ -200,7 +227,7 @@ class StreamSession(object):
         elif ident is not None:
             to_send.append(ident)
         to_send.append(DELIM)
-        if self.key is not None:
+        if self.key:
             to_send.append(self.key)
         to_send.append(self.pack(msg['header']))
         to_send.append(self.pack(msg['parent_header']))
@@ -297,7 +324,7 @@ class StreamSession(object):
         if ident is not None:
             to_send.extend(ident)
         to_send.append(DELIM)
-        if self.key is not None:
+        if self.key:
             to_send.append(self.key)
         to_send.extend(msg)
-        stream.send_multipart(msg, flags, copy=copy)
+        stream.send_multipart(to_send, flags, copy=copy)
@@ -345,7 +372,7 @@ class StreamSession(object):
         msg will be a list of bytes or Messages, unchanged from input
         msg should be unpackable via self.unpack_message at this point.
         """
-        ikey = int(self.key is not None)
+        ikey = int(self.key != '')
         minlen = 3 + ikey
         msg = list(msg)
         idents = []
@@ -379,7 +406,7 @@ class StreamSession(object):
         or the non-copying Message object in each place (False)
         """
-        ikey = int(self.key is not None)
+        ikey = int(self.key != '')
         minlen = 3 + ikey
         message = {}
         if not copy:
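
Stepping back: the net effect of these hunks is that every option the deleted argparse extenders exposed becomes a config=True trait with inline help. For illustration, the settings that --packer and --url used to carry would now be supplied through a Config object (a sketch assuming the 0.11-era Config API imported at the top of this patch; the trait names match the classes above):

    from IPython.config.loader import Config

    c = Config()
    c.StreamSession.packer = 'pickle'                    # replaces --packer
    c.RegistrationFactory.url = 'tcp://127.0.0.1:10101'  # replaces --url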
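
On RegistrationFactory, `url` still takes precedence over `transport`, `ip`, and `regport`; the tail of the factory.py hunk shows the parsed pieces being written back onto those traits. The decomposition is roughly the following (`split_url` is a stand-in written for illustration, not the helper the module actually calls):

    def split_url(url):
        # 'tcp://127.0.0.1:12345' -> ('tcp', '127.0.0.1', 12345)
        transport, addr = url.split('://')
        ip, _, port = addr.rpartition(':')
        return transport, ip, int(port)

    assert split_url('tcp://127.0.0.1:12345') == ('tcp', '127.0.0.1', 12345)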
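
The packer/unpacker selection that previously ran once in SessionFactory.__init__ now lives in StreamSession's trait-change handlers, so reassigning `packer` rebinds `pack`/`unpack` at any time. A self-contained sketch of that pattern, emulating the handler with a property setter (MiniSession and the module-level packers are illustrative, not IPython code):

    import json
    import pickle

    # stand-ins for the pack/unpack pairs streamsession.py defines
    json_packer, json_unpacker = json.dumps, json.loads
    pickle_packer = lambda o: pickle.dumps(o, 2)
    pickle_unpacker = pickle.loads

    class MiniSession(object):
        """Emulates StreamSession._packer_changed with a property setter."""
        def __init__(self, packer='json'):
            self.packer = packer  # assignment triggers the setter below

        @property
        def packer(self):
            return self._packer

        @packer.setter
        def packer(self, new):
            # known names select a matched pack/unpack pair; the real
            # handler falls back to import_item(new) for custom serializers
            if new.lower() == 'json':
                self.pack, self.unpack = json_packer, json_unpacker
            elif new.lower() == 'pickle':
                self.pack, self.unpack = pickle_packer, pickle_unpacker
            else:
                raise ValueError("unknown packer: %r" % new)
            self._packer = new

    s = MiniSession(packer='pickle')
    assert s.unpack(s.pack({'a': 1})) == {'a': 1}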
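
Finally, note the sentinel change running through the StreamSession hunks: the empty string replaces None as "no key", so the old identity tests become truthiness tests and `header.get('key', '')` compares cleanly. A compact sketch of the resulting check_key behavior, using a plain dict as a stand-in header:

    def check_key(key, header):
        if not key:  # '' means authentication is disabled
            return True
        return header.get('key', '') == key

    assert check_key('', {'key': 'anything'})         # auth disabled: accept
    assert check_key('secret', {'key': 'secret'})     # matching key: accept
    assert not check_key('secret', {'key': 'wrong'})  # mismatch: reject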