diff --git a/IPython/kernel/clientconnector.py b/IPython/kernel/clientconnector.py index 034dbd5..c440017 100644 --- a/IPython/kernel/clientconnector.py +++ b/IPython/kernel/clientconnector.py @@ -17,22 +17,39 @@ from __future__ import with_statement import os -from IPython.kernel.fcutil import Tub, find_furl +from IPython.kernel.fcutil import ( + Tub, + find_furl, + is_valid_furl_or_file, + validate_furl_or_file, + FURLError +) from IPython.kernel.clusterdir import ClusterDir, ClusterDirError from IPython.kernel.launcher import IPClusterLauncher -from IPython.kernel.twistedutil import gatherBoth, make_deferred -from IPython.kernel.twistedutil import blockingCallFromThread - +from IPython.kernel.twistedutil import ( + gatherBoth, + make_deferred, + blockingCallFromThread, + sleep_deferred +) from IPython.utils.importstring import import_item from IPython.utils.genutils import get_ipython_dir from twisted.internet import defer -from twisted.python import failure +from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.python import failure, log #----------------------------------------------------------------------------- # The ClientConnector class #----------------------------------------------------------------------------- +DELAY = 0.2 +MAX_TRIES = 9 + + +class ClientConnectorError(Exception): + pass + class AsyncClientConnector(object): """A class for getting remote references and clients from furls. @@ -51,24 +68,24 @@ class AsyncClientConnector(object): ipythondir=None): """Find a FURL file by profile+ipythondir or cluster dir. - This raises an exception if a FURL file can't be found. + This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception + if a FURL file can't be found. """ # Try by furl_or_file if furl_or_file is not None: - try: - furl = find_furl(furl_or_file) - except ValueError: - return furl + validate_furl_or_file(furl_or_file) + return furl_or_file if furl_file_name is None: - raise ValueError('A furl_file_name must be provided') + raise FURLError('A furl_file_name must be provided') # Try by cluster_dir if cluster_dir is not None: cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir) sdir = cluster_dir_obj.security_dir furl_file = os.path.join(sdir, furl_file_name) - return find_furl(furl_file) + validate_furl_or_file(furl_file) + return furl_file # Try by profile if ipythondir is None: @@ -78,9 +95,10 @@ class AsyncClientConnector(object): ipythondir, profile) sdir = cluster_dir_obj.security_dir furl_file = os.path.join(sdir, furl_file_name) - return find_furl(furl_file) + validate_furl_or_file(furl_file) + return furl_file - raise ValueError('Could not find a valid FURL file.') + raise FURLError('Could not find a valid FURL file.') def get_reference(self, furl_or_file): """Get a remote reference using a furl or a file containing a furl. @@ -92,13 +110,14 @@ class AsyncClientConnector(object): Parameters ---------- furl_or_file : str - A furl or a filename containing a furl + A furl or a filename containing a furl. This should already be + validated, but might not yet exist. Returns ------- A deferred to a remote reference """ - furl = find_furl(furl_or_file) + furl = furl_or_file if furl in self._remote_refs: d = defer.succeed(self._remote_refs[furl]) else: @@ -112,7 +131,8 @@ class AsyncClientConnector(object): return ref def get_task_client(self, profile='default', cluster_dir=None, - furl_or_file=None, ipythondir=None): + furl_or_file=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): """Get the task controller client. This method is a simple wrapper around `get_client` that passes in @@ -143,11 +163,13 @@ class AsyncClientConnector(object): """ return self.get_client( profile, cluster_dir, furl_or_file, - 'ipcontroller-tc.furl', ipythondir + 'ipcontroller-tc.furl', ipythondir, + delay, max_tries ) def get_multiengine_client(self, profile='default', cluster_dir=None, - furl_or_file=None, ipythondir=None): + furl_or_file=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): """Get the multiengine controller client. This method is a simple wrapper around `get_client` that passes in @@ -178,11 +200,13 @@ class AsyncClientConnector(object): """ return self.get_client( profile, cluster_dir, furl_or_file, - 'ipcontroller-mec.furl', ipythondir + 'ipcontroller-mec.furl', ipythondir, + delay, max_tries ) def get_client(self, profile='default', cluster_dir=None, - furl_or_file=None, furl_file_name=None, ipythondir=None): + furl_or_file=None, furl_file_name=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): """Get a remote reference and wrap it in a client by furl. This method is a simple wrapper around `get_client` that passes in @@ -201,7 +225,7 @@ class AsyncClientConnector(object): The full path to a cluster directory. This is useful if profiles are not being used. furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you + A furl or a filename containing a FURL. This is useful if you simply know the location of the FURL file. furl_file_name : str The filename (not the full path) of the FURL. This must be @@ -212,18 +236,17 @@ class AsyncClientConnector(object): Returns ------- - A deferred to the actual client class. + A deferred to the actual client class. Or a failure to a + :exc:`FURLError`. """ try: - furl = self._find_furl( + furl_file = self._find_furl( profile, cluster_dir, furl_or_file, furl_file_name, ipythondir ) - except: + except FURLError: return defer.fail(failure.Failure()) - d = self.get_reference(furl) - def _wrap_remote_reference(rr): d = rr.callRemote('get_client_name') d.addCallback(lambda name: import_item(name)) @@ -235,9 +258,42 @@ class AsyncClientConnector(object): return d + d = self._try_to_connect(furl_file, delay, max_tries, attempt=0) d.addCallback(_wrap_remote_reference) return d + @inlineCallbacks + def _try_to_connect(self, furl_or_file, delay, max_tries, attempt): + """Try to connect to the controller with retry logic.""" + if attempt < max_tries: + log.msg("Connecting to controller [%r]: %s" % \ + (attempt, furl_or_file)) + try: + self.furl = find_furl(furl_or_file) + # Uncomment this to see the FURL being tried. + # log.msg("FURL: %s" % self.furl) + rr = yield self.get_reference(self.furl) + except: + if attempt==max_tries-1: + # This will propagate the exception all the way to the top + # where it can be handled. + raise + else: + yield sleep_deferred(delay) + rr = yield self._try_to_connect( + furl_or_file, 1.5*delay, max_tries, attempt+1 + ) + returnValue(rr) + else: + returnValue(rr) + else: + raise ClientConnectorError( + 'Could not connect to controller, max_tries (%r) exceeded. ' + 'This usually means that i) the controller was not started, ' + 'or ii) a firewall was blocking the client from connecting ' + 'to the controller.' % max_tries + ) + class ClientConnector(object): """A blocking version of a client connector. @@ -252,7 +308,8 @@ class ClientConnector(object): self.async_cc = AsyncClientConnector() def get_task_client(self, profile='default', cluster_dir=None, - furl_or_file=None, ipythondir=None): + furl_or_file=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): """Get the task client. Usually only the ``profile`` option will be needed. If a FURL file @@ -282,12 +339,13 @@ class ClientConnector(object): """ client = blockingCallFromThread( self.async_cc.get_task_client, profile, cluster_dir, - furl_or_file, ipythondir + furl_or_file, ipythondir, delay, max_tries ) return client.adapt_to_blocking_client() def get_multiengine_client(self, profile='default', cluster_dir=None, - furl_or_file=None, ipythondir=None): + furl_or_file=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): """Get the multiengine client. Usually only the ``profile`` option will be needed. If a FURL file @@ -317,15 +375,17 @@ class ClientConnector(object): """ client = blockingCallFromThread( self.async_cc.get_multiengine_client, profile, cluster_dir, - furl_or_file, ipythondir + furl_or_file, ipythondir, delay, max_tries ) return client.adapt_to_blocking_client() def get_client(self, profile='default', cluster_dir=None, - furl_or_file=None, ipythondir=None): + furl_or_file=None, ipythondir=None, + delay=DELAY, max_tries=MAX_TRIES): client = blockingCallFromThread( self.async_cc.get_client, profile, cluster_dir, - furl_or_file, ipythondir + furl_or_file, ipythondir, + delay, max_tries ) return client.adapt_to_blocking_client() @@ -357,9 +417,6 @@ class AsyncCluster(object): cluster_dir : str The full path to a cluster directory. This is useful if profiles are not being used. - furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you - simply know the location of the FURL file. ipythondir : str The location of the ipythondir if different from the default. This is used if the cluster directory is being found by profile. @@ -451,7 +508,7 @@ class AsyncCluster(object): else: raise ClusterStateError("Cluster not running") - def get_multiengine_client(self): + def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES): """Get the multiengine client for the running cluster. If this fails, it means that the cluster has not finished starting. @@ -460,10 +517,11 @@ class AsyncCluster(object): if self.client_connector is None: self.client_connector = AsyncClientConnector() return self.client_connector.get_multiengine_client( - cluster_dir=self.cluster_dir_obj.location + cluster_dir=self.cluster_dir_obj.location, + delay=delay, max_tries=max_tries ) - def get_task_client(self): + def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES): """Get the task client for the running cluster. If this fails, it means that the cluster has not finished starting. @@ -472,7 +530,8 @@ class AsyncCluster(object): if self.client_connector is None: self.client_connector = AsyncClientConnector() return self.client_connector.get_task_client( - cluster_dir=self.cluster_dir_obj.location + cluster_dir=self.cluster_dir_obj.location, + delay=delay, max_tries=max_tries ) def get_ipengine_logs(self): @@ -529,9 +588,6 @@ class Cluster(object): cluster_dir : str The full path to a cluster directory. This is useful if profiles are not being used. - furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you - simply know the location of the FURL file. ipythondir : str The location of the ipythondir if different from the default. This is used if the cluster directory is being found by profile. @@ -581,7 +637,7 @@ class Cluster(object): """Stop the IPython cluster if it is running.""" return blockingCallFromThread(self.async_cluster.stop) - def get_multiengine_client(self): + def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES): """Get the multiengine client for the running cluster. If this fails, it means that the cluster has not finished starting. @@ -590,10 +646,11 @@ class Cluster(object): if self.client_connector is None: self.client_connector = ClientConnector() return self.client_connector.get_multiengine_client( - cluster_dir=self.cluster_dir_obj.location + cluster_dir=self.cluster_dir_obj.location, + delay=delay, max_tries=max_tries ) - def get_task_client(self): + def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES): """Get the task client for the running cluster. If this fails, it means that the cluster has not finished starting. @@ -602,7 +659,8 @@ class Cluster(object): if self.client_connector is None: self.client_connector = ClientConnector() return self.client_connector.get_task_client( - cluster_dir=self.cluster_dir_obj.location + cluster_dir=self.cluster_dir_obj.location, + delay=delay, max_tries=max_tries ) def __repr__(self): diff --git a/IPython/kernel/engineconnector.py b/IPython/kernel/engineconnector.py index eea93c3..dfa983f 100644 --- a/IPython/kernel/engineconnector.py +++ b/IPython/kernel/engineconnector.py @@ -21,9 +21,9 @@ from twisted.python import log, failure from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, returnValue -from IPython.kernel.fcutil import find_furl +from IPython.kernel.fcutil import find_furl, validate_furl_or_file from IPython.kernel.enginefc import IFCEngine -from IPython.kernel.twistedutil import sleep_deferred +from IPython.kernel.twistedutil import sleep_deferred, make_deferred #----------------------------------------------------------------------------- # The ClientConnector class @@ -45,6 +45,7 @@ class EngineConnector(object): def __init__(self, tub): self.tub = tub + @make_deferred def connect_to_controller(self, engine_service, furl_or_file, delay=0.1, max_tries=10): """ @@ -74,13 +75,20 @@ class EngineConnector(object): attempts have increasing delays. max_tries : int The maximum number of connection attempts. + + Returns + ------- + A deferred to the registered client or a failure to an error + like :exc:`FURLError`. """ if not self.tub.running: self.tub.startService() self.engine_service = engine_service self.engine_reference = IFCEngine(self.engine_service) + validate_furl_or_file(furl_or_file) d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0) + d.addCallback(self._register) return d @inlineCallbacks @@ -101,12 +109,13 @@ class EngineConnector(object): raise else: yield sleep_deferred(delay) - yield self._try_to_connect( + rr = yield self._try_to_connect( furl_or_file, 1.5*delay, max_tries, attempt+1 ) + # rr becomes an int when there is a connection!!! + returnValue(rr) else: - result = yield self._register(rr) - returnValue(result) + returnValue(rr) else: raise EngineConnectorError( 'Could not connect to controller, max_tries (%r) exceeded. ' diff --git a/IPython/kernel/fcutil.py b/IPython/kernel/fcutil.py index 09d2c1c..5943739 100644 --- a/IPython/kernel/fcutil.py +++ b/IPython/kernel/fcutil.py @@ -51,6 +51,10 @@ else: have_crypto = True +class FURLError(Exception): + pass + + def check_furl_file_security(furl_file, secure): """Remove the old furl_file if changing security modes.""" if os.path.isfile(furl_file): @@ -69,7 +73,7 @@ def is_secure(furl): elif furl.startswith("pbu://"): return False else: - raise ValueError("invalid FURL: %s" % furl) + raise FURLError("invalid FURL: %s" % furl) def is_valid(furl): @@ -91,7 +95,30 @@ def find_furl(furl_or_file): furl = f.read().strip() if is_valid(furl): return furl - raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file) + raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file) + + +def is_valid_furl_or_file(furl_or_file): + """Validate a FURL or a FURL file. + + If ``furl_or_file`` looks like a file, we simply make sure its directory + exists and that it has a ``.furl`` file extension. We don't try to see + if the FURL file exists or to read its contents. This is useful for + cases where auto re-connection is being used. + """ + if isinstance(furl_or_file, str): + if is_valid(furl_or_file): + return True + if isinstance(furl_or_file, (str, unicode)): + path, furl_filename = os.path.split(furl_or_file) + if os.path.isdir(path) and furl_filename.endswith('.furl'): + return True + return False + + +def validate_furl_or_file(furl_or_file): + if not is_valid_furl_or_file(furl_or_file): + raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file) def get_temp_furlfile(filename): diff --git a/IPython/kernel/twistedutil.py b/IPython/kernel/twistedutil.py index 2d0798a..712a83b 100644 --- a/IPython/kernel/twistedutil.py +++ b/IPython/kernel/twistedutil.py @@ -40,6 +40,15 @@ class ReactorInThread(threading.Thread): """ def run(self): + """Run the twisted reactor in a thread. + + This runs the reactor with installSignalHandlers=0, which prevents + twisted from installing any of its own signal handlers. This needs to + be disabled because signal.signal can't be called in a thread. The + only problem with this is that SIGCHLD events won't be detected so + spawnProcess won't detect that its processes have been killed by + an external factor. + """ reactor.run(installSignalHandlers=0) # self.join() @@ -260,4 +269,6 @@ def make_deferred(func): def _wrapper(*args, **kwargs): return defer.maybeDeferred(func, *args, **kwargs) - return _wrapper \ No newline at end of file + return _wrapper + +