diff --git a/IPython/utils/traitlets.py b/IPython/utils/traitlets.py index b3a3012..f570c62 100644 --- a/IPython/utils/traitlets.py +++ b/IPython/utils/traitlets.py @@ -60,7 +60,7 @@ from .importstring import import_item ClassTypes = (ClassType, type) -SequenceTypes = (ListType, TupleType) +SequenceTypes = (ListType, TupleType, set, frozenset) #----------------------------------------------------------------------------- # Basic classes @@ -1018,7 +1018,7 @@ class List(Instance): """An instance of a Python list.""" def __init__(self, default_value=None, allow_none=True, **metadata): - """Create a list trait type from a list or tuple. + """Create a list trait type from a list, set, or tuple. The default value is created by doing ``list(default_value)``, which creates a copy of the ``default_value``. @@ -1034,6 +1034,26 @@ class List(Instance): allow_none=allow_none, **metadata) +class Set(Instance): + """An instance of a Python set.""" + + def __init__(self, default_value=None, allow_none=True, **metadata): + """Create a set trait type from a set, list, or tuple. + + The default value is created by doing ``set(default_value)``, + which creates a copy of the ``default_value``. + """ + if default_value is None: + args = ((),) + elif isinstance(default_value, SequenceTypes): + args = (default_value,) + else: + raise TypeError('default value of Set was %s' % default_value) + + super(Set,self).__init__(klass=set, args=args, + allow_none=allow_none, **metadata) + + class Dict(Instance): """An instance of a Python dict.""" diff --git a/IPython/zmq/parallel/clusterdir.py b/IPython/zmq/parallel/clusterdir.py new file mode 100755 index 0000000..b89f84b --- /dev/null +++ b/IPython/zmq/parallel/clusterdir.py @@ -0,0 +1,549 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +The IPython cluster directory +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
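+#-----------------------------------------------------------------------------

Stepping back to the traitlets change above: a quick usage sketch of the new
``Set`` trait (the ``Foo`` class and its values are purely illustrative):

    from IPython.utils.traitlets import HasTraits, Set

    class Foo(HasTraits):
        # the default is built via set(default_value), so each instance
        # gets its own copy rather than sharing one mutable set
        tags = Set(set(['a', 'b']))
        # a list or tuple default works too; passing a set itself is what
        # the SequenceTypes change above enables
        names = Set(('x', 'y'))

    f = Foo()
    f.tags.add('c')        # mutates only this instance's copy
    f.tags = set(['d'])    # reassignment is validated as an instance of set
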
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+from __future__ import with_statement
+
+import os
+import shutil
+import sys
+import logging
+import warnings
+
+from IPython.config.loader import PyFileConfigLoader
+from IPython.core.application import Application, BaseAppConfigLoader
+from IPython.config.configurable import Configurable
+from IPython.core.crashhandler import CrashHandler
+from IPython.core import release
+from IPython.utils.path import (
+    get_ipython_package_dir,
+    expand_path
+)
+from IPython.utils.traitlets import Unicode
+
+#-----------------------------------------------------------------------------
+# Warnings control
+#-----------------------------------------------------------------------------
+# Twisted generates annoying warnings with Python 2.6, as will other code
+# that imports 'sets' as of today
+warnings.filterwarnings('ignore', 'the sets module is deprecated',
+                        DeprecationWarning )
+
+# This one also comes from Twisted
+warnings.filterwarnings('ignore', 'the sha module is deprecated',
+                        DeprecationWarning)
+
+#-----------------------------------------------------------------------------
+# Module errors
+#-----------------------------------------------------------------------------
+
+class ClusterDirError(Exception):
+    pass
+
+
+class PIDFileError(Exception):
+    pass
+
+
+#-----------------------------------------------------------------------------
+# Class for managing cluster directories
+#-----------------------------------------------------------------------------
+
+class ClusterDir(Configurable):
+    """An object to manage the cluster directory and its resources.
+
+    The cluster directory is used by :command:`ipengine`,
+    :command:`ipcontroller` and :command:`ipcluster` to manage the
+    configuration, logging and security of these applications.
+
+    This object knows how to find, create and manage these directories. This
+    should be used by any code that wants to handle cluster directories.
+    """
+
+    security_dir_name = Unicode('security')
+    log_dir_name = Unicode('log')
+    pid_dir_name = Unicode('pid')
+    security_dir = Unicode(u'')
+    log_dir = Unicode(u'')
+    pid_dir = Unicode(u'')
+    location = Unicode(u'')
+
+    def __init__(self, location=u''):
+        super(ClusterDir, self).__init__(location=location)
+
+    def _location_changed(self, name, old, new):
+        if not os.path.isdir(new):
+            os.makedirs(new)
+        self.security_dir = os.path.join(new, self.security_dir_name)
+        self.log_dir = os.path.join(new, self.log_dir_name)
+        self.pid_dir = os.path.join(new, self.pid_dir_name)
+        self.check_dirs()
+
+    def _log_dir_changed(self, name, old, new):
+        self.check_log_dir()
+
+    def check_log_dir(self):
+        if not os.path.isdir(self.log_dir):
+            os.mkdir(self.log_dir)
+
+    def _security_dir_changed(self, name, old, new):
+        self.check_security_dir()
+
+    def check_security_dir(self):
+        if not os.path.isdir(self.security_dir):
+            os.mkdir(self.security_dir, 0700)
+        os.chmod(self.security_dir, 0700)
+
+    def _pid_dir_changed(self, name, old, new):
+        self.check_pid_dir()
+
+    def check_pid_dir(self):
+        if not os.path.isdir(self.pid_dir):
+            os.mkdir(self.pid_dir, 0700)
+        os.chmod(self.pid_dir, 0700)
+
+    def check_dirs(self):
+        self.check_security_dir()
+        self.check_log_dir()
+        self.check_pid_dir()
+
+    def load_config_file(self, filename):
+        """Load a config file from the top level of the cluster dir.
+
+        Parameters
+        ----------
+        filename : unicode or str
+            The filename only of the config file that must be located in
+            the top-level of the cluster directory.
+        """
+        loader = PyFileConfigLoader(filename, self.location)
+        return loader.load_config()
+
+    def copy_config_file(self, config_file, path=None, overwrite=False):
+        """Copy a default config file into the active cluster directory.
+
+        Default configuration files are kept in :mod:`IPython.config.default`.
+        This function moves these from that location to the working cluster
+        directory.
+        """
+        if path is None:
+            import IPython.config.default
+            path = IPython.config.default.__file__.split(os.path.sep)[:-1]
+            path = os.path.sep.join(path)
+        src = os.path.join(path, config_file)
+        dst = os.path.join(self.location, config_file)
+        if not os.path.isfile(dst) or overwrite:
+            shutil.copy(src, dst)
+
+    def copy_all_config_files(self, path=None, overwrite=False):
+        """Copy all config files into the active cluster directory."""
+        for f in [u'ipcontroller_config.py', u'ipengine_config.py',
+                  u'ipcluster_config.py']:
+            self.copy_config_file(f, path=path, overwrite=overwrite)
+
+    @classmethod
+    def create_cluster_dir(cls, cluster_dir):
+        """Create a new cluster directory given a full path.
+
+        Parameters
+        ----------
+        cluster_dir : str
+            The full path to the cluster directory.  If it does exist, it
+            will be used.  If not, it will be created.
+        """
+        return ClusterDir(location=cluster_dir)
+
+    @classmethod
+    def create_cluster_dir_by_profile(cls, path, profile=u'default'):
+        """Create a cluster dir by profile name and path.
+
+        Parameters
+        ----------
+        path : str
+            The path (directory) to put the cluster directory in.
+        profile : str
+            The name of the profile.  The name of the cluster directory
+            will be "clusterz_<profile>".
+        """
+        if not os.path.isdir(path):
+            raise ClusterDirError('Directory not found: %s' % path)
+        cluster_dir = os.path.join(path, u'clusterz_' + profile)
+        return ClusterDir(location=cluster_dir)
+
+    @classmethod
+    def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
+        """Find an existing cluster dir by profile name, return its ClusterDir.
+
+        This searches through a sequence of paths for a cluster dir.  If it
+        is not found, a :class:`ClusterDirError` exception will be raised.
+
+        The search path algorithm is:
+        1. ``os.getcwd()``
+        2. ``ipython_dir``
+        3. The directories found in the ":" separated
+           :env:`IPCLUSTER_DIR_PATH` environment variable.
+
+        Parameters
+        ----------
+        ipython_dir : unicode or str
+            The IPython directory to use.
+        profile : unicode or str
+            The name of the profile.  The name of the cluster directory
+            will be "clusterz_<profile>".
+        """
+        dirname = u'clusterz_' + profile
+        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
+        if cluster_dir_paths:
+            cluster_dir_paths = cluster_dir_paths.split(':')
+        else:
+            cluster_dir_paths = []
+        paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
+        for p in paths:
+            cluster_dir = os.path.join(p, dirname)
+            if os.path.isdir(cluster_dir):
+                return ClusterDir(location=cluster_dir)
+        else:
+            raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
+
+    @classmethod
+    def find_cluster_dir(cls, cluster_dir):
+        """Find an existing cluster dir and return its ClusterDir.
+
+        This raises :class:`ClusterDirError` if the cluster directory
+        doesn't exist.
+
+        Parameters
+        ----------
+        cluster_dir : unicode or str
+            The path of the cluster directory.  This is expanded using
+            :func:`IPython.utils.path.expand_path`.
+        """
+        cluster_dir = expand_path(cluster_dir)
+        if not os.path.isdir(cluster_dir):
+            raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
+        return ClusterDir(location=cluster_dir)
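
For orientation, a sketch of how the two profile-based constructors behave
(paths are illustrative):

    # create (or reuse) /home/you/.ipython/clusterz_mycluster and its subdirs
    cd = ClusterDir.create_cluster_dir_by_profile(u'/home/you/.ipython',
                                                  u'mycluster')
    cd.security_dir   # .../clusterz_mycluster/security  (created mode 0700)
    cd.log_dir        # .../clusterz_mycluster/log
    cd.pid_dir        # .../clusterz_mycluster/pid

    # find only: checks cwd, then ipython_dir, then each entry of the
    # colon-separated IPCLUSTER_DIR_PATH, and raises ClusterDirError if no
    # clusterz_mycluster directory exists in any of them
    cd = ClusterDir.find_cluster_dir_by_profile(u'/home/you/.ipython',
                                                u'mycluster')
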
+
+
+#-----------------------------------------------------------------------------
+# Command line options
+#-----------------------------------------------------------------------------
+
+class ClusterDirConfigLoader(BaseAppConfigLoader):
+
+    def _add_cluster_profile(self, parser):
+        paa = parser.add_argument
+        paa('-p', '--profile',
+            dest='Global.profile', type=unicode,
+            help=
+            """The string name of the profile to be used. This determines the
+            name of the cluster dir as: cluster_<profile>. The default profile
+            is named 'default'. The cluster directory is resolved this way if
+            the --cluster-dir option is not used.""",
+            metavar='Global.profile')
+
+    def _add_cluster_dir(self, parser):
+        paa = parser.add_argument
+        paa('--cluster-dir',
+            dest='Global.cluster_dir', type=unicode,
+            help="""Set the cluster dir. This overrides the logic used by the
+            --profile option.""",
+            metavar='Global.cluster_dir')
+
+    def _add_work_dir(self, parser):
+        paa = parser.add_argument
+        paa('--work-dir',
+            dest='Global.work_dir', type=unicode,
+            help='Set the working dir for the process.',
+            metavar='Global.work_dir')
+
+    def _add_clean_logs(self, parser):
+        paa = parser.add_argument
+        paa('--clean-logs',
+            dest='Global.clean_logs', action='store_true',
+            help='Delete old log files before starting.')
+
+    def _add_no_clean_logs(self, parser):
+        paa = parser.add_argument
+        paa('--no-clean-logs',
+            dest='Global.clean_logs', action='store_false',
+            help="Don't delete old log files before starting.")
+
+    def _add_arguments(self):
+        super(ClusterDirConfigLoader, self)._add_arguments()
+        self._add_cluster_profile(self.parser)
+        self._add_cluster_dir(self.parser)
+        self._add_work_dir(self.parser)
+        self._add_clean_logs(self.parser)
+        self._add_no_clean_logs(self.parser)
+
+
+#-----------------------------------------------------------------------------
+# Crash handler for this application
+#-----------------------------------------------------------------------------
+
+
+_message_template = """\
+Oops, $self.app_name crashed. We do our best to make it stable, but...
+
+A crash report was automatically generated with the following information:
+  - A verbatim copy of the crash traceback.
+  - Data on your current $self.app_name configuration.
+
+It was left in the file named:
+\t'$self.crash_report_fname'
+If you can email this file to the developers, the information in it will help
+them in understanding and correcting the problem.
+
+You can mail it to: $self.contact_name at $self.contact_email
+with the subject '$self.app_name Crash Report'.
+
+If you want to do it now, the following command will work (under Unix):
+mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
+
+To ensure accurate tracking of this issue, please file a report about it at:
+$self.bug_tracker
+"""
+
+class ClusterDirCrashHandler(CrashHandler):
+    """sys.excepthook for IPython itself, leaves a detailed report on disk."""
+
+    message_template = _message_template
+
+    def __init__(self, app):
+        contact_name = release.authors['Brian'][0]
+        contact_email = release.authors['Brian'][1]
+        bug_tracker = 'http://github.com/ipython/ipython/issues'
+        super(ClusterDirCrashHandler, self).__init__(
+            app, contact_name, contact_email, bug_tracker
+        )
+
+
+#-----------------------------------------------------------------------------
+# Main application
+#-----------------------------------------------------------------------------
+
+class ApplicationWithClusterDir(Application):
+    """An application that puts everything into a cluster directory.
+
+    Instead of looking for things in the ipython_dir, this type of application
+    will use its own private directory called the "cluster directory"
+    for things like config files, log files, etc.
+
+    The cluster directory is resolved as follows:
+
+    * If the ``--cluster-dir`` option is given, it is used.
+    * If ``--cluster-dir`` is not given, the application directory is
+      resolved using the profile name as ``cluster_<profile>``.  The search
+      path for this directory is i) the cwd, if it is found there, and
+      ii) the ipython_dir otherwise.
+
+    The config file for the application is to be put in the cluster
+    dir and named the value of the ``config_file_name`` class attribute.
+    """
+
+    command_line_loader = ClusterDirConfigLoader
+    crash_handler_class = ClusterDirCrashHandler
+    auto_create_cluster_dir = True
+    # temporarily override default_log_level to DEBUG
+    default_log_level = logging.DEBUG
+
+    def create_default_config(self):
+        super(ApplicationWithClusterDir, self).create_default_config()
+        self.default_config.Global.profile = u'default'
+        self.default_config.Global.cluster_dir = u''
+        self.default_config.Global.work_dir = os.getcwd()
+        self.default_config.Global.log_to_file = False
+        self.default_config.Global.log_url = None
+        self.default_config.Global.clean_logs = False
+
+    def find_resources(self):
+        """This resolves the cluster directory.
+
+        If this succeeds, it will have done the following:
+
+        * Set ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
+          the application.
+        * Set the ``cluster_dir`` attribute of the application and config
+          objects.
+
+        The algorithm used for this is as follows:
+        1. Try ``Global.cluster_dir``.
+        2. Try using ``Global.profile``.
+        3. If both of these fail and ``self.auto_create_cluster_dir`` is
+           ``True``, then create the new cluster dir in the IPython directory.
+        4. If all of these fail, then raise :class:`ClusterDirError`.
+        """
+
+        try:
+            cluster_dir = self.command_line_config.Global.cluster_dir
+        except AttributeError:
+            cluster_dir = self.default_config.Global.cluster_dir
+        cluster_dir = expand_path(cluster_dir)
+        try:
+            self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
+        except ClusterDirError:
+            pass
+        else:
+            self.log.info('Using existing cluster dir: %s' % \
+                self.cluster_dir_obj.location
+            )
+            self.finish_cluster_dir()
+            return
+
+        try:
+            self.profile = self.command_line_config.Global.profile
+        except AttributeError:
+            self.profile = self.default_config.Global.profile
+        try:
+            self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
+                self.ipython_dir, self.profile)
+        except ClusterDirError:
+            pass
+        else:
+            self.log.info('Using existing cluster dir: %s' % \
+                self.cluster_dir_obj.location
+            )
+            self.finish_cluster_dir()
+            return
+
+        if self.auto_create_cluster_dir:
+            self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
+                self.ipython_dir, self.profile
+            )
+            self.log.info('Creating new cluster dir: %s' % \
+                self.cluster_dir_obj.location
+            )
+            self.finish_cluster_dir()
+        else:
+            raise ClusterDirError('Could not find a valid cluster directory.')
+
+    def finish_cluster_dir(self):
+        # Set the cluster directory
+        self.cluster_dir = self.cluster_dir_obj.location
+
+        # These have to be set because they could be different from the one
+        # that we just computed.  Because command line has the highest
+        # priority, this will always end up in the master_config.
+        self.default_config.Global.cluster_dir = self.cluster_dir
+        self.command_line_config.Global.cluster_dir = self.cluster_dir
+
+    def find_config_file_name(self):
+        """Find the config file name for this application."""
+        # For this type of Application it should be set as a class attribute.
+        if not hasattr(self, 'default_config_file_name'):
+            self.log.critical("No config filename found")
+        else:
+            self.config_file_name = self.default_config_file_name
+
+    def find_config_file_paths(self):
+        # Set the search path to the cluster directory.  We should NOT
+        # include IPython.config.default here as the default config files
+        # are ALWAYS automatically moved to the cluster directory.
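+        # conf_dir (below) locates the shipped defaults, but it is deliberately
+        # not added to the search path; only the cluster dir is searched.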
+ conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default') + self.config_file_paths = (self.cluster_dir,) + + def pre_construct(self): + # The log and security dirs were set earlier, but here we put them + # into the config and log them. + config = self.master_config + sdir = self.cluster_dir_obj.security_dir + self.security_dir = config.Global.security_dir = sdir + ldir = self.cluster_dir_obj.log_dir + self.log_dir = config.Global.log_dir = ldir + pdir = self.cluster_dir_obj.pid_dir + self.pid_dir = config.Global.pid_dir = pdir + self.log.info("Cluster directory set to: %s" % self.cluster_dir) + config.Global.work_dir = unicode(expand_path(config.Global.work_dir)) + # Change to the working directory. We do this just before construct + # is called so all the components there have the right working dir. + self.to_work_dir() + + def to_work_dir(self): + wd = self.master_config.Global.work_dir + if unicode(wd) != unicode(os.getcwd()): + os.chdir(wd) + self.log.info("Changing to working dir: %s" % wd) + + def start_logging(self): + # Remove old log files + if self.master_config.Global.clean_logs: + log_dir = self.master_config.Global.log_dir + for f in os.listdir(log_dir): + if f.startswith(self.name + u'-') and f.endswith('.log'): + os.remove(os.path.join(log_dir, f)) + # Start logging to the new log file + if self.master_config.Global.log_to_file: + log_filename = self.name + u'-' + str(os.getpid()) + u'.log' + logfile = os.path.join(self.log_dir, log_filename) + open_log_file = open(logfile, 'w') + elif self.master_config.Global.log_url: + open_log_file = None + else: + open_log_file = sys.stdout + logger = logging.getLogger() + level = self.log_level + self.log = logger + # since we've reconnected the logger, we need to reconnect the log-level + self.log_level = level + if open_log_file is not None and self._log_handler not in self.log.handlers: + self.log.addHandler(self._log_handler) + # log.startLogging(open_log_file) + + def write_pid_file(self, overwrite=False): + """Create a .pid file in the pid_dir with my pid. + + This must be called after pre_construct, which sets `self.pid_dir`. + This raises :exc:`PIDFileError` if the pid file exists already. + """ + pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + if os.path.isfile(pid_file): + pid = self.get_pid_from_file() + if not overwrite: + raise PIDFileError( + 'The pid file [%s] already exists. \nThis could mean that this ' + 'server is already running with [pid=%s].' % (pid_file, pid) + ) + with open(pid_file, 'w') as f: + self.log.info("Creating pid file: %s" % pid_file) + f.write(repr(os.getpid())+'\n') + + def remove_pid_file(self): + """Remove the pid file. + + This should be called at shutdown by registering a callback with + :func:`reactor.addSystemEventTrigger`. This needs to return + ``None``. + """ + pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + if os.path.isfile(pid_file): + try: + self.log.info("Removing pid file: %s" % pid_file) + os.remove(pid_file) + except: + self.log.warn("Error removing the pid file: %s" % pid_file) + + def get_pid_from_file(self): + """Get the pid from the pid file. + + If the pid file doesn't exist a :exc:`PIDFileError` is raised. 
+ """ + pid_file = os.path.join(self.pid_dir, self.name + u'.pid') + if os.path.isfile(pid_file): + with open(pid_file, 'r') as f: + pid = int(f.read().strip()) + return pid + else: + raise PIDFileError('pid file not found: %s' % pid_file) + diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index 3d3fed1..c052ee9 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -16,6 +16,7 @@ and monitors traffic through the various queues. from __future__ import print_function import os +import sys import time import logging from multiprocessing import Process @@ -23,12 +24,13 @@ from multiprocessing import Process import zmq from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream -from zmq.devices import ProcessMonitoredQueue +# from zmq.devices import ProcessMonitoredQueue # internal: +from IPython.utils.importstring import import_item +from IPython.utils.traitlets import Int, Str, Instance, List, Bool from IPython.zmq.entry_point import bind_port -from hub import Hub from entry_point import (make_base_argument_parser, select_random_ports, split_ports, connect_logger, parse_url, signal_children, generate_exec_key, local_logger) @@ -37,6 +39,7 @@ from entry_point import (make_base_argument_parser, select_random_ports, split_p import streamsession as session import heartmonitor from scheduler import launch_scheduler +from hub import Hub, HubFactory from dictdb import DictDB try: @@ -81,8 +84,84 @@ def make_argument_parser(): help='Manually specify the session id.') return parser + +class ControllerFactory(HubFactory): + """Configurable for setting up a Hub and Schedulers.""" + + scheme = Str('pure', config=True) + usethreads = Bool(False, config=True) + + # internal + children = List() + mq_class = Str('zmq.devices.ProcessMonitoredQueue') + + def _update_mq(self): + self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process') + + def __init__(self, **kwargs): + super(ControllerFactory, self).__init__(**kwargs) + self.subconstructors.append(self.construct_schedulers) + self._update_mq() + self.on_trait_change(self._update_mq, 'usethreads') + + def start(self): + super(ControllerFactory, self).start() + for child in self.children: + child.start() + if not self.usethreads: + signal_children([ getattr(c, 'launcher', c) for c in self.children ]) + + + def construct_schedulers(self): + children = self.children + mq = import_item(self.mq_class) + + # IOPub relay (in a Process) + q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub') + q.bind_in(self.client_addrs['iopub']) + q.bind_out(self.engine_addrs['iopub']) + q.setsockopt_out(zmq.SUBSCRIBE, '') + q.connect_mon(self.monitor_url) + q.daemon=True + children.append(q) + + # Multiplexer Queue (in a Process) + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') + q.bind_in(self.client_addrs['mux']) + q.bind_out(self.engine_addrs['mux']) + q.connect_mon(self.monitor_url) + q.daemon=True + children.append(q) + + # Control Queue (in a Process) + q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') + q.bind_in(self.client_addrs['control']) + q.bind_out(self.engine_addrs['control']) + q.connect_mon(self.monitor_url) + q.daemon=True + children.append(q) + # Task Queue (in a Process) + if self.scheme == 'pure': + logging.warn("task::using pure XREQ Task scheduler") + q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') + q.bind_in(self.client_addrs['task']) + q.bind_out(self.engine_addrs['task']) + q.connect_mon(self.monitor_url) + 
q.daemon=True + children.append(q) + elif self.scheme == 'none': + logging.warn("task::using no Task scheduler") + + else: + logging.warn("task::using Python %s Task scheduler"%self.scheme) + sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) + q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) + q.daemon=True + children.append(q) + def main(argv=None): + """DO NOT USE ME ANYMORE""" parser = make_argument_parser() @@ -256,9 +335,10 @@ def main(argv=None): dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) dc.start() - loop.start() - - + try: + loop.start() + except KeyboardInterrupt: + print ("interrupted, exiting...", file=sys.__stderr__) if __name__ == '__main__': diff --git a/IPython/zmq/parallel/dictdb.py b/IPython/zmq/parallel/dictdb.py index 62646f0..c096068 100644 --- a/IPython/zmq/parallel/dictdb.py +++ b/IPython/zmq/parallel/dictdb.py @@ -75,7 +75,11 @@ class CompositeFilter(object): return False return True -class DictDB(object): +class BaseDB(object): + """Empty Parent class so traitlets work on DB.""" + pass + +class DictDB(BaseDB): """Basic in-memory dict-based object for saving Task Records. This is the first object to present the DB interface diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index aa56e00..c11e902 100755 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -16,43 +16,49 @@ from zmq.eventloop import ioloop, zmqstream # internal from IPython.config.configurable import Configurable -from IPython.utils.traitlets import Instance, Str, Dict +from IPython.utils.traitlets import Instance, Str, Dict, Int, Type # from IPython.utils.localinterfaces import LOCALHOST +from factory import RegistrationFactory + from streamsession import Message, StreamSession from streamkernel import Kernel, make_kernel import heartmonitor from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, local_logger) # import taskthread -logger = logging.getLogger() def printer(*msg): - # print (logger.handlers, file=sys.__stdout__) - logger.info(str(msg)) + # print (logging.handlers, file=sys.__stdout__) + logging.info(str(msg)) -class Engine(Configurable): +class EngineFactory(RegistrationFactory): """IPython engine""" - kernel=None - id=None - # configurables: - context=Instance(zmq.Context) - loop=Instance(ioloop.IOLoop) - session=Instance(StreamSession) - ident=Str() - registrar=Instance(zmqstream.ZMQStream) - user_ns=Dict() + user_ns=Dict(config=True) + out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True) + display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True) + + # not configurable: + id=Int(allow_none=True) + registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') + kernel=Instance(Kernel) + def __init__(self, **kwargs): - super(Engine, self).__init__(**kwargs) - if not self.ident: - self.ident = str(uuid.uuid4()) - self.registrar.on_send(printer) + super(EngineFactory, self).__init__(**kwargs) + ctx = self.context + + reg = ctx.socket(zmq.PAIR) + reg.setsockopt(zmq.IDENTITY, self.ident) + reg.connect(self.url) + self.registrar = zmqstream.ZMQStream(reg, self.loop) def register(self): + """send the registration_request""" + logging.info("registering") content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) self.registrar.on_recv(self.complete_registration) # print (self.session.key) @@ -60,50 +66,87 @@ class 
Engine(Configurable): def complete_registration(self, msg): # print msg + ctx = self.context + loop = self.loop + identity = self.ident + print (identity) + idents,msg = self.session.feed_identities(msg) msg = Message(self.session.unpack_message(msg)) + if msg.content.status == 'ok': self.id = int(msg.content.id) - self.session.username = 'engine-%i'%self.id + + # create Shell Streams (MUX, Task, etc.): queue_addr = msg.content.mux shell_addrs = [ str(queue_addr) ] - control_addr = str(msg.content.control) task_addr = msg.content.task - iopub_addr = msg.content.iopub if task_addr: shell_addrs.append(str(task_addr)) + shell_streams = [] + for addr in shell_addrs: + stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop) + stream.setsockopt(zmq.IDENTITY, identity) + stream.connect(addr) + shell_streams.append(stream) + + # control stream: + control_addr = str(msg.content.control) + control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop) + control_stream.setsockopt(zmq.IDENTITY, identity) + control_stream.connect(control_addr) + # create iopub stream: + iopub_addr = msg.content.iopub + iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop) + iopub_stream.setsockopt(zmq.IDENTITY, identity) + iopub_stream.connect(iopub_addr) + + # launch heartbeat hb_addrs = msg.content.heartbeat + # print (hb_addrs) + + # # Redirect input streams and set a display hook. + # if self.out_stream_factory: + # sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') + # sys.stdout.topic = 'engine.%i.stdout'%self.id + # sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') + # sys.stderr.topic = 'engine.%i.stderr'%self.id + # if self.display_hook_factory: + # sys.displayhook = self.display_hook_factory(self.session, iopub_stream) + # sys.displayhook.topic = 'engine.%i.pyout'%self.id + # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() - k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr, - hb_addrs, client_addr=None, loop=self.loop, - context=self.context, key=self.session.key)[-1] - self.kernel = k - if self.user_ns is not None: - self.user_ns.update(self.kernel.user_ns) - self.kernel.user_ns = self.user_ns + self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, + control_stream=control_stream, + shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, + user_ns = self.user_ns, config=self.config) + self.kernel.start() + + heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) + heart.start() + else: - logger.error("Registration Failed: %s"%msg) + logging.error("Registration Failed: %s"%msg) raise Exception("Registration Failed: %s"%msg) - logger.info("completed registration with id %i"%self.id) - - # logger.info(str(msg)) + logging.info("Completed registration with id %i"%self.id) + def unregister(self): - self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) + self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) time.sleep(1) sys.exit(0) def start(self): - logger.info("registering") - self.register() + dc = ioloop.DelayedCallback(self.register, 0, self.loop) + dc.start() + - def main(argv=None, user_ns=None): - + """DO NOT USE ME ANYMORE""" parser = make_base_argument_parser() args = parser.parse_args(argv) @@ -137,7 +180,10 @@ def main(argv=None, user_ns=None): dc = ioloop.DelayedCallback(e.start, 0, loop) dc.start() - loop.start() + try: + loop.start() + except KeyboardInterrupt: + print 
("interrupted, exiting...", file=sys.__stderr__) # Execution as a script if __name__ == '__main__': diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index a2f1c5b..1a9ba76 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -1,5 +1,9 @@ """ Defines helper functions for creating kernel entry points and process launchers. + +************ +NOTE: Most of this module has been deprecated by moving to Configurables +************ """ # Standard library imports. @@ -33,17 +37,24 @@ def split_ports(s, n): raise ValueError return ports +_random_ports = set() + def select_random_ports(n): """Selects and return n random ports that are available.""" ports = [] for i in xrange(n): sock = socket.socket() sock.bind(('', 0)) + while sock.getsockname()[1] in _random_ports: + sock.close() + sock = socket.socket() + sock.bind(('', 0)) ports.append(sock) for i, sock in enumerate(ports): port = sock.getsockname()[1] sock.close() ports[i] = port + _random_ports.add(port) return ports def parse_url(args): @@ -61,8 +72,11 @@ def parse_url(args): def signal_children(children): """Relay interupt/term signals to children, for more solid process cleanup.""" def terminate_children(sig, frame): + logging.critical("Got signal %i, terminating children..."%sig) for child in children: child.terminate() + + sys.exit(sig != SIGINT) # sys.exit(sig) for sig in (SIGINT, SIGABRT, SIGTERM): signal(sig, terminate_children) @@ -72,7 +86,7 @@ def generate_exec_key(keyfile): newkey = str(uuid.uuid4()) with open(keyfile, 'w') as f: # f.write('ipython-key ') - f.write(newkey) + f.write(newkey+'\n') # set user-only RW permissions (0600) # this will have no effect on Windows os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR) @@ -115,18 +129,24 @@ def integer_loglevel(loglevel): return loglevel def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): + logger = logging.getLogger() + if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): + # don't add a second PUBHandler + return loglevel = integer_loglevel(loglevel) lsock = context.socket(zmq.PUB) lsock.connect(iface) handler = handlers.PUBHandler(lsock) handler.setLevel(loglevel) handler.root_topic = root - logger = logging.getLogger() logger.addHandler(handler) logger.setLevel(loglevel) def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): logger = logging.getLogger() + if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): + # don't add a second PUBHandler + return loglevel = integer_loglevel(loglevel) lsock = context.socket(zmq.PUB) lsock.connect(iface) @@ -138,8 +158,8 @@ def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): def local_logger(loglevel=logging.DEBUG): loglevel = integer_loglevel(loglevel) logger = logging.getLogger() - if logger.handlers: - # if there are any handlers, skip the hookup + if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]): + # don't add a second StreamHandler return handler = logging.StreamHandler() handler.setLevel(loglevel) diff --git a/IPython/zmq/parallel/factory.py b/IPython/zmq/parallel/factory.py new file mode 100644 index 0000000..04d5173 --- /dev/null +++ b/IPython/zmq/parallel/factory.py @@ -0,0 +1,149 @@ +"""Base config factories.""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team +# +# Distributed under the terms of the BSD License. 
The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + + +import os +import logging + +import uuid + +from zmq.eventloop.ioloop import IOLoop + +from IPython.config.configurable import Configurable +from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr +from IPython.utils.importstring import import_item + +from IPython.zmq.parallel.entry_point import select_random_ports +import IPython.zmq.parallel.streamsession as ss + +#----------------------------------------------------------------------------- +# Classes +#----------------------------------------------------------------------------- + + +class SessionFactory(Configurable): + """The Base factory from which every factory in IPython.zmq.parallel inherits""" + + packer = Str('',config=True) + unpacker = Str('',config=True) + ident = CStr('',config=True) + def _ident_default(self): + return str(uuid.uuid4()) + username = Str(os.environ.get('USER','username'),config=True) + exec_key = CUnicode('',config=True) + + # not configurable: + context = Instance('zmq.Context', (), {}) + session = Instance('IPython.zmq.parallel.streamsession.StreamSession') + loop = Instance('zmq.eventloop.ioloop.IOLoop') + def _loop_default(self): + return IOLoop.instance() + + def __init__(self, **kwargs): + super(SessionFactory, self).__init__(**kwargs) + + keyfile = self.exec_key or None + + # set the packers: + if not self.packer: + packer_f = unpacker_f = None + elif self.packer.lower() == 'json': + packer_f = ss.json_packer + unpacker_f = ss.json_unpacker + elif self.packer.lower() == 'pickle': + packer_f = ss.pickle_packer + unpacker_f = ss.pickle_unpacker + else: + packer_f = import_item(self.packer) + unpacker_f = import_item(self.unpacker) + + # construct the session + self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, keyfile=keyfile) + + +class RegistrationFactory(SessionFactory): + """The Base Configurable for objects that involve registration.""" + + url = Str('', config=True) # url takes precedence over ip,regport,transport + transport = Str('tcp', config=True) + ip = Str('127.0.0.1', config=True) + regport = Instance(int, config=True) + def _regport_default(self): + return 10101 + # return select_random_ports(1)[0] + + def __init__(self, **kwargs): + super(RegistrationFactory, self).__init__(**kwargs) + self._propagate_url() + self._rebuild_url() + self.on_trait_change(self._propagate_url, 'url') + self.on_trait_change(self._rebuild_url, 'ip') + self.on_trait_change(self._rebuild_url, 'transport') + self.on_trait_change(self._rebuild_url, 'regport') + + def _rebuild_url(self): + self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport) + + def _propagate_url(self): + """Ensure self.url contains full transport://interface:port""" + if self.url: + iface = self.url.split('://',1) + if len(iface) == 2: + self.transport,iface = iface + iface = iface.split(':') + self.ip = iface[0] + if iface[1]: + self.regport = int(iface[1]) + +#----------------------------------------------------------------------------- +# argparse argument extenders +#----------------------------------------------------------------------------- + + +def add_session_arguments(parser): + paa = parser.add_argument + paa('--ident', + type=str, 
dest='SessionFactory.ident', + help='set the ZMQ and session identity [default: random uuid]', + metavar='identity') + # paa('--execkey', + # type=str, dest='SessionFactory.exec_key', + # help='path to a file containing an execution key.', + # metavar='execkey') + paa('--packer', + type=str, dest='SessionFactory.packer', + help='method to serialize messages: {json,pickle} [default: json]', + metavar='packer') + paa('--unpacker', + type=str, dest='SessionFactory.unpacker', + help='inverse function of `packer`. Only necessary when using something other than json|pickle', + metavar='packer') + +def add_registration_arguments(parser): + paa = parser.add_argument + paa('--ip', + type=str, dest='RegistrationFactory.ip', + help="The IP used for registration [default: localhost]", + metavar='ip') + paa('--transport', + type=str, dest='RegistrationFactory.transport', + help="The ZeroMQ transport used for registration [default: tcp]", + metavar='transport') + paa('--url', + type=str, dest='RegistrationFactory.url', + help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101', + metavar='url') + paa('--regport', + type=int, dest='RegistrationFactory.regport', + help="The port used for registration [default: 10101]", + metavar='ip') diff --git a/IPython/zmq/parallel/heartmonitor.py b/IPython/zmq/parallel/heartmonitor.py index 33df58f..b722f4f 100644 --- a/IPython/zmq/parallel/heartmonitor.py +++ b/IPython/zmq/parallel/heartmonitor.py @@ -13,8 +13,6 @@ import zmq from zmq.devices import ProcessDevice,ThreadDevice from zmq.eventloop import ioloop, zmqstream -logger = logging.getLogger() - class Heart(object): """A basic heart object for responding to a HeartMonitor. This is a simple wrapper with defaults for the most common @@ -78,12 +76,12 @@ class HeartMonitor(object): def add_new_heart_handler(self, handler): """add a new handler for new hearts""" - logger.debug("heartbeat::new_heart_handler: %s"%handler) + logging.debug("heartbeat::new_heart_handler: %s"%handler) self._new_handlers.add(handler) def add_heart_failure_handler(self, handler): """add a new handler for heart failure""" - logger.debug("heartbeat::new heart failure handler: %s"%handler) + logging.debug("heartbeat::new heart failure handler: %s"%handler) self._failure_handlers.add(handler) def beat(self): @@ -93,7 +91,7 @@ class HeartMonitor(object): toc = time.time() self.lifetime += toc-self.tic self.tic = toc - # logger.debug("heartbeat::%s"%self.lifetime) + # logging.debug("heartbeat::%s"%self.lifetime) goodhearts = self.hearts.intersection(self.responses) missed_beats = self.hearts.difference(goodhearts) heartfailures = self.on_probation.intersection(missed_beats) @@ -103,7 +101,7 @@ class HeartMonitor(object): self.on_probation = missed_beats.intersection(self.hearts) self.responses = set() # print self.on_probation, self.hearts - # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) + # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) self.pingstream.send(str(self.lifetime)) def handle_new_heart(self, heart): @@ -111,7 +109,7 @@ class HeartMonitor(object): for handler in self._new_handlers: handler(heart) else: - logger.info("heartbeat::yay, got new heart %s!"%heart) + logging.info("heartbeat::yay, got new heart %s!"%heart) self.hearts.add(heart) def handle_heart_failure(self, heart): @@ -120,11 +118,10 @@ class HeartMonitor(object): try: handler(heart) except Exception as e: - print (e) - logger.error("heartbeat::Bad Handler! 
%s"%handler) + logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) pass else: - logger.info("heartbeat::Heart %s failed :("%heart) + logging.info("heartbeat::Heart %s failed :("%heart) self.hearts.remove(heart) @@ -132,14 +129,14 @@ class HeartMonitor(object): "a heart just beat" if msg[1] == str(self.lifetime): delta = time.time()-self.tic - # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) + # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) elif msg[1] == str(self.last_ping): delta = time.time()-self.tic + (self.lifetime-self.last_ping) - logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) + logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) else: - logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% + logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% (msg[1],self.lifetime)) diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index ca42577..622c75b 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -21,12 +21,16 @@ import time import logging import zmq -from zmq.eventloop import ioloop, zmqstream +from zmq.eventloop import ioloop +from zmq.eventloop.zmqstream import ZMQStream # internal: from IPython.config.configurable import Configurable -from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict -# from IPython.zmq.log import logger # a Logger object +from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool +from IPython.utils.importstring import import_item + +from entry_point import select_random_ports +from factory import RegistrationFactory from streamsession import Message, wrap_exception, ISO8601 from heartmonitor import HeartMonitor @@ -43,8 +47,6 @@ else: # Code #----------------------------------------------------------------------------- -logger = logging.getLogger() - def _passer(*args, **kwargs): return @@ -92,13 +94,179 @@ class EngineConnector(HasTraits): control=Str() registration=Str() heartbeat=Str() - pending=Instance(set) + pending=Set() def __init__(self, **kwargs): super(EngineConnector, self).__init__(**kwargs) - logger.info("engine::Engine Connected: %i"%self.id) + logging.info("engine::Engine Connected: %i"%self.id) + +class HubFactory(RegistrationFactory): + """The Configurable for setting up a Hub.""" + + # port-pairs for monitoredqueues: + hb = Instance(list, config=True) + def _hb_default(self): + return select_random_ports(2) + + mux = Instance(list, config=True) + def _mux_default(self): + return select_random_ports(2) + + task = Instance(list, config=True) + def _task_default(self): + return select_random_ports(2) + + control = Instance(list, config=True) + def _control_default(self): + return select_random_ports(2) + + iopub = Instance(list, config=True) + def _iopub_default(self): + return select_random_ports(2) + + # single ports: + mon_port = Instance(int, config=True) + def _mon_port_default(self): + return select_random_ports(1)[0] + + query_port = Instance(int, config=True) + def _query_port_default(self): + return select_random_ports(1)[0] + + notifier_port = Instance(int, config=True) + def _notifier_port_default(self): + return select_random_ports(1)[0] + + ping = Int(1000, config=True) # ping frequency + + engine_ip = Str('127.0.0.1', config=True) + engine_transport = 
Str('tcp', config=True) + + client_ip = Str('127.0.0.1', config=True) + client_transport = Str('tcp', config=True) + + monitor_ip = Str('127.0.0.1', config=True) + monitor_transport = Str('tcp', config=True) + + monitor_url = Str('') + + db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True) + + # not configurable + db = Instance('IPython.zmq.parallel.dictdb.BaseDB') + heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor') + subconstructors = List() + _constructed = Bool(False) + + def _update_monitor_url(self): + self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) + + def _sync_ips(self): + self.engine_ip = self.ip + self.client_ip = self.ip + self.monitor_ip = self.ip + self._update_monitor_url() + + def _sync_transports(self): + self.engine_transport = self.transport + self.client_transport = self.transport + self.monitor_transport = self.transport + self._update_monitor_url() + + def __init__(self, **kwargs): + super(HubFactory, self).__init__(**kwargs) + self._update_monitor_url() + self.on_trait_change(self._sync_ips, 'ip') + self.on_trait_change(self._sync_transports, 'transport') + self.subconstructors.append(self.construct_hub) + + + def construct(self): + assert not self._constructed, "already constructed!" + + for subc in self.subconstructors: + subc() + + self._constructed = True + + + def start(self): + assert self._constructed, "must be constructed by self.construct() first!" + self.heartmonitor.start() + logging.info("Heartmonitor started") + + def construct_hub(self): + """construct""" + client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i" + engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i" + + ctx = self.context + loop = self.loop + + # Registrar socket + reg = ZMQStream(ctx.socket(zmq.XREP), loop) + reg.bind(client_iface % self.regport) + logging.info("Hub listening on %s for registration."%(client_iface%self.regport)) + if self.client_ip != self.engine_ip: + reg.bind(engine_iface % self.regport) + logging.info("Hub listening on %s for registration."%(engine_iface%self.regport)) + + ### Engine connections ### + + # heartbeat + hpub = ctx.socket(zmq.PUB) + hpub.bind(engine_iface % self.hb[0]) + hrep = ctx.socket(zmq.XREP) + hrep.bind(engine_iface % self.hb[1]) + + self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping) + + ### Client connections ### + # Clientele socket + c = ZMQStream(ctx.socket(zmq.XREP), loop) + c.bind(client_iface%self.query_port) + # Notifier socket + n = ZMQStream(ctx.socket(zmq.PUB), loop) + n.bind(client_iface%self.notifier_port) + + ### build and launch the queues ### + + # monitor socket + sub = ctx.socket(zmq.SUB) + sub.setsockopt(zmq.SUBSCRIBE, "") + sub.bind(self.monitor_url) + sub = ZMQStream(sub, loop) -class Hub(Configurable): + # connect the db + self.db = import_item(self.db_class)() + time.sleep(.25) + + # build connection dicts + self.engine_addrs = { + 'control' : engine_iface%self.control[1], + 'mux': engine_iface%self.mux[1], + 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), + 'task' : engine_iface%self.task[1], + 'iopub' : engine_iface%self.iopub[1], + # 'monitor' : engine_iface%self.mon_port, + } + + self.client_addrs = { + 'control' : client_iface%self.control[0], + 'query': client_iface%self.query_port, + 'mux': client_iface%self.mux[0], + 'task' : client_iface%self.task[0], + 'iopub' : client_iface%self.iopub[0], + 'notification': client_iface%self.notifier_port + } 
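+        # engine_addrs and client_addrs describe opposite ends of the same
+        # queues; both dicts are logged and handed to the Hub constructor below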
+ logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs) + logging.debug("hub::Hub client addrs: %s"%self.client_addrs) + self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, + registrar=reg, clientele=c, notifier=n, db=self.db, + engine_addrs=self.engine_addrs, client_addrs=self.client_addrs) + + +class Hub(HasTraits): """The IPython Controller Hub with 0MQ connections Parameters @@ -120,25 +288,29 @@ class Hub(Configurable): to the queues. """ # internal data structures: - ids=None # engine IDs - keytable=None - engines=None - clients=None - hearts=None - pending=None - tasks=None - completed=None + ids=Set() # engine IDs + keytable=Dict() + by_ident=Dict() + engines=Dict() + clients=Dict() + hearts=Dict() + pending=Set() + queues=Dict() # pending msg_ids keyed by engine_id + tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id + completed=Dict() # completed msg_ids keyed by engine_id + all_completed=Set() # completed msg_ids keyed by engine_id # mia=None - incoming_registrations=None - registration_timeout=None + incoming_registrations=Dict() + registration_timeout=Int() + _idcounter=Int(0) # objects from constructor: loop=Instance(ioloop.IOLoop) - registrar=Instance(zmqstream.ZMQStream) - clientele=Instance(zmqstream.ZMQStream) - monitor=Instance(zmqstream.ZMQStream) + registrar=Instance(ZMQStream) + clientele=Instance(ZMQStream) + monitor=Instance(ZMQStream) heartmonitor=Instance(HeartMonitor) - notifier=Instance(zmqstream.ZMQStream) + notifier=Instance(ZMQStream) db=Instance(object) client_addrs=Dict() engine_addrs=Dict() @@ -163,21 +335,22 @@ class Hub(Configurable): super(Hub, self).__init__(**kwargs) self.ids = set() - self.keytable={} - self.incoming_registrations={} - self.engines = {} - self.by_ident = {} - self.clients = {} - self.hearts = {} + self.pending = set() + # self.keytable={} + # self.incoming_registrations={} + # self.engines = {} + # self.by_ident = {} + # self.clients = {} + # self.hearts = {} # self.mia = set() self.registration_timeout = max(5000, 2*self.heartmonitor.period) # this is the stuff that will move to DB: - self.pending = set() # pending messages, keyed by msg_id - self.queues = {} # pending msg_ids keyed by engine_id - self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id - self.completed = {} # completed msg_ids keyed by engine_id - self.all_completed = set() - self._idcounter = 0 + # self.pending = set() # pending messages, keyed by msg_id + # self.queues = {} # pending msg_ids keyed by engine_id + # self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id + # self.completed = {} # completed msg_ids keyed by engine_id + # self.all_completed = set() + # self._idcounter = 0 # self.sockets = {} # self.loop = loop # self.session = session @@ -232,7 +405,7 @@ class Hub(Configurable): 'connection_request': self.connection_request, } - logger.info("controller::created controller") + logging.info("hub::created hub") @property def _next_id(self): @@ -281,7 +454,7 @@ class Hub(Configurable): try: msg = self.session.unpack_message(msg[1:], content=True) except: - logger.error("client::Invalid Message %s"%msg, exc_info=True) + logging.error("client::Invalid Message %s"%msg, exc_info=True) return False msg_type = msg.get('msg_type', None) @@ -298,15 +471,15 @@ class Hub(Configurable): def dispatch_register_request(self, msg): """""" - logger.debug("registration::dispatch_register_request(%s)"%msg) + logging.debug("registration::dispatch_register_request(%s)"%msg) 
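+        # split the zmq routing prefix from the message frames before unpacking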
        idents,msg = self.session.feed_identities(msg)
        if not idents:
-            logger.error("Bad Queue Message: %s"%msg, exc_info=True)
+            logging.error("Bad Queue Message: %s"%msg, exc_info=True)
            return
        try:
            msg = self.session.unpack_message(msg,content=True)
        except:
-            logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
+            logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
            return
        
        msg_type = msg['msg_type']
@@ -314,53 +487,53 @@ class Hub(Configurable):
        handler = self.registrar_handlers.get(msg_type, None)
        if handler is None:
-            logger.error("registration::got bad registration message: %s"%msg)
+            logging.error("registration::got bad registration message: %s"%msg)
        else:
            handler(idents, msg)
    
    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic."""
-        logger.debug("monitor traffic: %s"%msg[:2])
+        logging.debug("monitor traffic: %s"%msg[:2])
        switch = msg[0]
        idents, msg = self.session.feed_identities(msg[1:])
        if not idents:
-            logger.error("Bad Monitor Message: %s"%msg)
+            logging.error("Bad Monitor Message: %s"%msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
-            logger.error("Invalid monitor topic: %s"%switch)
+            logging.error("Invalid monitor topic: %s"%switch)
    
    def dispatch_client_msg(self, msg):
        """Route messages from clients"""
        idents, msg = self.session.feed_identities(msg)
        if not idents:
-            logger.error("Bad Client Message: %s"%msg)
+            logging.error("Bad Client Message: %s"%msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
            content = wrap_exception()
-            logger.error("Bad Client Message: %s"%msg, exc_info=True)
-            self.session.send(self.clientele, "controller_error", ident=client_id,
+            logging.error("Bad Client Message: %s"%msg, exc_info=True)
+            self.session.send(self.clientele, "hub_error", ident=client_id,
                    content=content)
            return
        
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['msg_type']
-        logger.info("client:: client %s requested %s"%(client_id, msg_type))
+        logging.info("client:: client %s requested %s"%(client_id, msg_type))
        handler = self.client_handlers.get(msg_type, None)
        try:
            assert handler is not None, "Bad Message Type: %s"%msg_type
        except:
            content = wrap_exception()
-            logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
-            self.session.send(self.clientele, "controller_error", ident=client_id,
+            logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
+            self.session.send(self.clientele, "hub_error", ident=client_id,
                    content=content)
            return
        else:
@@ -380,9 +553,9 @@ class Hub(Configurable):
        """handler to attach to heartbeater.
        Called when a new heart starts to beat.
        Triggers completion of registration."""
-        logger.debug("heartbeat::handle_new_heart(%r)"%heart)
+        logging.debug("heartbeat::handle_new_heart(%r)"%heart)
        if heart not in self.incoming_registrations:
-            logger.info("heartbeat::ignoring new heart: %r"%heart)
+            logging.info("heartbeat::ignoring new heart: %r"%heart)
        else:
            self.finish_registration(heart)
        
@@ -391,11 +564,11 @@ class Hub(Configurable):
        """handler to attach to heartbeater.
        called when a previously registered heart fails to respond to beat request.
        triggers unregistration"""
-        logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
+        logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
        eid = self.hearts.get(heart, None)
        queue = self.engines[eid].queue
        if eid is None:
-            logger.info("heartbeat::ignoring heart failure %r"%heart)
+            logging.info("heartbeat::ignoring heart failure %r"%heart)
        else:
            self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
    
@@ -403,19 +576,19 @@
    def save_queue_request(self, idents, msg):
        if len(idents) < 2:
-            logger.error("invalid identity prefix: %s"%idents)
+            logging.error("invalid identity prefix: %s"%idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
-            logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
+            logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
            return
        
        eid = self.by_ident.get(queue_id, None)
        if eid is None:
-            logger.error("queue::target %r not registered"%queue_id)
-            logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
+            logging.error("queue::target %r not registered"%queue_id)
+            logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
            return
        
        header = msg['header']
@@ -432,21 +605,21 @@
    def save_queue_result(self, idents, msg):
        if len(idents) < 2:
-            logger.error("invalid identity prefix: %s"%idents)
+            logging.error("invalid identity prefix: %s"%idents)
            return
        
        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
-            logger.error("queue::engine %r sent invalid message to %r: %s"%(
+            logging.error("queue::engine %r sent invalid message to %r: %s"%(
                    queue_id,client_id, msg), exc_info=True)
            return
        
        eid = self.by_ident.get(queue_id, None)
        if eid is None:
-            logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
-            logger.debug("queue:: %s"%msg[2:])
+            logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
+            logging.debug("queue:: %s"%msg[2:])
            return
        
        parent = msg['parent_header']
@@ -475,7 +648,7 @@
                result['result_buffers'] = msg['buffers']
            self.db.update_record(msg_id, result)
        else:
-            logger.debug("queue:: unknown msg finished %s"%msg_id)
+            logging.debug("queue:: unknown msg finished %s"%msg_id)
    
    #--------------------- Task Queue Traffic ------------------------------
    
@@ -486,7 +659,7 @@
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
-            logger.error("task::client %r sent invalid task message: %s"%(
+            logging.error("task::client %r sent invalid task message: %s"%(
                    client_id, msg), exc_info=True)
            return
        record = init_record(msg)
@@ -505,7 +678,7 @@
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
-            logger.error("task::invalid task result message send to %r: %s"%(
+            logging.error("task::invalid task result message sent to %r: %s"%(
                    client_id, msg), exc_info=True)
            raise
            return
@@ -513,7 +686,7 @@
        parent = msg['parent_header']
        if not parent:
            # print msg
-            logger.warn("Task %r had no parent!"%msg)
+            logging.warn("Task %r had no parent!"%msg)
            return
        msg_id = parent['msg_id']
@@ -546,13 +719,13 @@
            self.db.update_record(msg_id, result)
        else:
-            logger.debug("task::unknown task %s finished"%msg_id)
+            logging.debug("task::unknown task %s finished"%msg_id)
    
    def save_task_destination(self, idents, msg):
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
-            logger.error("task::invalid task tracking message", exc_info=True)
+            logging.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        print (content)
@@ -560,11 +733,11 @@ class Hub(Configurable):
        engine_uuid = content['engine_id']
        eid = self.by_ident[engine_uuid]
-        logger.info("task::task %s arrived on %s"%(msg_id, eid))
+        logging.info("task::task %s arrived on %s"%(msg_id, eid))
        # if msg_id in self.mia:
        #     self.mia.remove(msg_id)
        # else:
-        #     logger.debug("task::task %s not listed as MIA?!"%(msg_id))
+        #     logging.debug("task::task %s not listed as MIA?!"%(msg_id))
        
        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -585,12 +758,12 @@
        try:
            msg = self.session.unpack_message(msg, content=True)
        except:
-            logger.error("iopub::invalid IOPub message", exc_info=True)
+            logging.error("iopub::invalid IOPub message", exc_info=True)
            return
        
        parent = msg['parent_header']
        if not parent:
-            logger.error("iopub::invalid IOPub message: %s"%msg)
+            logging.error("iopub::invalid IOPub message: %s"%msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['msg_type']
@@ -600,7 +773,7 @@
        try:
            rec = self.db.get_record(msg_id)
        except:
-            logger.error("iopub::IOPub message has invalid parent", exc_info=True)
+            logging.error("iopub::IOPub message has invalid parent", exc_info=True)
            return
        # stream
        d = {}
@@ -624,7 +797,7 @@
    def connection_request(self, client_id, msg):
        """Reply with connection addresses for clients."""
-        logger.info("client::client %s connected"%client_id)
+        logging.info("client::client %s connected"%client_id)
        content = dict(status='ok')
        content.update(self.client_addrs)
        jsonable = {}
@@ -639,14 +812,14 @@
        try:
            queue = content['queue']
        except KeyError:
-            logger.error("registration::queue not specified", exc_info=True)
+            logging.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        """register a new engine, and create the socket(s) necessary"""
        eid = self._next_id
        # print (eid, queue, reg, heart)
-        logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
+        logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
        
        content = dict(id=eid,status='ok')
        content.update(self.engine_addrs)
@@ -656,12 +829,12 @@
                raise KeyError("queue_id %r in use"%queue)
            except:
                content = wrap_exception()
-                logger.error("queue_id %r in use"%queue, exc_info=True)
+                logging.error("queue_id %r in use"%queue, exc_info=True)
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use"%heart)
            except:
-                logger.error("heart_id %r in use"%heart, exc_info=True)
+                logging.error("heart_id %r in use"%heart, exc_info=True)
                content = wrap_exception()
        else:
            for h, pack in self.incoming_registrations.iteritems():
@@ -669,14 +842,14 @@
                    try:
                        raise KeyError("heart_id %r in use"%heart)
                    except:
-                        logger.error("heart_id %r in use"%heart, exc_info=True)
+                        logging.error("heart_id %r in use"%heart, exc_info=True)
                        content = wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use"%queue)
                    except:
-                        logger.error("queue_id %r in use"%queue, exc_info=True)
+                        logging.error("queue_id %r in use"%queue, exc_info=True)
                        content = wrap_exception()
                    break
@@ -695,7 +868,7 @@
            dc.start()
            self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
        else:
-            logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
+            logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid
    
    def unregister_engine(self, ident, msg):
@@ -703,9 +876,9 @@
        try:
            eid = msg['content']['id']
        except:
-            logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
+            logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
-        logger.info("registration::unregister_engine(%s)"%eid)
+        logging.info("registration::unregister_engine(%s)"%eid)
        content=dict(id=eid, queue=self.engines[eid].queue)
        self.ids.remove(eid)
        self.keytable.pop(eid)
@@ -726,9 +899,9 @@
        try:
            (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
        except KeyError:
-            logger.error("registration::tried to finish nonexistant registration", exc_info=True)
+            logging.error("registration::tried to finish nonexistent registration", exc_info=True)
            return
-        logger.info("registration::finished registering engine %i:%r"%(eid,queue))
+        logging.info("registration::finished registering engine %i:%r"%(eid,queue))
        if purge is not None:
            purge.stop()
        control = queue
@@ -748,7 +921,7 @@
    def _purge_stalled_registration(self, heart):
        if heart in self.incoming_registrations:
            eid = self.incoming_registrations.pop(heart)[0]
-            logger.info("registration::purging stalled registration: %i"%eid)
+            logging.info("registration::purging stalled registration: %i"%eid)
        else:
            pass
@@ -769,7 +942,7 @@
            dc.start()
    
    def _shutdown(self):
-        logger.info("controller::controller shutting down.")
+        logging.info("hub::hub shutting down.")
        time.sleep(0.1)
        sys.exit(0)
@@ -781,7 +954,7 @@
            targets = self._validate_targets(targets)
        except:
            content = wrap_exception()
-            self.session.send(self.clientele, "controller_error",
+            self.session.send(self.clientele, "hub_error",
                    content=content, ident=client_id)
            return
@@ -805,7 +978,7 @@
            targets = self._validate_targets(targets)
        except:
            content = wrap_exception()
-            self.session.send(self.clientele, "controller_error",
+            self.session.send(self.clientele, "hub_error",
                    content=content, ident=client_id)
            return
        verbose = content.get('verbose', False)
diff --git a/IPython/zmq/parallel/ipcluster.py b/IPython/zmq/parallel/ipcluster.py
index c413e8c..fb078fa 100644
--- a/IPython/zmq/parallel/ipcluster.py
+++ b/IPython/zmq/parallel/ipcluster.py
@@ -4,8 +4,7 @@ import sys,os
 import time
 from subprocess import Popen, PIPE
 
-from entry_point import parse_url
-from controller import make_argument_parser
+from IPython.external.argparse import ArgumentParser, SUPPRESS
 
 def _filter_arg(flag, args):
     filtered = []
@@ -48,7 +47,7 @@ def strip_args(flags, args=sys.argv[1:]):
 
 def launch_process(mod, args):
     """Launch a controller or engine in a subprocess."""
-    code = "from IPython.zmq.parallel.%s import main;main()"%mod
+    code = "from IPython.zmq.parallel.%s import launch_new_instance;launch_new_instance()"%mod
     arguments = [ sys.executable, '-c', code ] + args
     blackholew = file(os.devnull, 'w')
     blackholer = file(os.devnull, 'r')
@@ -57,17 +56,13 @@ def launch_process(mod, args):
     return proc
 
 def main():
-    parser = make_argument_parser()
+    parser = ArgumentParser(argument_default=SUPPRESS)
     parser.add_argument('--n', '-n', type=int, default=1,
                         help="The number of engines to start.")
-    args = parser.parse_args()
-    parse_url(args)
-    
-    controller_args = strip_args([('--n','-n')])
-    engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
-            '--transport','--loglevel','--packer', '--execkey'])+['--ident']
+    ns,args = parser.parse_known_args()
+    n = ns.n
     
-    controller = launch_process('controller', controller_args)
+    controller = launch_process('ipcontrollerapp', args)
     for i in range(10):
         time.sleep(.1)
         if controller.poll() is not None:
@@ -75,9 +70,9 @@ def main():
             print (controller.stderr.read())
             sys.exit(255)
     
-    print("Launched Controller at %s"%args.url)
-    engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
-    print("%i Engines started"%args.n)
+    print("Launched Controller")
+    engines = [ launch_process('ipengineapp', args+['--ident', 'engine-%i'%i]) for i in range(n) ]
+    print("%i Engines started"%n)
 
 def wait_quietly(p):
     try:
diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py
new file mode 100755
index 0000000..b4cce50
--- /dev/null
+++ b/IPython/zmq/parallel/ipcontrollerapp.py
@@ -0,0 +1,322 @@
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+The IPython controller application.
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2008-2009 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+from __future__ import with_statement
+
+import copy
+import sys
+import os
+import logging
+# from twisted.application import service
+# from twisted.internet import reactor
+# from twisted.python import log
+
+import zmq
+from zmq.log.handlers import PUBHandler
+
+from IPython.config.loader import Config
+from IPython.zmq.parallel import factory
+from IPython.zmq.parallel.controller import ControllerFactory
+from IPython.zmq.parallel.clusterdir import (
+    ApplicationWithClusterDir,
+    ClusterDirConfigLoader
+)
+# from IPython.kernel.fcutil import FCServiceFactory, FURLError
+from IPython.utils.traitlets import Instance, Unicode
+
+from entry_point import generate_exec_key
+
+
+#-----------------------------------------------------------------------------
+# Module level variables
+#-----------------------------------------------------------------------------
+
+
+#: The default config file name for this application
+default_config_file_name = u'ipcontroller_config.py'
+
+
+_description = """Start the IPython controller for parallel computing.
+
+The IPython controller provides a gateway between the IPython engines and
+clients. The controller needs to be started before the engines and can be
+configured using command line options or using a cluster directory. Cluster
+directories contain config, log and security files and are usually located in
+your .ipython directory and named as "cluster_<profile>". See the --profile
+and --cluster-dir options for details.
+"""
+
+#-----------------------------------------------------------------------------
+# Default interfaces
+#-----------------------------------------------------------------------------
+
+# The default client interfaces for FCClientServiceFactory.interfaces
+default_client_interfaces = Config()
+default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
+
+# Make this a dict we can pass to Config.__init__ for the default
+default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
+
+
+
+# The default engine interfaces for FCEngineServiceFactory.interfaces
+default_engine_interfaces = Config()
+default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
+
+# Make this a dict we can pass to Config.__init__ for the default
+default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
+
+
+#-----------------------------------------------------------------------------
+# Service factories
+#-----------------------------------------------------------------------------
+
+#
+# class FCClientServiceFactory(FCServiceFactory):
+#     """A Foolscap implementation of the client services."""
+#
+#     cert_file = Unicode(u'ipcontroller-client.pem', config=True)
+#     interfaces = Instance(klass=Config, kw=default_client_interfaces,
+#                           allow_none=False, config=True)
+#
+#
+# class FCEngineServiceFactory(FCServiceFactory):
+#     """A Foolscap implementation of the engine services."""
+#
+#     cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
+#     interfaces = Instance(klass=dict, kw=default_engine_interfaces,
+#                           allow_none=False, config=True)
+#
+
+#-----------------------------------------------------------------------------
+# Command line options
+#-----------------------------------------------------------------------------
+
+
+class IPControllerAppConfigLoader(ClusterDirConfigLoader):
+
+    def _add_arguments(self):
+        super(IPControllerAppConfigLoader, self)._add_arguments()
+        paa = self.parser.add_argument
+        
+        ## Hub Config:
+        paa('--mongodb',
+            dest='HubFactory.db_class', action='store_const',
+            const='IPython.zmq.parallel.mongodb.MongoDB',
+            help='Use MongoDB task storage [default: in-memory]')
+        paa('--hb',
+            type=int, dest='HubFactory.hb', nargs=2,
+            help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
+            'connections [default: random]',
+            metavar='Hub.hb_ports')
+        
+        # Client config
+        paa('--client-ip',
+            type=str, dest='HubFactory.client_ip',
+            help='The IP address or hostname the Hub will listen on for '
+            'client connections. Both engine-ip and client-ip can be set simultaneously '
+            'via --ip [default: loopback]',
+            metavar='Hub.client_ip')
+        paa('--client-transport',
+            type=str, dest='HubFactory.client_transport',
+            help='The ZeroMQ transport the Hub will use for '
+            'client connections. Both engine-transport and client-transport can be set simultaneously '
+            'via --transport [default: tcp]',
+            metavar='Hub.client_transport')
+        paa('--query',
+            type=int, dest='HubFactory.query_port',
+            help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
+            metavar='Hub.query_port')
+        paa('--notifier',
+            type=int, dest='HubFactory.notifier_port',
+            help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
+            metavar='Hub.notifier_port')
+        
+        # Engine config
+        paa('--engine-ip',
+            type=str, dest='HubFactory.engine_ip',
+            help='The IP address or hostname the Hub will listen on for '
+            'engine connections. This applies to the Hub and its schedulers. '
+            'Both engine-ip and client-ip can be set simultaneously '
+            'via --ip [default: loopback]',
+            metavar='Hub.engine_ip')
+        paa('--engine-transport',
+            type=str, dest='HubFactory.engine_transport',
+            help='The ZeroMQ transport the Hub will use for '
+            'engine connections. Both engine-transport and client-transport can be set simultaneously '
+            'via --transport [default: tcp]',
+            metavar='Hub.engine_transport')
+        
+        # Scheduler config
+        paa('--mux',
+            type=int, dest='ControllerFactory.mux', nargs=2,
+            help='The (2) ports the MUX scheduler will listen on for client,engine '
+            'connections, respectively [default: random]',
+            metavar='Scheduler.mux_ports')
+        paa('--task',
+            type=int, dest='ControllerFactory.task', nargs=2,
+            help='The (2) ports the Task scheduler will listen on for client,engine '
+            'connections, respectively [default: random]',
+            metavar='Scheduler.task_ports')
+        paa('--control',
+            type=int, dest='ControllerFactory.control', nargs=2,
+            help='The (2) ports the Control scheduler will listen on for client,engine '
+            'connections, respectively [default: random]',
+            metavar='Scheduler.control_ports')
+        paa('--iopub',
+            type=int, dest='ControllerFactory.iopub', nargs=2,
+            help='The (2) ports the IOPub scheduler will listen on for client,engine '
+            'connections, respectively [default: random]',
+            metavar='Scheduler.iopub_ports')
+        paa('--scheme',
+            type=str, dest='ControllerFactory.scheme',
+            choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
+            help='select the task scheduler scheme [default: Python LRU]',
+            metavar='Scheduler.scheme')
+        paa('--usethreads',
+            dest='ControllerFactory.usethreads', action="store_true",
+            help='Use threads instead of processes for the schedulers',
+            )
+        
+        ## Global config
+        paa('--log-to-file',
+            action='store_true', dest='Global.log_to_file',
+            help='Log to a file in the log directory (default is stdout)')
+        paa('--log-url',
+            type=str, dest='Global.log_url',
+            help='Broadcast logs to an iploggerz process [default: disabled]')
+        paa('-r','--reuse-key',
+            action='store_true', dest='Global.reuse_key',
+            help='Try to reuse existing execution keys.')
+        paa('--no-secure',
+            action='store_false', dest='Global.secure',
+            help='Turn off execution keys.')
+        paa('--secure',
+            action='store_true', dest='Global.secure',
+            help='Turn on execution keys (default).')
+        paa('--execkey',
+            type=str, dest='Global.exec_key',
+            help='path to a file containing an execution key.',
+            metavar='keyfile')
+        factory.add_session_arguments(self.parser)
+        factory.add_registration_arguments(self.parser)
+
+
+#-----------------------------------------------------------------------------
+# The main application
+#-----------------------------------------------------------------------------
+
+
+class IPControllerApp(ApplicationWithClusterDir):
+
+    name = u'ipcontrollerz'
+    description = _description
+    command_line_loader = IPControllerAppConfigLoader
+    default_config_file_name = default_config_file_name
+    auto_create_cluster_dir = True
+
+    def create_default_config(self):
+        super(IPControllerApp, self).create_default_config()
+        # Don't set defaults for Global.secure or Global.reuse_furls
+        # as those are set in a component.
+        self.default_config.Global.import_statements = []
+        self.default_config.Global.clean_logs = True
+        self.default_config.Global.secure = False
+        self.default_config.Global.reuse_key = False
+        self.default_config.Global.exec_key = "exec_key.key"
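The --secure, --reuse-key and --execkey options above drive the keyfile handling that construct() performs below: generate a fresh key, reuse one on disk, or remove any stale key in insecure mode. A minimal standalone sketch of that decision, where generate_exec_key is a hypothetical stand-in that just writes a random token:

import os
import uuid

def generate_exec_key(keyfile):
    # stand-in for entry_point.generate_exec_key: write a random hex token
    with open(keyfile, 'w') as f:
        f.write(uuid.uuid4().hex)

def resolve_exec_key(security_dir, exec_key='exec_key.key',
                     secure=False, reuse_key=False):
    """Create, reuse, or remove the execution key, mirroring construct()."""
    keyfile = os.path.join(security_dir, exec_key)
    if secure:
        if not reuse_key or not os.path.exists(keyfile):
            generate_exec_key(keyfile)
        return keyfile
    # insecure mode: remove any stale key so engines don't pick it up
    if os.path.exists(keyfile):
        os.remove(keyfile)
    return ''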
+
+    def pre_construct(self):
+        super(IPControllerApp, self).pre_construct()
+        c = self.master_config
+        # The defaults for these are set in FCClientServiceFactory and
+        # FCEngineServiceFactory, so we only set them here if the global
+        # options have been set to override the class level defaults.
+
+        # if hasattr(c.Global, 'reuse_furls'):
+        #     c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
+        #     c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
+        #     del c.Global.reuse_furls
+        # if hasattr(c.Global, 'secure'):
+        #     c.FCClientServiceFactory.secure = c.Global.secure
+        #     c.FCEngineServiceFactory.secure = c.Global.secure
+        #     del c.Global.secure
+
+    def construct(self):
+        # This is the working dir by now.
+        sys.path.insert(0, '')
+        c = self.master_config
+
+        self.import_statements()
+
+        if c.Global.secure:
+            keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
+            if not c.Global.reuse_key or not os.path.exists(keyfile):
+                generate_exec_key(keyfile)
+            c.SessionFactory.exec_key = keyfile
+        else:
+            keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
+            if os.path.exists(keyfile):
+                os.remove(keyfile)
+            c.SessionFactory.exec_key = ''
+
+        try:
+            self.factory = ControllerFactory(config=c)
+            self.start_logging()
+            self.factory.construct()
+        except:
+            self.log.error("Couldn't construct the Controller", exc_info=True)
+            self.exit(1)
+
+
+    def import_statements(self):
+        statements = self.master_config.Global.import_statements
+        for s in statements:
+            try:
+                self.log.info("Executing statement: '%s'" % s)
+                exec s in globals(), locals()
+            except:
+                self.log.error("Error running statement: %s" % s)
+
+    # def start_logging(self):
+    #     super(IPControllerApp, self).start_logging()
+    #     if self.master_config.Global.log_url:
+    #         context = self.factory.context
+    #         lsock = context.socket(zmq.PUB)
+    #         lsock.connect(self.master_config.Global.log_url)
+    #         handler = PUBHandler(lsock)
+    #         handler.root_topic = 'controller'
+    #         handler.setLevel(self.log_level)
+    #         self.log.addHandler(handler)
+    #
+    def start_app(self):
+        # Start the controller service.
+        self.factory.start()
+        self.write_pid_file(overwrite=True)
+        try:
+            self.factory.loop.start()
+        except KeyboardInterrupt:
+            self.log.critical("Interrupted, Exiting...\n")
+
+
+def launch_new_instance():
+    """Create and run the IPython controller"""
+    app = IPControllerApp()
+    app.start()
+
+
+if __name__ == '__main__':
+    launch_new_instance()
diff --git a/IPython/zmq/parallel/ipengineapp.py b/IPython/zmq/parallel/ipengineapp.py
new file mode 100755
index 0000000..a75c239
--- /dev/null
+++ b/IPython/zmq/parallel/ipengineapp.py
@@ -0,0 +1,261 @@
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+The IPython engine application
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2008-2009 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import os
+import sys
+
+import zmq
+from zmq.eventloop import ioloop
+
+from IPython.zmq.parallel.clusterdir import (
+    ApplicationWithClusterDir,
+    ClusterDirConfigLoader
+)
+from IPython.zmq.log import EnginePUBHandler
+
+from IPython.zmq.parallel import factory
+from IPython.zmq.parallel.engine import EngineFactory
+from IPython.zmq.parallel.streamkernel import Kernel
+from IPython.utils.importstring import import_item
+
+#-----------------------------------------------------------------------------
+# Module level variables
+#-----------------------------------------------------------------------------
+
+#: The default config file name for this application
+default_config_file_name = u'ipengine_config.py'
+
+
+mpi4py_init = """from mpi4py import MPI as mpi
+mpi.size = mpi.COMM_WORLD.Get_size()
+mpi.rank = mpi.COMM_WORLD.Get_rank()
+"""
+
+
+pytrilinos_init = """from PyTrilinos import Epetra
+class SimpleStruct:
+    pass
+mpi = SimpleStruct()
+mpi.rank = 0
+mpi.size = 0
+"""
+
+
+_description = """Start an IPython engine for parallel computing.\n\n
+
+IPython engines run in parallel and perform computations on behalf of a client
+and controller. A controller needs to be started before the engines. The
+engine can be configured using command line options or using a cluster
+directory. Cluster directories contain config, log and security files and are
+usually located in your .ipython directory and named as "cluster_<profile>".
+See the --profile and --cluster-dir options for details.
+"""
+
+#-----------------------------------------------------------------------------
+# Command line options
+#-----------------------------------------------------------------------------
+
+
+class IPEngineAppConfigLoader(ClusterDirConfigLoader):
+
+    def _add_arguments(self):
+        super(IPEngineAppConfigLoader, self)._add_arguments()
+        paa = self.parser.add_argument
+        # Controller config
+        paa('--url-file',
+            type=unicode, dest='Global.url_file',
+            help='The full location of the file containing the FURL of the '
+            'controller. If this is not given, the FURL file must be in the '
+            'security directory of the cluster directory. This location is '
+            'resolved using the --profile and --app-dir options.',
+            metavar='Global.url_file')
+        # MPI
+        paa('--mpi',
+            type=str, dest='MPI.use',
+            help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
+            metavar='MPI.use')
+        # Global config
+        paa('--log-to-file',
+            action='store_true', dest='Global.log_to_file',
+            help='Log to a file in the log directory (default is stdout)')
+        paa('--log-url',
+            dest='Global.log_url',
+            help="url of ZMQ logger, as started with iploggerz")
+        paa('--execkey',
+            type=str, dest='Global.exec_key',
+            help='path to a file containing an execution key.',
+            metavar='keyfile')
+        paa('--no-secure',
+            action='store_false', dest='Global.secure',
+            help='Turn off execution keys.')
+        paa('--secure',
+            action='store_true', dest='Global.secure',
+            help='Turn on execution keys (default).')
+        # init command
+        paa('-c',
+            type=str, dest='Global.extra_exec_lines',
+            help='specify a command to be run at startup')
+        
+        factory.add_session_arguments(self.parser)
+        factory.add_registration_arguments(self.parser)
+
+
+#-----------------------------------------------------------------------------
+# Main application
+#-----------------------------------------------------------------------------
+
+
+class IPEngineApp(ApplicationWithClusterDir):
+
+    name = u'ipenginez'
+    description = _description
+    command_line_loader = IPEngineAppConfigLoader
+    default_config_file_name = default_config_file_name
+    auto_create_cluster_dir = True
+
+    def create_default_config(self):
+        super(IPEngineApp, self).create_default_config()
+
+        # The engine should not clean logs as we don't want to remove the
+        # active log files of other running engines.
+        self.default_config.Global.clean_logs = False
+        self.default_config.Global.secure = True
+
+        # Global config attributes
+        self.default_config.Global.exec_lines = []
+        self.default_config.Global.extra_exec_lines = ''
+
+        # Configuration related to the controller
+        # This must match the filename (path not included) that the controller
+        # used for the FURL file.
+        self.default_config.Global.url = u'tcp://localhost:10101'
+        # If given, this is the actual location of the controller's FURL file.
+        # If not, this is computed using the profile, app_dir and furl_file_name
+        self.default_config.Global.key_file_name = u'exec_key.key'
+        self.default_config.Global.key_file = u''
+
+        # MPI related config attributes
+        self.default_config.MPI.use = ''
+        self.default_config.MPI.mpi4py = mpi4py_init
+        self.default_config.MPI.pytrilinos = pytrilinos_init
+
+    def post_load_command_line_config(self):
+        pass
+
+    def pre_construct(self):
+        super(IPEngineApp, self).pre_construct()
+        # self.find_cont_url_file()
+        self.find_key_file()
+        if self.master_config.Global.extra_exec_lines:
+            self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
+
+    def find_key_file(self):
+        """Set the key file.
+
+        Here we don't try to actually see if it exists or is valid as that
+        is handled by the connection logic.
+        """
+        config = self.master_config
+        # Find the actual controller key file
+        if not config.Global.key_file:
+            try_this = os.path.join(
+                config.Global.cluster_dir,
+                config.Global.security_dir,
+                config.Global.key_file_name
+            )
+            config.Global.key_file = try_this
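The --mpi option selects one of the bootstrap snippets defined above (mpi4py_init or pytrilinos_init), which start_mpi below exec's at engine startup. A minimal sketch of that dispatch, assuming the relevant MPI bindings are installed; the dict here is an illustrative stand-in for the MPI config section:

# illustrative stand-in for the MPI config section
mpi_init_snippets = {
    'mpi4py': mpi4py_init,
    'pytrilinos': pytrilinos_init,
}

def init_mpi(use=''):
    """Exec the selected bootstrap and return the resulting `mpi` object."""
    snippet = mpi_init_snippets.get(use)
    if snippet is None:
        return None
    ns = {}
    exec snippet in ns  # Python 2 exec statement, as in the surrounding code
    return ns.get('mpi')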
+
+    def construct(self):
+        # This is the working dir by now.
+        sys.path.insert(0, '')
+        config = self.master_config
+        if os.path.exists(config.Global.key_file) and config.Global.secure:
+            config.SessionFactory.exec_key = config.Global.key_file
+
+        config.Kernel.exec_lines = config.Global.exec_lines
+
+        self.start_mpi()
+
+        # Create the underlying shell class and EngineService
+        # shell_class = import_item(self.master_config.Global.shell_class)
+        try:
+            self.engine = EngineFactory(config=config)
+        except:
+            self.log.error("Couldn't start the Engine", exc_info=True)
+            self.exit(1)
+
+        self.start_logging()
+
+        # Create the service hierarchy
+        # self.main_service = service.MultiService()
+        # self.engine_service.setServiceParent(self.main_service)
+        # self.tub_service = Tub()
+        # self.tub_service.setServiceParent(self.main_service)
+        # # This needs to be called before the connection is initiated
+        # self.main_service.startService()
+
+        # This initiates the connection to the controller and calls
+        # register_engine to tell the controller we are ready to do work
+        # self.engine_connector = EngineConnector(self.tub_service)
+
+        # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
+
+        # reactor.callWhenRunning(self.call_connect)
+
+
+    def start_logging(self):
+        super(IPEngineApp, self).start_logging()
+        if self.master_config.Global.log_url:
+            context = self.engine.context
+            lsock = context.socket(zmq.PUB)
+            lsock.connect(self.master_config.Global.log_url)
+            handler = EnginePUBHandler(self.engine, lsock)
+            handler.setLevel(self.log_level)
+            self.log.addHandler(handler)
+
+    def start_mpi(self):
+        global mpi
+        mpikey = self.master_config.MPI.use
+        mpi_import_statement = self.master_config.MPI.get(mpikey, None)
+        if mpi_import_statement is not None:
+            try:
+                self.log.info("Initializing MPI:")
+                self.log.info(mpi_import_statement)
+                exec mpi_import_statement in globals()
+            except:
+                mpi = None
+        else:
+            mpi = None
+
+
+    def start_app(self):
+        self.engine.start()
+        try:
+            self.engine.loop.start()
+        except KeyboardInterrupt:
+            self.log.critical("Engine Interrupted, shutting down...\n")
+
+
+def launch_new_instance():
+    """Create and run the IPython engine"""
+    app = IPEngineApp()
+    app.start()
+
+
+if __name__ == '__main__':
+    launch_new_instance()
+
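The engine's start_logging above publishes its log records over a PUB socket; any process can do the same and have the logger application below collect them. A minimal hypothetical sender using pyzmq's PUBHandler, assuming a LogWatcher is bound at its default tcp://127.0.0.1:20202:

import logging

import zmq
from zmq.log.handlers import PUBHandler

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.connect('tcp://127.0.0.1:20202')  # the LogWatcher's default url

handler = PUBHandler(sock)
handler.root_topic = 'myapp'  # topic prefix the watcher can subscribe to
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

root.info("hello from a monitored process")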
diff --git a/IPython/zmq/parallel/iploggerapp.py b/IPython/zmq/parallel/iploggerapp.py
new file mode 100755
index 0000000..940ba18
--- /dev/null
+++ b/IPython/zmq/parallel/iploggerapp.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+A simple IPython logger application
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import os
+import sys
+
+import zmq
+
+from IPython.zmq.parallel.clusterdir import (
+    ApplicationWithClusterDir,
+    ClusterDirConfigLoader
+)
+from IPython.zmq.parallel.logwatcher import LogWatcher
+
+#-----------------------------------------------------------------------------
+# Module level variables
+#-----------------------------------------------------------------------------
+
+#: The default config file name for this application
+default_config_file_name = u'iplogger_config.py'
+
+_description = """Start an IPython logger for parallel computing.\n\n
+
+IPython controllers and engines (and your own processes) can broadcast log messages
+by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
+logger can be configured using command line options or using a cluster
+directory. Cluster directories contain config, log and security files and are
+usually located in your .ipython directory and named as "cluster_<profile>".
+See the --profile and --cluster-dir options for details.
+"""
+
+#-----------------------------------------------------------------------------
+# Command line options
+#-----------------------------------------------------------------------------
+
+
+class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
+
+    def _add_arguments(self):
+        super(IPLoggerAppConfigLoader, self)._add_arguments()
+        paa = self.parser.add_argument
+        # Controller config
+        paa('--url',
+            type=str, dest='LogWatcher.url',
+            help='The url the LogWatcher will listen on',
+            )
+        # MPI
+        paa('--topics',
+            type=str, dest='LogWatcher.topics', nargs='+',
+            help='What topics to subscribe to',
+            metavar='topics')
+        # Global config
+        paa('--log-to-file',
+            action='store_true', dest='Global.log_to_file',
+            help='Log to a file in the log directory (default is stdout)')
+
+
+#-----------------------------------------------------------------------------
+# Main application
+#-----------------------------------------------------------------------------
+
+
+class IPLoggerApp(ApplicationWithClusterDir):
+
+    name = u'iploggerz'
+    description = _description
+    command_line_loader = IPLoggerAppConfigLoader
+    default_config_file_name = default_config_file_name
+    auto_create_cluster_dir = True
+
+    def create_default_config(self):
+        super(IPLoggerApp, self).create_default_config()
+
+        # The engine should not clean logs as we don't want to remove the
+        # active log files of other running engines.
+        self.default_config.Global.clean_logs = False
+
+        # If given, this is the actual location of the logger's URL file.
+        # If not, this is computed using the profile, app_dir and furl_file_name
+        self.default_config.Global.url_file_name = u'iplogger.url'
+        self.default_config.Global.url_file = u''
+
+    def post_load_command_line_config(self):
+        pass
+
+    def pre_construct(self):
+        super(IPLoggerApp, self).pre_construct()
+
+    def construct(self):
+        # This is the working dir by now.
+        sys.path.insert(0, '')
+
+        self.start_logging()
+
+        try:
+            self.watcher = LogWatcher(config=self.master_config)
+        except:
+            self.log.error("Couldn't start the LogWatcher", exc_info=True)
+            self.exit(1)
+
+
+    def start_app(self):
+        try:
+            self.watcher.loop.start()
+        except KeyboardInterrupt:
+            self.log.critical("Logging Interrupted, shutting down...\n")
+
+
+def launch_new_instance():
+    """Create and run the IPython LogWatcher"""
+    app = IPLoggerApp()
+    app.start()
+
+
+if __name__ == '__main__':
+    launch_new_instance()
+
diff --git a/IPython/zmq/parallel/logwatcher.py b/IPython/zmq/parallel/logwatcher.py
new file mode 100644
index 0000000..6f02195
--- /dev/null
+++ b/IPython/zmq/parallel/logwatcher.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+"""A simple logger object that consolidates messages incoming from ipclusterz processes."""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+
+import sys
+import logging
+
+import zmq
+from zmq.eventloop import ioloop, zmqstream
+from IPython.config.configurable import Configurable
+from IPython.utils.traitlets import Int, Str, Instance, List
+
+#-----------------------------------------------------------------------------
+# Classes
+#-----------------------------------------------------------------------------
+
+
+class LogWatcher(Configurable):
+    """A simple class that receives messages on a SUB socket, as published
+    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
+    
+    This can subscribe to multiple topics, but defaults to all topics.
+    """
+    # configurables
+    topics = List([''], config=True)
+    url = Str('tcp://127.0.0.1:20202', config=True)
+    
+    # internals
+    context = Instance(zmq.Context, (), {})
+    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
+    loop = Instance('zmq.eventloop.ioloop.IOLoop')
+    def _loop_default(self):
+        return ioloop.IOLoop.instance()
+    
+    def __init__(self, config=None):
+        super(LogWatcher, self).__init__(config=config)
+        s = self.context.socket(zmq.SUB)
+        s.bind(self.url)
+        self.stream = zmqstream.ZMQStream(s, self.loop)
+        self.subscribe()
+        self.on_trait_change(self.subscribe, 'topics')
+        
+        self.stream.on_recv(self.log_message)
+    
+    def subscribe(self):
+        """Update our SUB socket's subscriptions."""
+        self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
+        for topic in self.topics:
+            logging.debug("Subscribing to: %r"%topic)
+            self.stream.setsockopt(zmq.SUBSCRIBE, topic)
+    
+    def _extract_level(self, topic_str):
+        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
+        topics = topic_str.split('.')
+        for idx,t in enumerate(topics):
+            level = getattr(logging, t, None)
+            if level is not None:
+                break
+        
+        if level is None:
+            level = logging.INFO
+        else:
+            topics.pop(idx)
+        
+        return level, '.'.join(topics)
+    
+    
+    def log_message(self, raw):
+        """receive and parse a message, then log it."""
+        if len(raw) != 2 or '.' not in raw[0]:
+            logging.error("Invalid log message: %s"%raw)
+            return
+        else:
+            topic, msg = raw
+            # don't newline, since log messages always newline:
+            topic,level_name = topic.rsplit('.',1)
+            level,topic = self._extract_level(topic)
+            if msg[-1] == '\n':
+                msg = msg[:-1]
+            logging.log(level, "[%s] %s" % (topic, msg))
+
diff --git a/IPython/zmq/parallel/mongodb.py b/IPython/zmq/parallel/mongodb.py
index 61e7836..2c8732d 100644
--- a/IPython/zmq/parallel/mongodb.py
+++ b/IPython/zmq/parallel/mongodb.py
@@ -10,10 +10,11 @@ from datetime import datetime
 
 from pymongo import Connection
 
+from dictdb import BaseDB
 
 #----------------------
 # MongoDB class
 #----------------------
-class MongoDB(object):
+class MongoDB(BaseDB):
     """MongoDB TaskRecord backend."""
     def __init__(self, session_uuid, *args, **kwargs):
         self._connection = Connection(*args, **kwargs)
diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py
index 7dc920e..6a8f5ac 100644
--- a/IPython/zmq/parallel/scheduler.py
+++ b/IPython/zmq/parallel/scheduler.py
@@ -25,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream
 # local imports
 from IPython.external.decorator import decorator
 from IPython.config.configurable import Configurable
-from IPython.utils.traitlets import Instance
+from IPython.utils.traitlets import Instance, Dict, List, Set
 
 from client import Client
 from dependency import Dependency
@@ -33,12 +33,10 @@ import streamsession as ss
 
 from entry_point import connect_logger, local_logger
 
-logger = logging.getLogger()
-
 @decorator
 def logged(f,self,*args,**kwargs):
     # print ("#--------------------")
-    logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
+    logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
     # print ("#--")
     return f(self,*args, **kwargs)
@@ -115,7 +113,7 @@ class TaskScheduler(Configurable):
     
     """
     
-    # configurables:
+    # input arguments:
     scheme = Instance(FunctionType, default=leastload) # function for determining the destination
     client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
     engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
@@ -124,30 +122,22 @@ class TaskScheduler(Configurable):
     io_loop = Instance(ioloop.IOLoop)
     
     # internals:
-    dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
-    depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
-    pending = None # dict by engine_uuid of submitted tasks
-    completed = None # dict by engine_uuid of completed tasks
-    clients = None # dict by msg_id for who submitted the task
-    targets = None # list of target IDENTs
-    loads = None # list of engine loads
-    all_done = None # set of all completed tasks
-    blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
+    dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
+    depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
+    pending = Dict() # dict by engine_uuid of submitted tasks
+    completed = Dict() # dict by engine_uuid of completed tasks
+    clients = Dict() # dict by msg_id for who submitted the task
+    targets = List() # list of target IDENTs
+    loads = List() # list of engine loads
+    all_done = Set() # set of all completed tasks
+    blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
+    session = Instance(ss.StreamSession)
     
     
     def __init__(self, **kwargs):
         super(TaskScheduler, self).__init__(**kwargs)
         
         self.session = ss.StreamSession(username="TaskScheduler")
-        self.dependencies = {}
-        self.depending = {}
-        self.completed = {}
-        self.pending = {}
-        self.all_done = set()
-        self.blacklist = {}
-        
-        self.targets = []
-        self.loads = []
         
         self.engine_stream.on_recv(self.dispatch_result, copy=False)
         self._notification_handlers = dict(
@@ -155,7 +145,7 @@ class TaskScheduler(Configurable):
             unregistration_notification = self._unregister_engine
         )
         self.notifier_stream.on_recv(self.dispatch_notification)
-        logger.info("Scheduler started...%r"%self)
+        logging.info("Scheduler started...%r"%self)
    
    def resume_receiving(self):
        """Resume accepting jobs."""
@@ -182,7 +172,7 @@ class TaskScheduler(Configurable):
             try:
                 handler(str(msg['content']['queue']))
             except KeyError:
-                logger.error("task::Invalid notification msg: %s"%msg)
+                logging.error("task::Invalid notification msg: %s"%msg)
    
    @logged
    def _register_engine(self, uid):
@@ -232,7 +222,7 @@ class TaskScheduler(Configurable):
         try:
             idents, msg = self.session.feed_identities(raw_msg, copy=False)
         except Exception as e:
-            logger.error("task::Invaid msg: %s"%msg)
+            logging.error("task::Invalid msg: %s"%msg)
             return
         
         # send to monitor
@@ -318,7 +308,7 @@ class TaskScheduler(Configurable):
         try:
             idents,msg = self.session.feed_identities(raw_msg, copy=False)
         except Exception as e:
-            logger.error("task::Invaid result: %s"%msg)
+            logging.error("task::Invalid result: %s"%msg)
             return
         msg = self.session.unpack_message(msg, content=False, copy=False)
         header = msg['header']
@@ -404,8 +394,6 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle
     ctx = zmq.Context()
     loop = ioloop.IOLoop()
     
-    scheme = globals().get(scheme)
-    
     ins = ZMQStream(ctx.socket(zmq.XREP),loop)
     ins.bind(in_addr)
     outs = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -416,6 +404,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle
     nots.setsockopt(zmq.SUBSCRIBE, '')
     nots.connect(not_addr)
     
+    scheme = globals().get(scheme, None)
     # setup logging
     if log_addr:
         connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
@@ -426,7 +415,10 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle
                             mon_stream=mons,notifier_stream=nots,
                             scheme=scheme,io_loop=loop)
     
-    loop.start()
+    try:
+        loop.start()
+    except KeyboardInterrupt:
+        print ("interrupted, exiting...", file=sys.__stderr__)
 
 
 if __name__ == '__main__':
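The scheme trait above is just a function mapping the current loads list to an index into targets; leastload, the default, picks the least-loaded engine. A toy illustration of that contract (the one-liner is illustrative, not the module's exact implementation):

def leastload(loads):
    """Return the index of the minimum load."""
    return loads.index(min(loads))

targets = ['engine-0', 'engine-1', 'engine-2']
loads = [3, 0, 2]
assert targets[leastload(loads)] == 'engine-1'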
diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py
index 4fcad1c..1516257 100755
--- a/IPython/zmq/parallel/streamkernel.py
+++ b/IPython/zmq/parallel/streamkernel.py
@@ -26,69 +26,97 @@ from zmq.eventloop import ioloop, zmqstream
 
 # Local imports.
 from IPython.core import ultratb
-from IPython.utils.traitlets import HasTraits, Instance, List, Int
+from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
 from IPython.zmq.completer import KernelCompleter
 from IPython.zmq.iostream import OutStream
 from IPython.zmq.displayhook import DisplayHook
-
+from factory import SessionFactory
 from streamsession import StreamSession, Message, extract_header, serialize_object,\
                 unpack_apply_message, ISO8601, wrap_exception
 from dependency import UnmetDependency
 import heartmonitor
 from client import Client
 
-logger = logging.getLogger()
-
 def printer(*args):
     pprint(args, stream=sys.__stdout__)
 
+
+class _Passer:
+    """Empty class that implements `send()` that does nothing."""
+    def send(self, *args, **kwargs):
+        pass
+    send_multipart = send
+
+
 #-----------------------------------------------------------------------------
 # Main kernel class
 #-----------------------------------------------------------------------------
 
-class Kernel(HasTraits):
+class Kernel(SessionFactory):
 
     #---------------------------------------------------------------------------
     # Kernel interface
     #---------------------------------------------------------------------------
-    
-    id = Int(-1)
-    session = Instance(StreamSession)
-    shell_streams = List()
+    
+    # kwargs:
+    int_id = Int(-1, config=True)
+    user_ns = Dict(config=True)
+    exec_lines = List(config=True)
+    
     control_stream = Instance(zmqstream.ZMQStream)
     task_stream = Instance(zmqstream.ZMQStream)
     iopub_stream = Instance(zmqstream.ZMQStream)
-    client = Instance(Client)
-    loop = Instance(ioloop.IOLoop)
+    client = Instance('IPython.zmq.parallel.client.Client')
+    
+    # internals
+    shell_streams = List()
+    compiler = Instance(CommandCompiler, (), {})
+    completer = Instance(KernelCompleter)
+    
+    aborted = Set()
+    shell_handlers = Dict()
+    control_handlers = Dict()
+    
+    def _set_prefix(self):
+        self.prefix = "engine.%s"%self.int_id
+    
+    def _connect_completer(self):
+        self.completer = KernelCompleter(self.user_ns)
    
     def __init__(self, **kwargs):
         super(Kernel, self).__init__(**kwargs)
-        self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
-        self.prefix = 'engine.%s'%self.id
-        logger.root_topic = self.prefix
-        self.user_ns = {}
-        self.history = []
-        self.compiler = CommandCompiler()
-        self.completer = KernelCompleter(self.user_ns)
-        self.aborted = set()
+        self._set_prefix()
+        self._connect_completer()
+        
+        self.on_trait_change(self._set_prefix, 'int_id')
+        self.on_trait_change(self._connect_completer, 'user_ns')
         
         # Build dict of handlers for message types
-        self.shell_handlers = {}
-        self.control_handlers = {}
         for msg_type in ['execute_request', 'complete_request', 'apply_request',
                 'clear_request']:
             self.shell_handlers[msg_type] = getattr(self, msg_type)
        
         for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
             self.control_handlers[msg_type] = getattr(self, msg_type)
-
+        
+        self._initial_exec_lines()
    
     def _wrap_exception(self, method=None):
-        e_info = dict(engineid=self.identity, method=method)
+        e_info = dict(engineid=self.ident, method=method)
         content=wrap_exception(e_info)
         return content
    
+    def _initial_exec_lines(self):
+        s = _Passer()
+        content = dict(silent=True, user_variable=[],user_expressions=[])
+        for line in self.exec_lines:
+            logging.debug("executing initialization: %s"%line)
+            content.update({'code':line})
+            msg = self.session.msg('execute_request', content)
+            self.execute_request(s, [], msg)
+
+
     #-------------------- control handlers -----------------------------
     def abort_queues(self):
         for stream in self.shell_streams:
@@ -112,8 +140,8 @@ class Kernel(HasTraits):
             # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
             # msg = self.reply_socket.recv_json()
-            logger.info("Aborting:")
-            logger.info(str(msg))
+            logging.info("Aborting:")
+            logging.info(str(msg))
             msg_type = msg['msg_type']
             reply_type = msg_type.split('_')[0] + '_reply'
             # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -121,7 +149,7 @@
             # self.reply_socket.send_json(reply_msg)
             reply_msg = self.session.send(stream, reply_type,
                         content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
-            logger.debug(str(reply_msg))
+            logging.debug(str(reply_msg))
             # We need to wait a bit for requests to come in. This can probably
             # be set shorter for true asynchronous clients.
             time.sleep(0.05)
@@ -139,7 +167,7 @@ class Kernel(HasTraits):
             content = dict(status='ok')
             reply_msg = self.session.send(stream, 'abort_reply', content=content,
                     parent=parent, ident=ident)[0]
-        logger(Message(reply_msg), file=sys.__stdout__)
+        logging.debug(str(reply_msg))
    
    def shutdown_request(self, stream, ident, parent):
        """kill ourself. This should really be handled in an external process"""
@@ -164,7 +192,7 @@ class Kernel(HasTraits):
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except:
-            logger.error("Invalid Message", exc_info=True)
+            logging.error("Invalid Message", exc_info=True)
            return
        
        header = msg['header']
@@ -172,7 +200,7 @@ class Kernel(HasTraits):
        
        handler = self.control_handlers.get(msg['msg_type'], None)
        if handler is None:
-            logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
+            logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(self.control_stream, idents, msg)
@@ -210,15 +238,15 @@ class Kernel(HasTraits):
        self.user_ns = {}
        msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                content = dict(status='ok'))
+        self._initial_exec_lines()
    
    def execute_request(self, stream, ident, parent):
+        logging.debug('execute request %s'%parent)
        try:
            code = parent[u'content'][u'code']
        except:
-            logger.error("Got bad msg: %s"%parent, exc_info=True)
+            logging.error("Got bad msg: %s"%parent, exc_info=True)
            return
-        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
-        # self.iopub_stream.send(pyin_msg)
        self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
                            ident='%s.pyin'%self.prefix)
        started = datetime.now().strftime(ISO8601)
@@ -243,7 +271,7 @@ class Kernel(HasTraits):
        # self.reply_socket.send_json(reply_msg)
        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                    ident=ident, subheader = dict(started=started))
-        logger.debug(str(reply_msg))
+        logging.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            self.abort_queues()
@@ -265,12 +293,12 @@ class Kernel(HasTraits):
            msg_id = parent['header']['msg_id']
            bound = content.get('bound', False)
        except:
-            logger.error("Got bad msg: %s"%parent, exc_info=True)
+            logging.error("Got bad msg: %s"%parent, exc_info=True)
            return
        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
        # self.iopub_stream.send(pyin_msg)
        # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
-        sub = {'dependencies_met' : True, 'engine' : self.identity,
+        sub = {'dependencies_met' : True, 'engine' : self.ident,
               'started': datetime.now().strftime(ISO8601)}
        try:
            # allow for not overriding displayhook
@@ -341,7 +369,7 @@ class Kernel(HasTraits):
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except:
-            logger.error("Invalid Message", exc_info=True)
+            logging.error("Invalid Message", exc_info=True)
            return
        
@@ -356,7 +384,7 @@ class Kernel(HasTraits):
            return
        handler = self.shell_handlers.get(msg['msg_type'], None)
        if handler is None:
-            logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
+            logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(stream, idents, msg)
@@ -372,8 +400,9 @@ class Kernel(HasTraits):
            return dispatcher
        
        for s in self.shell_streams:
+            # s.on_recv(printer)
            s.on_recv(make_dispatcher(s), copy=False)
-            s.on_err(printer)
+            # s.on_err(printer)
        
        if self.iopub_stream:
            self.iopub_stream.on_err(printer)
@@ -403,7 +432,7 @@ class Kernel(HasTraits):
 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
                 client_addr=None, loop=None, context=None, key=None,
                 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
-    
+    """NO LONGER IN USE"""
     # create loop, context, and session:
     if loop is None:
         loop = ioloop.IOLoop.instance()
@@ -453,7 +482,7 @@ def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addr
     else:
         client = None
     
-    kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
+    kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
             shell_streams=shell_streams, iopub_stream=iopub_stream,
             client=client, loop=loop)
     kernel.start()
diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py
index e012a64..2df2a93 100644
--- a/IPython/zmq/parallel/streamsession.py
+++ b/IPython/zmq/parallel/streamsession.py
@@ -51,12 +51,18 @@ def squash_unicode(obj):
         obj = obj.encode('utf8')
     return obj
 
+json_packer = jsonapi.dumps
+json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
+
+pickle_packer = lambda o: pickle.dumps(o,-1)
+pickle_unpacker = pickle.loads
+
 if use_json:
-    default_packer = jsonapi.dumps
-    default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
+    default_packer = json_packer
+    default_unpacker = json_unpacker
 else:
-    default_packer = lambda o: pickle.dumps(o,-1)
-    default_unpacker = pickle.loads
+    default_packer = pickle_packer
+    default_unpacker = pickle_unpacker
 
 DELIM="<IDS|MSG>"
 
diff --git a/setup.py b/setup.py
index 2d83b9a..53f2144 100755
--- a/setup.py
+++ b/setup.py
@@ -215,8 +215,9 @@ if 'setuptools' in sys.modules:
             'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
             'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
             'pycolor = IPython.utils.PyColorize:main',
-            'ipcontrollerz = IPython.zmq.parallel.controller:main',
-            'ipenginez = IPython.zmq.parallel.engine:main',
+            'ipcontrollerz = IPython.zmq.parallel.ipcontrollerapp:launch_new_instance',
+            'ipenginez = IPython.zmq.parallel.ipengineapp:launch_new_instance',
+            'iploggerz = IPython.zmq.parallel.iploggerapp:launch_new_instance',
             'ipclusterz = IPython.zmq.parallel.ipcluster:main',
             'iptest = IPython.testing.iptest:main',
             'irunner = IPython.lib.irunner:main'
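The packer indirection added to streamsession.py above makes json and pickle serialization interchangeable, provided unpack(pack(obj)) round-trips. A quick check of that contract, assuming pyzmq's jsonapi module is importable:

import pickle

from zmq.utils import jsonapi

json_packer = jsonapi.dumps
json_unpacker = jsonapi.loads
pickle_packer = lambda o: pickle.dumps(o, -1)
pickle_unpacker = pickle.loads

msg = {'msg_type': 'execute_request', 'content': {'code': '1+1'}}
for pack, unpack in [(json_packer, json_unpacker),
                     (pickle_packer, pickle_unpacker)]:
    # both serializers must reproduce the original message exactly
    assert unpack(pack(msg)) == msg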