From cc803e0b923f30706fff9392d69bb4e6d5b7bbf7 2009-10-31 18:42:44 From: Brian Granger Date: 2009-10-31 18:42:44 Subject: [PATCH] Most of the new ipcluster is now working, including a nice client. --- diff --git a/IPython/config/default/ipcluster_config.py b/IPython/config/default/ipcluster_config.py index d4ee051..988c0de 100644 --- a/IPython/config/default/ipcluster_config.py +++ b/IPython/config/default/ipcluster_config.py @@ -16,6 +16,7 @@ c = get_config() # c.Global.log_to_file = False # c.Global.n = 2 # c.Global.reset_config = False +# c.Global.clean_logs = True # c.MPIExecLauncher.mpi_cmd = ['mpiexec'] # c.MPIExecLauncher.mpi_args = [] diff --git a/IPython/config/default/ipcontroller_config.py b/IPython/config/default/ipcontroller_config.py index 0ee2548..cf41bca 100644 --- a/IPython/config/default/ipcontroller_config.py +++ b/IPython/config/default/ipcontroller_config.py @@ -8,15 +8,11 @@ c = get_config() # Basic Global config attributes # c.Global.log_to_file = False +# c.Global.clean_logs = True # c.Global.import_statements = ['import math'] # c.Global.reuse_furls = True # c.Global.secure = True -# You shouldn't have to modify these -# c.Global.log_dir_name = 'log' -# c.Global.security_dir_name = 'security' - - #----------------------------------------------------------------------------- # Configure the client services #----------------------------------------------------------------------------- diff --git a/IPython/config/default/ipengine_config.py b/IPython/config/default/ipengine_config.py index ed35df9..9595e98 100644 --- a/IPython/config/default/ipengine_config.py +++ b/IPython/config/default/ipengine_config.py @@ -1,13 +1,17 @@ c = get_config() # c.Global.log_to_file = False +# c.Global.clean_logs = False # c.Global.exec_lines = ['import numpy'] -# c.Global.log_dir_name = 'log' -# c.Global.security_dir_name = 'security' # c.Global.log_level = 10 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter' # c.Global.furl_file_name = 
'ipcontroller-engine.furl' # c.Global.furl_file = '' +# The max number of connection attemps and the initial delay between +# those attemps. +# c.Global.connect_delay = 0.1 +# c.Global.connect_max_tries = 15 + # c.MPI.use = '' # c.MPI.mpi4py = """from mpi4py import MPI as mpi diff --git a/IPython/config/profile/ipython_config_cluster.py b/IPython/config/profile/ipython_config_cluster.py new file mode 100644 index 0000000..3273396 --- /dev/null +++ b/IPython/config/profile/ipython_config_cluster.py @@ -0,0 +1,17 @@ +c = get_config() + +# This can be used at any point in a config file to load a sub config +# and merge it into the current one. +load_subconfig('ipython_config.py') + +lines = """ +from IPython.kernel.client import * +""" + +# You have to make sure that attributes that are containers already +# exist before using them. Simple assigning a new list will override +# all previous values. +if hasattr(c.Global, 'exec_lines'): + c.Global.exec_lines.append(lines) +else: + c.Global.exec_lines = [lines] \ No newline at end of file diff --git a/IPython/kernel/asyncclient.py b/IPython/kernel/asyncclient.py index 586202b..b59058c 100644 --- a/IPython/kernel/asyncclient.py +++ b/IPython/kernel/asyncclient.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # encoding: utf-8 """Asynchronous clients for the IPython controller. @@ -9,32 +10,32 @@ deferreds to the result. The main methods are are `get_*_client` and `get_client`. """ - -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
-#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- from IPython.kernel import codeutil -from IPython.kernel.clientconnector import ClientConnector +from IPython.kernel.clientconnector import ( + AsyncClientConnector, + AsyncCluster +) # Other things that the user will need from IPython.kernel.task import MapTask, StringTask from IPython.kernel.error import CompositeError -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Code -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -_client_tub = ClientConnector() +_client_tub = AsyncClientConnector() get_multiengine_client = _client_tub.get_multiengine_client get_task_client = _client_tub.get_task_client get_client = _client_tub.get_client diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py index 9b2499f..ed432a3 100644 --- a/IPython/kernel/client.py +++ b/IPython/kernel/client.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # encoding: utf-8 """This module contains blocking clients for the controller interfaces. 
@@ -15,27 +16,30 @@ The main classes in this module are: * CompositeError """ -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- +from cStringIO import StringIO import sys +import warnings # from IPython.utils import growl # growl.start("IPython1 Client") from twisted.internet import reactor -from IPython.kernel.clientconnector import ClientConnector +from twisted.internet.error import PotentialZombieWarning +from twisted.python import log + +from IPython.kernel.clientconnector import ClientConnector, Cluster from IPython.kernel.twistedutil import ReactorInThread from IPython.kernel.twistedutil import blockingCallFromThread @@ -51,46 +55,33 @@ from IPython.kernel.error import CompositeError # Code #------------------------------------------------------------------------------- -_client_tub = ClientConnector() - - -def get_multiengine_client(furl_or_file=''): - """Get the blocking MultiEngine client. - - :Parameters: - furl_or_file : str - A furl or a filename containing a furl. 
If empty, the - default furl_file will be used - - :Returns: - The connected MultiEngineClient instance - """ - client = blockingCallFromThread(_client_tub.get_multiengine_client, - furl_or_file) - return client.adapt_to_blocking_client() - -def get_task_client(furl_or_file=''): - """Get the blocking Task client. - - :Parameters: - furl_or_file : str - A furl or a filename containing a furl. If empty, the - default furl_file will be used - - :Returns: - The connected TaskClient instance - """ - client = blockingCallFromThread(_client_tub.get_task_client, - furl_or_file) - return client.adapt_to_blocking_client() +warnings.simplefilter('ignore', PotentialZombieWarning) +_client_tub = ClientConnector() +get_multiengine_client = _client_tub.get_multiengine_client +get_task_client = _client_tub.get_task_client MultiEngineClient = get_multiengine_client TaskClient = get_task_client - +twisted_log = StringIO() +log.startLogging(sys.stdout, setStdout=0) # Now we start the reactor in a thread rit = ReactorInThread() rit.setDaemon(True) -rit.start() \ No newline at end of file +rit.start() + + + + +__all__ = [ + 'MapTask', + 'StringTask', + 'MultiEngineClient', + 'TaskClient', + 'CompositeError', + 'get_task_client', + 'get_multiengine_client', + 'Cluster' +] diff --git a/IPython/kernel/clientconnector.py b/IPython/kernel/clientconnector.py index 5d90467..f7833f3 100644 --- a/IPython/kernel/clientconnector.py +++ b/IPython/kernel/clientconnector.py @@ -1,142 +1,229 @@ +#!/usr/bin/env python # encoding: utf-8 -"""A class for handling client connections to the controller.""" +"""Facilities for handling client connections to the controller.""" -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the 
terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -from twisted.internet import defer +import os -from IPython.kernel.fcutil import Tub, UnauthenticatedTub +from IPython.kernel.fcutil import Tub, find_furl +from IPython.kernel.clusterdir import ClusterDir, ClusterDirError +from IPython.kernel.launcher import IPClusterLauncher +from IPython.kernel.twistedutil import gatherBoth, make_deferred +from IPython.kernel.twistedutil import blockingCallFromThread -from IPython.kernel.config import config_manager as kernel_config_manager from IPython.utils.importstring import import_item -from IPython.kernel.fcutil import find_furl +from IPython.utils.genutils import get_ipython_dir -co = kernel_config_manager.get_config_obj() -client_co = co['client'] +from twisted.internet import defer +from twisted.python import failure -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # The ClientConnector class -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -class ClientConnector(object): - """ - This class gets remote references from furls and returns the wrapped clients. - - This class is also used in `client.py` and `asyncclient.py` to create - a single per client-process Tub. 
+ +class AsyncClientConnector(object): + """A class for getting remote references and clients from furls. + + This start a single :class:`Tub` for all remote reference and caches + references. """ - + def __init__(self): self._remote_refs = {} self.tub = Tub() self.tub.startService() - - def get_reference(self, furl_or_file): + + def _find_furl(self, profile='default', cluster_dir=None, + furl_or_file=None, furl_file_name=None, + ipythondir=None): + """Find a FURL file by profile+ipythondir or cluster dir. + + This raises an exception if a FURL file can't be found. """ - Get a remote reference using a furl or a file containing a furl. - + # Try by furl_or_file + if furl_or_file is not None: + try: + furl = find_furl(furl_or_file) + except ValueError: + return furl + + if furl_file_name is None: + raise ValueError('A furl_file_name must be provided') + + # Try by cluster_dir + if cluster_dir is not None: + cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) + sdir = cluster_dir_obj.security_dir + furl_file = os.path.join(sdir, furl_file_name) + return find_furl(furl_file) + + # Try by profile + if ipythondir is None: + ipythondir = get_ipython_dir() + if profile is not None: + cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( + ipythondir, profile) + sdir = cluster_dir_obj.security_dir + furl_file = os.path.join(sdir, furl_file_name) + return find_furl(furl_file) + + raise ValueError('Could not find a valid FURL file.') + + def get_reference(self, furl_or_file): + """Get a remote reference using a furl or a file containing a furl. + Remote references are cached locally so once a remote reference has been retrieved for a given furl, the cached version is returned. 
- - :Parameters: - furl_or_file : str - A furl or a filename containing a furl - - :Returns: - A deferred to a remote reference + + Parameters + ---------- + furl_or_file : str + A furl or a filename containing a furl + + Returns + ------- + A deferred to a remote reference """ furl = find_furl(furl_or_file) if furl in self._remote_refs: d = defer.succeed(self._remote_refs[furl]) else: d = self.tub.getReference(furl) - d.addCallback(self.save_ref, furl) + d.addCallback(self._save_ref, furl) return d - def save_ref(self, ref, furl): - """ - Cache a remote reference by its furl. - """ + def _save_ref(self, ref, furl): + """Cache a remote reference by its furl.""" self._remote_refs[furl] = ref return ref - def get_task_client(self, furl_or_file=''): - """ - Get the task controller client. + def get_task_client(self, profile='default', cluster_dir=None, + furl_or_file=None, ipythondir=None): + """Get the task controller client. - This method is a simple wrapper around `get_client` that allow - `furl_or_file` to be empty, in which case, the furls is taken - from the default furl file given in the configuration. + This method is a simple wrapper around `get_client` that passes in + the default name of the task client FURL file. Usually only + the ``profile`` option will be needed. If a FURL file can't be + found by its profile, use ``cluster_dir`` or ``furl_or_file``. - :Parameters: - furl_or_file : str - A furl or a filename containing a furl. If empty, the - default furl_file will be used - - :Returns: - A deferred to the actual client class - """ - task_co = client_co['client_interfaces']['task'] - if furl_or_file: - ff = furl_or_file - else: - ff = task_co['furl_file'] - return self.get_client(ff) + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). 
The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. - def get_multiengine_client(self, furl_or_file=''): + Returns + ------- + A deferred to the actual client class. """ - Get the multiengine controller client. + return self.get_client( + profile, cluster_dir, furl_or_file, + 'ipcontroller-tc.furl', ipythondir + ) + + def get_multiengine_client(self, profile='default', cluster_dir=None, + furl_or_file=None, ipythondir=None): + """Get the multiengine controller client. - This method is a simple wrapper around `get_client` that allow - `furl_or_file` to be empty, in which case, the furls is taken - from the default furl file given in the configuration. + This method is a simple wrapper around `get_client` that passes in + the default name of the task client FURL file. Usually only + the ``profile`` option will be needed. If a FURL file can't be + found by its profile, use ``cluster_dir`` or ``furl_or_file``. - :Parameters: - furl_or_file : str - A furl or a filename containing a furl. If empty, the - default furl_file will be used + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. 
+ furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. - :Returns: - A deferred to the actual client class + Returns + ------- + A deferred to the actual client class. """ - task_co = client_co['client_interfaces']['multiengine'] - if furl_or_file: - ff = furl_or_file - else: - ff = task_co['furl_file'] - return self.get_client(ff) + return self.get_client( + profile, cluster_dir, furl_or_file, + 'ipcontroller-mec.furl', ipythondir + ) - def get_client(self, furl_or_file): - """ - Get a remote reference and wrap it in a client by furl. - - This method first gets a remote reference and then calls its - `get_client_name` method to find the apprpriate client class - that should be used to wrap the remote reference. - - :Parameters: - furl_or_file : str - A furl or a filename containing a furl + def get_client(self, profile='default', cluster_dir=None, + furl_or_file=None, furl_file_name=None, ipythondir=None): + """Get a remote reference and wrap it in a client by furl. + + This method is a simple wrapper around `get_client` that passes in + the default name of the task client FURL file. Usually only + the ``profile`` option will be needed. If a FURL file can't be + found by its profile, use ``cluster_dir`` or ``furl_or_file``. - :Returns: - A deferred to the actual client class + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. 
This is useful if you + simply know the location of the FURL file. + furl_file_name : str + The filename (not the full path) of the FURL. This must be + provided if ``furl_or_file`` is not. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. + + Returns + ------- + A deferred to the actual client class. """ - furl = find_furl(furl_or_file) + try: + furl = self._find_furl( + profile, cluster_dir, furl_or_file, + furl_file_name, ipythondir + ) + except: + return defer.fail(failure.Failure()) + d = self.get_reference(furl) - def wrap_remote_reference(rr): + + def _wrap_remote_reference(rr): d = rr.callRemote('get_client_name') d.addCallback(lambda name: import_item(name)) def adapt(client_interface): @@ -146,5 +233,352 @@ class ClientConnector(object): d.addCallback(adapt) return d - d.addCallback(wrap_remote_reference) + + d.addCallback(_wrap_remote_reference) return d + + +class ClientConnector(object): + """A blocking version of a client connector. + + This class creates a single :class:`Tub` instance and allows remote + references and client to be retrieved by their FURLs. Remote references + are cached locally and FURL files can be found using profiles and cluster + directories. + """ + + def __init__(self): + self.async_cc = AsyncClientConnector() + + def get_task_client(self, profile='default', cluster_dir=None, + furl_or_file=None, ipythondir=None): + """Get the task client. + + Usually only the ``profile`` option will be needed. If a FURL file + can't be found by its profile, use ``cluster_dir`` or + ``furl_or_file``. + + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. 
This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. + + Returns + ------- + The task client instance. + """ + client = blockingCallFromThread( + self.async_cc.get_task_client, profile, cluster_dir, + furl_or_file, ipythondir + ) + return client.adapt_to_blocking_client() + + def get_multiengine_client(self, profile='default', cluster_dir=None, + furl_or_file=None, ipythondir=None): + """Get the multiengine client. + + Usually only the ``profile`` option will be needed. If a FURL file + can't be found by its profile, use ``cluster_dir`` or + ``furl_or_file``. + + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. + + Returns + ------- + The multiengine client instance. 
+ """ + client = blockingCallFromThread( + self.async_cc.get_multiengine_client, profile, cluster_dir, + furl_or_file, ipythondir + ) + return client.adapt_to_blocking_client() + + def get_client(self, profile='default', cluster_dir=None, + furl_or_file=None, ipythondir=None): + client = blockingCallFromThread( + self.async_cc.get_client, profile, cluster_dir, + furl_or_file, ipythondir + ) + return client.adapt_to_blocking_client() + + +class ClusterStateError(Exception): + pass + + +class AsyncCluster(object): + """An class that wraps the :command:`ipcluster` script.""" + + def __init__(self, profile='default', cluster_dir=None, ipythondir=None, + auto_create=False, auto_stop=True): + """Create a class to manage an IPython cluster. + + This class calls the :command:`ipcluster` command with the right + options to start an IPython cluster. Typically a cluster directory + must be created (:command:`ipcluster create`) and configured before + using this class. Configuration is done by editing the + configuration files in the top level of the cluster directory. + + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. + auto_create : bool + Automatically create the cluster directory it is dones't exist. + This will usually only make sense if using a local cluster + (default=False). 
+ auto_stop : bool + Automatically stop the cluster when this instance is garbage + collected (default=True). This is useful if you want the cluster + to live beyond your current process. There is also an instance + attribute ``auto_stop`` to change this behavior. + """ + self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create) + self.state = 'before' + self.launcher = None + self.client_connector = None + self.auto_stop = auto_stop + + def __del__(self): + if self.auto_stop and self.state=='running': + print "Auto stopping the cluster..." + self.stop() + + @property + def location(self): + if hasattr(self, 'cluster_dir_obj'): + return self.cluster_dir_obj.location + else: + return '' + + @property + def running(self): + if self.state=='running': + return True + else: + return False + + def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create): + if ipythondir is None: + ipythondir = get_ipython_dir() + if cluster_dir is not None: + try: + self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) + except ClusterDirError: + pass + if profile is not None: + try: + self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile( + ipythondir, profile) + except ClusterDirError: + pass + if auto_create or profile=='default': + # This should call 'ipcluster create --profile default + self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile( + ipythondir, profile) + else: + raise ClusterDirError('Cluster dir not found.') + + @make_deferred + def start(self, n=2): + """Start the IPython cluster with n engines. + + Parameters + ---------- + n : int + The number of engine to start. + """ + # We might want to add logic to test if the cluster has started + # by another process.... 
+ if not self.state=='running': + self.launcher = IPClusterLauncher(os.getcwd()) + self.launcher.ipcluster_n = n + self.launcher.ipcluster_subcommand = 'start' + d = self.launcher.start() + d.addCallback(self._handle_start) + return d + else: + raise ClusterStateError('Cluster is already running') + + @make_deferred + def stop(self): + """Stop the IPython cluster if it is running.""" + if self.state=='running': + d1 = self.launcher.observe_stop() + d1.addCallback(self._handle_stop) + d2 = self.launcher.stop() + return gatherBoth([d1, d2], consumeErrors=True) + else: + raise ClusterStateError("Cluster not running") + + def get_multiengine_client(self): + """Get the multiengine client for the running cluster. + + If this fails, it means that the cluster has not finished starting. + Usually waiting a few seconds are re-trying will solve this. + """ + if self.client_connector is None: + self.client_connector = AsyncClientConnector() + return self.client_connector.get_multiengine_client( + cluster_dir=self.cluster_dir_obj.location + ) + + def get_task_client(self): + """Get the task client for the running cluster. + + If this fails, it means that the cluster has not finished starting. + Usually waiting a few seconds are re-trying will solve this. + """ + if self.client_connector is None: + self.client_connector = AsyncClientConnector() + return self.client_connector.get_task_client( + cluster_dir=self.cluster_dir_obj.location + ) + + def _handle_start(self, r): + self.state = 'running' + + def _handle_stop(self, r): + self.state = 'after' + + +class Cluster(object): + + + def __init__(self, profile='default', cluster_dir=None, ipythondir=None, + auto_create=False, auto_stop=True): + """Create a class to manage an IPython cluster. + + This class calls the :command:`ipcluster` command with the right + options to start an IPython cluster. Typically a cluster directory + must be created (:command:`ipcluster create`) and configured before + using this class. 
Configuration is done by editing the + configuration files in the top level of the cluster directory. + + Parameters + ---------- + profile : str + The name of a cluster directory profile (default="default"). The + cluster directory "cluster_" will be searched for + in ``os.getcwd()``, the ipythondir and then in the directories + listed in the :env:`IPCLUSTERDIR_PATH` environment variable. + cluster_dir : str + The full path to a cluster directory. This is useful if profiles + are not being used. + furl_or_file : str + A furl or a filename containing a FURLK. This is useful if you + simply know the location of the FURL file. + ipythondir : str + The location of the ipythondir if different from the default. + This is used if the cluster directory is being found by profile. + auto_create : bool + Automatically create the cluster directory it is dones't exist. + This will usually only make sense if using a local cluster + (default=False). + auto_stop : bool + Automatically stop the cluster when this instance is garbage + collected (default=True). This is useful if you want the cluster + to live beyond your current process. There is also an instance + attribute ``auto_stop`` to change this behavior. + """ + self.async_cluster = AsyncCluster( + profile, cluster_dir, ipythondir, auto_create, auto_stop + ) + self.cluster_dir_obj = self.async_cluster.cluster_dir_obj + self.client_connector = None + + def _set_auto_stop(self, value): + self.async_cluster.auto_stop = value + + def _get_auto_stop(self): + return self.async_cluster.auto_stop + + auto_stop = property(_get_auto_stop, _set_auto_stop) + + @property + def location(self): + return self.async_cluster.location + + @property + def running(self): + return self.async_cluster.running + + def start(self, n=2): + """Start the IPython cluster with n engines. + + Parameters + ---------- + n : int + The number of engine to start. 
+ """ + return blockingCallFromThread(self.async_cluster.start, n) + + def stop(self): + """Stop the IPython cluster if it is running.""" + return blockingCallFromThread(self.async_cluster.stop) + + def get_multiengine_client(self): + """Get the multiengine client for the running cluster. + + If this fails, it means that the cluster has not finished starting. + Usually waiting a few seconds are re-trying will solve this. + """ + if self.client_connector is None: + self.client_connector = ClientConnector() + return self.client_connector.get_multiengine_client( + cluster_dir=self.cluster_dir_obj.location + ) + + def get_task_client(self): + """Get the task client for the running cluster. + + If this fails, it means that the cluster has not finished starting. + Usually waiting a few seconds are re-trying will solve this. + """ + if self.client_connector is None: + self.client_connector = ClientConnector() + return self.client_connector.get_task_client( + cluster_dir=self.cluster_dir_obj.location + ) + + + diff --git a/IPython/kernel/clusterdir.py b/IPython/kernel/clusterdir.py index 25b56d3..9c54141 100644 --- a/IPython/kernel/clusterdir.py +++ b/IPython/kernel/clusterdir.py @@ -17,6 +17,9 @@ The IPython cluster directory import os import shutil +import sys + +from twisted.python import log from IPython.core import release from IPython.config.loader import PyFileConfigLoader @@ -210,7 +213,8 @@ class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader): dest='Global.ipythondir',type=str, help='Set to override default location of Global.ipythondir.', default=NoConfigDefault, - metavar='Global.ipythondir') + metavar='Global.ipythondir' + ) self.parser.add_argument('-p','-profile', '--profile', dest='Global.profile',type=str, help='The string name of the profile to be used. This determines ' @@ -218,19 +222,31 @@ class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader): 'is named "default". 
The cluster directory is resolve this way ' 'if the --cluster-dir option is not used.', default=NoConfigDefault, - metavar='Global.profile') + metavar='Global.profile' + ) self.parser.add_argument('-log_level', '--log-level', dest="Global.log_level",type=int, help='Set the log level (0,10,20,30,40,50). Default is 30.', default=NoConfigDefault, - metavar="Global.log_level") + metavar="Global.log_level" + ) self.parser.add_argument('-cluster_dir', '--cluster-dir', dest='Global.cluster_dir',type=str, help='Set the cluster dir. This overrides the logic used by the ' '--profile option.', default=NoConfigDefault, - metavar='Global.cluster_dir') - + metavar='Global.cluster_dir' + ) + self.parser.add_argument('-clean_logs', '--clean-logs', + dest='Global.clean_logs', action='store_true', + help='Delete old log flies before starting.', + default=NoConfigDefault + ) + self.parser.add_argument('-noclean_logs', '--no-clean-logs', + dest='Global.clean_logs', action='store_false', + help="Don't Delete old log flies before starting.", + default=NoConfigDefault + ) class ApplicationWithClusterDir(Application): """An application that puts everything into a cluster directory. @@ -257,6 +273,8 @@ class ApplicationWithClusterDir(Application): super(ApplicationWithClusterDir, self).create_default_config() self.default_config.Global.profile = 'default' self.default_config.Global.cluster_dir = '' + self.default_config.Global.log_to_file = False + self.default_config.Global.clean_logs = False def create_command_line_config(self): """Create and return a command line config loader.""" @@ -349,4 +367,28 @@ class ApplicationWithClusterDir(Application): # Set the search path to the cluster directory self.config_file_paths = (self.cluster_dir,) - + def pre_construct(self): + # The log and security dirs were set earlier, but here we put them + # into the config and log them. 
+ config = self.master_config + sdir = self.cluster_dir_obj.security_dir + self.security_dir = config.Global.security_dir = sdir + ldir = self.cluster_dir_obj.log_dir + self.log_dir = config.Global.log_dir = ldir + self.log.info("Cluster directory set to: %s" % self.cluster_dir) + + def start_logging(self): + # Remove old log files + if self.master_config.Global.clean_logs: + log_dir = self.master_config.Global.log_dir + for f in os.listdir(log_dir): + if f.startswith(self.name + '-') and f.endswith('.log'): + os.remove(os.path.join(log_dir, f)) + # Start logging to the new log file + if self.master_config.Global.log_to_file: + log_filename = self.name + '-' + str(os.getpid()) + '.log' + logfile = os.path.join(self.log_dir, log_filename) + open_log_file = open(logfile, 'w') + else: + open_log_file = sys.stdout + log.startLogging(open_log_file) diff --git a/IPython/kernel/engineconnector.py b/IPython/kernel/engineconnector.py index 31b3c81..eea93c3 100644 --- a/IPython/kernel/engineconnector.py +++ b/IPython/kernel/engineconnector.py @@ -1,19 +1,18 @@ +#!/usr/bin/env python # encoding: utf-8 """A class that manages the engines connection to the controller.""" -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
-#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- import os import cPickle as pickle @@ -26,9 +25,9 @@ from IPython.kernel.fcutil import find_furl from IPython.kernel.enginefc import IFCEngine from IPython.kernel.twistedutil import sleep_deferred -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # The ClientConnector class -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- class EngineConnectorError(Exception): diff --git a/IPython/kernel/ipclusterapp.py b/IPython/kernel/ipclusterapp.py index 3401375..6c6360e 100644 --- a/IPython/kernel/ipclusterapp.py +++ b/IPython/kernel/ipclusterapp.py @@ -122,7 +122,16 @@ class IPClusterCLLoader(ArgParseConfigLoader): help='The number of engines to start.', metavar='Global.n' ) - + parser_start.add_argument('-clean_logs', '--clean-logs', + dest='Global.clean_logs', action='store_true', + help='Delete old log flies before starting.', + default=NoConfigDefault + ) + parser_start.add_argument('-noclean_logs', '--no-clean-logs', + dest='Global.clean_logs', action='store_false', + help="Don't delete old log flies before starting.", + default=NoConfigDefault + ) default_config_file_name = 'ipcluster_config.py' @@ -141,9 +150,9 @@ class IPClusterApp(ApplicationWithClusterDir): 'IPython.kernel.launcher.LocalControllerLauncher' self.default_config.Global.engine_launcher = \ 
'IPython.kernel.launcher.LocalEngineSetLauncher' - self.default_config.Global.log_to_file = False self.default_config.Global.n = 2 self.default_config.Global.reset_config = False + self.default_config.Global.clean_logs = True def create_command_line_config(self): """Create and return a command line config loader.""" @@ -172,6 +181,7 @@ class IPClusterApp(ApplicationWithClusterDir): "'ipcluster create -h' or 'ipcluster list -h' for more " "information about creating and listing cluster dirs." ) + def construct(self): config = self.master_config if config.Global.subcommand=='list': @@ -184,15 +194,21 @@ class IPClusterApp(ApplicationWithClusterDir): self.start_logging() reactor.callWhenRunning(self.start_launchers) - def list_cluster_dirs(self): + def list_cluster_dirs(self): + # Find the search paths cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','') if cluster_dir_paths: cluster_dir_paths = cluster_dir_paths.split(':') else: cluster_dir_paths = [] - # We need to look both in default_config and command_line_config!!! 
- paths = [os.getcwd(), self.default_config.Global.ipythondir] + \ + try: + ipythondir = self.command_line_config.Global.ipythondir + except AttributeError: + ipythondir = self.default_config.Global.ipythondir + paths = [os.getcwd(), ipythondir] + \ cluster_dir_paths + paths = list(set(paths)) + self.log.info('Searching for cluster dirs in paths: %r' % paths) for path in paths: files = os.listdir(path) @@ -203,15 +219,6 @@ class IPClusterApp(ApplicationWithClusterDir): start_cmd = '"ipcluster start -n 4 -p %s"' % profile print start_cmd + " ==> " + full_path - def start_logging(self): - if self.master_config.Global.log_to_file: - log_filename = self.name + '-' + str(os.getpid()) + '.log' - logfile = os.path.join(self.log_dir, log_filename) - open_log_file = open(logfile, 'w') - else: - open_log_file = sys.stdout - log.startLogging(open_log_file) - def start_launchers(self): config = self.master_config @@ -227,6 +234,7 @@ class IPClusterApp(ApplicationWithClusterDir): # Setup signals signal.signal(signal.SIGINT, self.stop_launchers) + # signal.signal(signal.SIGKILL, self.stop_launchers) # Setup the observing of stopping d1 = self.controller_launcher.observe_stop() @@ -261,10 +269,24 @@ class IPClusterApp(ApplicationWithClusterDir): def stop_launchers(self, signum, frame): log.msg("Stopping cluster") d1 = self.engine_launcher.stop() - d1.addCallback(lambda _: self.controller_launcher.stop) + d2 = self.controller_launcher.stop() + # d1.addCallback(lambda _: self.controller_launcher.stop) d1.addErrback(self.err_and_stop) + d2.addErrback(self.err_and_stop) reactor.callLater(2.0, reactor.stop) + def start_logging(self): + # Remove old log files + if self.master_config.Global.clean_logs: + log_dir = self.master_config.Global.log_dir + for f in os.listdir(log_dir): + if f.startswith('ipengine' + '-') and f.endswith('.log'): + os.remove(os.path.join(log_dir, f)) + for f in os.listdir(log_dir): + if f.startswith('ipcontroller' + '-') and f.endswith('.log'): + 
os.remove(os.path.join(log_dir, f)) + super(IPClusterApp, self).start_logging() + def start_app(self): config = self.master_config if config.Global.subcommand=='create' or config.Global.subcommand=='list': @@ -280,4 +302,5 @@ def launch_new_instance(): if __name__ == '__main__': - launch_new_instance() \ No newline at end of file + launch_new_instance() + diff --git a/IPython/kernel/ipcontrollerapp.py b/IPython/kernel/ipcontrollerapp.py index 6c9ea3c..4ac0954 100644 --- a/IPython/kernel/ipcontrollerapp.py +++ b/IPython/kernel/ipcontrollerapp.py @@ -185,7 +185,7 @@ class IPControllerApp(ApplicationWithClusterDir): self.default_config.Global.reuse_furls = False self.default_config.Global.secure = True self.default_config.Global.import_statements = [] - self.default_config.Global.log_to_file = False + self.default_config.Global.clean_logs = True def create_command_line_config(self): """Create and return a command line config loader.""" @@ -206,18 +206,6 @@ class IPControllerApp(ApplicationWithClusterDir): c.FCEngineServiceFactory.secure = c.Global.secure del c.Global.secure - def pre_construct(self): - # The log and security dirs were set earlier, but here we put them - # into the config and log them. - config = self.master_config - sdir = self.cluster_dir_obj.security_dir - self.security_dir = config.Global.security_dir = sdir - ldir = self.cluster_dir_obj.log_dir - self.log_dir = config.Global.log_dir = ldir - self.log.info("Cluster directory set to: %s" % self.cluster_dir) - self.log.info("Log directory set to: %s" % self.log_dir) - self.log.info("Security directory set to: %s" % self.security_dir) - def construct(self): # I am a little hesitant to put these into InteractiveShell itself. 
# But that might be the place for them @@ -240,15 +228,6 @@ class IPControllerApp(ApplicationWithClusterDir): engine_service = esfactory.create() engine_service.setServiceParent(self.main_service) - def start_logging(self): - if self.master_config.Global.log_to_file: - log_filename = self.name + '-' + str(os.getpid()) + '.log' - logfile = os.path.join(self.log_dir, log_filename) - open_log_file = open(logfile, 'w') - else: - open_log_file = sys.stdout - log.startLogging(open_log_file) - def import_statements(self): statements = self.master_config.Global.import_statements for s in statements: diff --git a/IPython/kernel/ipengineapp.py b/IPython/kernel/ipengineapp.py index a144795..e9cfa5f 100644 --- a/IPython/kernel/ipengineapp.py +++ b/IPython/kernel/ipengineapp.py @@ -97,12 +97,12 @@ class IPEngineApp(ApplicationWithClusterDir): def create_default_config(self): super(IPEngineApp, self).create_default_config() + # The engine should not clean logs as we don't want to remove the + # active log files of other running engines. + self.default_config.Global.clean_logs = False + # Global config attributes - self.default_config.Global.log_to_file = False self.default_config.Global.exec_lines = [] - # The log and security dir names must match that of the controller - self.default_config.Global.log_dir_name = 'log' - self.default_config.Global.security_dir_name = 'security' self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter' # Configuration related to the controller @@ -113,6 +113,11 @@ class IPEngineApp(ApplicationWithClusterDir): # If not, this is computed using the profile, app_dir and furl_file_name self.default_config.Global.furl_file = '' + # The max number of connection attempts and the initial delay between + # those attempts. 
+ self.default_config.Global.connect_delay = 0.1 + self.default_config.Global.connect_max_tries = 15 + # MPI related config attributes self.default_config.MPI.use = '' self.default_config.MPI.mpi4py = mpi4py_init @@ -129,15 +134,7 @@ class IPEngineApp(ApplicationWithClusterDir): pass def pre_construct(self): - config = self.master_config - sdir = self.cluster_dir_obj.security_dir - self.security_dir = config.Global.security_dir = sdir - ldir = self.cluster_dir_obj.log_dir - self.log_dir = config.Global.log_dir = ldir - self.log.info("Cluster directory set to: %s" % self.cluster_dir) - self.log.info("Log directory set to: %s" % self.log_dir) - self.log.info("Security directory set to: %s" % self.security_dir) - + super(IPEngineApp, self).pre_construct() self.find_cont_furl_file() def find_cont_furl_file(self): @@ -189,7 +186,9 @@ class IPEngineApp(ApplicationWithClusterDir): def call_connect(self): d = self.engine_connector.connect_to_controller( self.engine_service, - self.master_config.Global.furl_file + self.master_config.Global.furl_file, + self.master_config.Global.connect_delay, + self.master_config.Global.connect_max_tries ) def handle_error(f): @@ -216,15 +215,6 @@ class IPEngineApp(ApplicationWithClusterDir): else: mpi = None - def start_logging(self): - if self.master_config.Global.log_to_file: - log_filename = self.name + '-' + str(os.getpid()) + '.log' - logfile = os.path.join(self.log_dir, log_filename) - open_log_file = open(logfile, 'w') - else: - open_log_file = sys.stdout - log.startLogging(open_log_file) - def exec_lines(self): for line in self.master_config.Global.exec_lines: try: diff --git a/IPython/kernel/launcher.py b/IPython/kernel/launcher.py index 9fe0826..1eec79a 100644 --- a/IPython/kernel/launcher.py +++ b/IPython/kernel/launcher.py @@ -475,7 +475,9 @@ def find_engine_cmd(): class LocalEngineLauncher(LocalProcessLauncher): engine_cmd = List(find_engine_cmd()) - engine_args = List(['--log-to-file','--log-level', '40'], config=True) + 
engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) def find_args(self): return self.engine_cmd + self.engine_args @@ -490,7 +492,9 @@ class LocalEngineLauncher(LocalProcessLauncher): class LocalEngineSetLauncher(BaseLauncher): - engine_args = List(['--log-to-file','--log-level', '40'], config=True) + engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) def __init__(self, working_dir, parent=None, name=None, config=None): super(LocalEngineSetLauncher, self).__init__( @@ -547,7 +551,9 @@ class LocalEngineSetLauncher(BaseLauncher): class MPIExecEngineSetLauncher(MPIExecLauncher): engine_cmd = List(find_engine_cmd(), config=False) - engine_args = List(['--log-to-file','--log-level', '40'], config=True) + engine_args = List( + ['--log-to-file','--log-level', '40'], config=True + ) n = Int(1, config=True) def start(self, n, profile=None, cluster_dir=None): @@ -582,4 +588,41 @@ class SSHEngineSetLauncher(BaseLauncher): pass +#----------------------------------------------------------------------------- +# A launcher for ipcluster itself! +#----------------------------------------------------------------------------- + + +def find_ipcluster_cmd(): + if sys.platform == 'win32': + # This logic is needed because the ipcluster script doesn't + # always get installed in the same way or in the same location. + from IPython.kernel import ipclusterapp + script_location = ipclusterapp.__file__.replace('.pyc', '.py') + # The -u option here turns on unbuffered output, which is required + # on Win32 to prevent weird conflict and problems with Twisted. + # Also, use sys.executable to make sure we are picking up the + # right python exe. + cmd = [sys.executable, '-u', script_location] + else: + # ipcluster has to be on the PATH in this case. 
+ cmd = ['ipcluster'] + return cmd + + +class IPClusterLauncher(LocalProcessLauncher): + + ipcluster_cmd = List(find_ipcluster_cmd()) + ipcluster_args = List( + ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True) + ipcluster_subcommand = Str('start') + ipcluster_n = Int(2) + + def find_args(self): + return self.ipcluster_cmd + [self.ipcluster_subcommand] + \ + ['-n', repr(self.ipcluster_n)] + self.ipcluster_args + + def start(self): + log.msg("Starting ipcluster: %r" % self.args) + return super(IPClusterLauncher, self).start() diff --git a/IPython/kernel/twistedutil.py b/IPython/kernel/twistedutil.py index 76456d7..2d0798a 100644 --- a/IPython/kernel/twistedutil.py +++ b/IPython/kernel/twistedutil.py @@ -3,18 +3,16 @@ """Things directly related to all of twisted.""" -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
-#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- import os, sys import threading, Queue, atexit @@ -25,9 +23,9 @@ from twisted.python import log, failure from IPython.kernel.error import FileTimeoutError -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Classes related to twisted and threads -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- class ReactorInThread(threading.Thread):