diff --git a/IPython/kernel/clientconnector.py b/IPython/kernel/clientconnector.py
index 034dbd5..c440017 100644
--- a/IPython/kernel/clientconnector.py
+++ b/IPython/kernel/clientconnector.py
@@ -17,22 +17,39 @@
 from __future__ import with_statement
 import os
 
-from IPython.kernel.fcutil import Tub, find_furl
+from IPython.kernel.fcutil import (
+    Tub,
+    find_furl,
+    is_valid_furl_or_file,
+    validate_furl_or_file,
+    FURLError
+)
 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
 from IPython.kernel.launcher import IPClusterLauncher
-from IPython.kernel.twistedutil import gatherBoth, make_deferred
-from IPython.kernel.twistedutil import blockingCallFromThread
-
+from IPython.kernel.twistedutil import (
+    gatherBoth, 
+    make_deferred,
+    blockingCallFromThread, 
+    sleep_deferred
+)
 from IPython.utils.importstring import import_item
 from IPython.utils.genutils import get_ipython_dir
 
 from twisted.internet import defer
-from twisted.python import failure
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure, log
 
 #-----------------------------------------------------------------------------
 # The ClientConnector class
 #-----------------------------------------------------------------------------
 
+DELAY = 0.2
+MAX_TRIES = 9
+
+
+class ClientConnectorError(Exception):
+    pass
+
 
 class AsyncClientConnector(object):
     """A class for getting remote references and clients from furls.
@@ -51,24 +68,24 @@ class AsyncClientConnector(object):
                    ipythondir=None):
         """Find a FURL file by profile+ipythondir or cluster dir.
 
-        This raises an exception if a FURL file can't be found.
+        This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception 
+        if a FURL file can't be found.
         """
         # Try by furl_or_file
         if furl_or_file is not None:
-            try:
-                furl = find_furl(furl_or_file)
-            except ValueError:
-                return furl
+            validate_furl_or_file(furl_or_file)
+            return furl_or_file
 
         if furl_file_name is None:
-            raise ValueError('A furl_file_name must be provided')
+            raise FURLError('A furl_file_name must be provided')
 
         # Try by cluster_dir
         if cluster_dir is not None:
             cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
             sdir = cluster_dir_obj.security_dir
             furl_file = os.path.join(sdir, furl_file_name)
-            return find_furl(furl_file)
+            validate_furl_or_file(furl_file)
+            return furl_file
 
         # Try by profile
         if ipythondir is None:
@@ -78,9 +95,10 @@ class AsyncClientConnector(object):
                 ipythondir, profile)
             sdir = cluster_dir_obj.security_dir
             furl_file = os.path.join(sdir, furl_file_name)
-            return find_furl(furl_file)
+            validate_furl_or_file(furl_file)
+            return furl_file
 
-        raise ValueError('Could not find a valid FURL file.')
+        raise FURLError('Could not find a valid FURL file.')
 
     def get_reference(self, furl_or_file):
         """Get a remote reference using a furl or a file containing a furl.
@@ -92,13 +110,14 @@ class AsyncClientConnector(object):
         Parameters
         ----------
         furl_or_file : str
-            A furl or a filename containing a furl
+            A furl or a filename containing a furl. This should already be
+            validated, but might not yet exist.
 
         Returns
         -------
         A deferred to a remote reference
         """
-        furl = find_furl(furl_or_file)
+        furl = furl_or_file
         if furl in self._remote_refs:
             d = defer.succeed(self._remote_refs[furl])
         else:
@@ -112,7 +131,8 @@ class AsyncClientConnector(object):
         return ref
         
     def get_task_client(self, profile='default', cluster_dir=None,
-                        furl_or_file=None, ipythondir=None):
+                        furl_or_file=None, ipythondir=None,
+                        delay=DELAY, max_tries=MAX_TRIES):
         """Get the task controller client.
         
         This method is a simple wrapper around `get_client` that passes in
@@ -143,11 +163,13 @@ class AsyncClientConnector(object):
         """
         return self.get_client(
             profile, cluster_dir, furl_or_file, 
-            'ipcontroller-tc.furl', ipythondir
+            'ipcontroller-tc.furl', ipythondir,
+            delay, max_tries
         )
 
     def get_multiengine_client(self, profile='default', cluster_dir=None,
-                               furl_or_file=None, ipythondir=None):
+                               furl_or_file=None, ipythondir=None,
+                               delay=DELAY, max_tries=MAX_TRIES):
         """Get the multiengine controller client.
         
         This method is a simple wrapper around `get_client` that passes in
@@ -178,11 +200,13 @@ class AsyncClientConnector(object):
         """
         return self.get_client(
             profile, cluster_dir, furl_or_file, 
-            'ipcontroller-mec.furl', ipythondir
+            'ipcontroller-mec.furl', ipythondir,
+            delay, max_tries
         )
     
     def get_client(self, profile='default', cluster_dir=None,
-                   furl_or_file=None, furl_file_name=None, ipythondir=None):
+                   furl_or_file=None, furl_file_name=None, ipythondir=None,
+                   delay=DELAY, max_tries=MAX_TRIES):
         """Get a remote reference and wrap it in a client by furl.
 
         This method is a simple wrapper around `get_client` that passes in
@@ -201,7 +225,7 @@ class AsyncClientConnector(object):
             The full path to a cluster directory.  This is useful if profiles
             are not being used.
         furl_or_file : str
-            A furl or a filename containing a FURLK. This is useful if you 
+            A furl or a filename containing a FURL. This is useful if you 
             simply know the location of the FURL file.
         furl_file_name : str
             The filename (not the full path) of the FURL. This must be
@@ -212,18 +236,17 @@ class AsyncClientConnector(object):
 
         Returns
         -------
-        A deferred to the actual client class.
+        A deferred to the actual client class, or a failure wrapping a
+        :exc:`FURLError`.
         """
         try:
-            furl = self._find_furl(
+            furl_file = self._find_furl(
                 profile, cluster_dir, furl_or_file,
                 furl_file_name, ipythondir
             )
-        except:
+        except FURLError:
             return defer.fail(failure.Failure())
 
-        d = self.get_reference(furl)
-
         def _wrap_remote_reference(rr):
             d = rr.callRemote('get_client_name')
             d.addCallback(lambda name: import_item(name))
@@ -235,9 +258,42 @@ class AsyncClientConnector(object):
 
             return d
 
+        d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
         d.addCallback(_wrap_remote_reference)
         return d
 
+    @inlineCallbacks
+    def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
+        """Try to connect to the controller with retry logic."""
+        if attempt < max_tries:
+            log.msg("Connecting to controller [%r]: %s" % \
+                (attempt, furl_or_file))
+            try:
+                self.furl = find_furl(furl_or_file)
+                # Uncomment this to see the FURL being tried.
+                # log.msg("FURL: %s" % self.furl)
+                rr = yield self.get_reference(self.furl)
+            except:
+                if attempt==max_tries-1:
+                    # This will propagate the exception all the way to the top
+                    # where it can be handled.
+                    raise
+                else:
+                    yield sleep_deferred(delay)
+                    rr = yield self._try_to_connect(
+                        furl_or_file, 1.5*delay, max_tries, attempt+1
+                    )
+                    returnValue(rr)
+            else:
+                returnValue(rr)
+        else:
+            raise ClientConnectorError(
+                'Could not connect to controller, max_tries (%r) exceeded. '
+                'This usually means that i) the controller was not started, '
+                'or ii) a firewall was blocking the client from connecting '
+                'to the controller.' % max_tries
+            )
+
 
 class ClientConnector(object):
     """A blocking version of a client connector.
@@ -252,7 +308,8 @@ class ClientConnector(object):
         self.async_cc = AsyncClientConnector()
 
     def get_task_client(self, profile='default', cluster_dir=None,
-                        furl_or_file=None, ipythondir=None):
+                        furl_or_file=None, ipythondir=None,
+                        delay=DELAY, max_tries=MAX_TRIES):
         """Get the task client.
         
         Usually only the ``profile`` option will be needed. If a FURL file
@@ -282,12 +339,13 @@ class ClientConnector(object):
         """
         client = blockingCallFromThread(
             self.async_cc.get_task_client, profile, cluster_dir,
-            furl_or_file, ipythondir
+            furl_or_file, ipythondir, delay, max_tries
         )
         return client.adapt_to_blocking_client()
 
     def get_multiengine_client(self, profile='default', cluster_dir=None,
-                               furl_or_file=None, ipythondir=None):
+                               furl_or_file=None, ipythondir=None,
+                               delay=DELAY, max_tries=MAX_TRIES):
         """Get the multiengine client.
         
         Usually only the ``profile`` option will be needed. If a FURL file
@@ -317,15 +375,17 @@ class ClientConnector(object):
         """
         client = blockingCallFromThread(
             self.async_cc.get_multiengine_client, profile, cluster_dir,
-            furl_or_file, ipythondir
+            furl_or_file, ipythondir, delay, max_tries
         )
         return client.adapt_to_blocking_client()
 
     def get_client(self, profile='default', cluster_dir=None,
-                   furl_or_file=None, ipythondir=None):
+                   furl_or_file=None, ipythondir=None,
+                   delay=DELAY, max_tries=MAX_TRIES, furl_file_name=None):
         client = blockingCallFromThread(
             self.async_cc.get_client, profile, cluster_dir,
-            furl_or_file, ipythondir
+            furl_or_file, furl_file_name, ipythondir,  # async get_client order
+            delay, max_tries
         )
         return client.adapt_to_blocking_client()
 
@@ -357,9 +417,6 @@ class AsyncCluster(object):
         cluster_dir : str
             The full path to a cluster directory.  This is useful if profiles
             are not being used.
-        furl_or_file : str
-            A furl or a filename containing a FURLK. This is useful if you 
-            simply know the location of the FURL file.
         ipythondir : str
             The location of the ipythondir if different from the default.
             This is used if the cluster directory is being found by profile.
@@ -451,7 +508,7 @@ class AsyncCluster(object):
         else:
             raise ClusterStateError("Cluster not running")
 
-    def get_multiengine_client(self):
+    def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
         """Get the multiengine client for the running cluster.
 
         If this fails, it means that the cluster has not finished starting.
@@ -460,10 +517,11 @@ class AsyncCluster(object):
         if self.client_connector is None:
             self.client_connector = AsyncClientConnector()
         return self.client_connector.get_multiengine_client(
-            cluster_dir=self.cluster_dir_obj.location
+            cluster_dir=self.cluster_dir_obj.location,
+            delay=delay, max_tries=max_tries
         )
 
-    def get_task_client(self):
+    def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
         """Get the task client for the running cluster.
 
         If this fails, it means that the cluster has not finished starting.
@@ -472,7 +530,8 @@ class AsyncCluster(object):
         if self.client_connector is None:
             self.client_connector = AsyncClientConnector()
         return self.client_connector.get_task_client(
-            cluster_dir=self.cluster_dir_obj.location
+            cluster_dir=self.cluster_dir_obj.location,
+            delay=delay, max_tries=max_tries
         )
 
     def get_ipengine_logs(self):
@@ -529,9 +588,6 @@ class Cluster(object):
         cluster_dir : str
             The full path to a cluster directory.  This is useful if profiles
             are not being used.
-        furl_or_file : str
-            A furl or a filename containing a FURLK. This is useful if you 
-            simply know the location of the FURL file.
         ipythondir : str
             The location of the ipythondir if different from the default.
             This is used if the cluster directory is being found by profile.
@@ -581,7 +637,7 @@ class Cluster(object):
         """Stop the IPython cluster if it is running."""
         return blockingCallFromThread(self.async_cluster.stop)
 
-    def get_multiengine_client(self):
+    def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
         """Get the multiengine client for the running cluster.
 
         If this fails, it means that the cluster has not finished starting.
@@ -590,10 +646,11 @@ class Cluster(object):
         if self.client_connector is None:
             self.client_connector = ClientConnector()
         return self.client_connector.get_multiengine_client(
-            cluster_dir=self.cluster_dir_obj.location
+            cluster_dir=self.cluster_dir_obj.location,
+            delay=delay, max_tries=max_tries
         )
 
-    def get_task_client(self):
+    def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
         """Get the task client for the running cluster.
 
         If this fails, it means that the cluster has not finished starting.
@@ -602,7 +659,8 @@ class Cluster(object):
         if self.client_connector is None:
             self.client_connector = ClientConnector()
         return self.client_connector.get_task_client(
-            cluster_dir=self.cluster_dir_obj.location
+            cluster_dir=self.cluster_dir_obj.location,
+            delay=delay, max_tries=max_tries
         )
 
     def __repr__(self):
diff --git a/IPython/kernel/engineconnector.py b/IPython/kernel/engineconnector.py
index eea93c3..dfa983f 100644
--- a/IPython/kernel/engineconnector.py
+++ b/IPython/kernel/engineconnector.py
@@ -21,9 +21,9 @@ from twisted.python import log, failure
 from twisted.internet import defer
 from twisted.internet.defer import inlineCallbacks, returnValue
 
-from IPython.kernel.fcutil import find_furl
+from IPython.kernel.fcutil import find_furl, validate_furl_or_file
 from IPython.kernel.enginefc import IFCEngine
-from IPython.kernel.twistedutil import sleep_deferred
+from IPython.kernel.twistedutil import sleep_deferred, make_deferred
 
 #-----------------------------------------------------------------------------
 # The ClientConnector class
@@ -45,6 +45,7 @@ class EngineConnector(object):
     def __init__(self, tub):
         self.tub = tub
 
+    @make_deferred
     def connect_to_controller(self, engine_service, furl_or_file,
                               delay=0.1, max_tries=10):
         """
@@ -74,13 +75,20 @@ class EngineConnector(object):
             attempts have increasing delays.
         max_tries : int
             The maximum number of connection attempts.
+
+        Returns
+        -------
+        A deferred to the registered client, or a failure wrapping an error
+        such as :exc:`FURLError`.
         """
         if not self.tub.running:
             self.tub.startService()
         self.engine_service = engine_service
         self.engine_reference = IFCEngine(self.engine_service)
 
+        validate_furl_or_file(furl_or_file)
         d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
+        d.addCallback(self._register)
         return d
 
     @inlineCallbacks
@@ -101,12 +109,13 @@ class EngineConnector(object):
                     raise
                 else:
                     yield sleep_deferred(delay)
-                    yield self._try_to_connect(
+                    rr = yield self._try_to_connect(
                         furl_or_file, 1.5*delay, max_tries, attempt+1
                     )
+                    # rr becomes an int when there is a connection!!!
+                    returnValue(rr)
             else:
-                result = yield self._register(rr)
-                returnValue(result)
+                returnValue(rr)
         else:
             raise EngineConnectorError(
                 'Could not connect to controller, max_tries (%r) exceeded. '
diff --git a/IPython/kernel/fcutil.py b/IPython/kernel/fcutil.py
index 09d2c1c..5943739 100644
--- a/IPython/kernel/fcutil.py
+++ b/IPython/kernel/fcutil.py
@@ -51,6 +51,10 @@ else:
     have_crypto = True
 
 
+class FURLError(Exception):
+    pass
+
+
 def check_furl_file_security(furl_file, secure):
     """Remove the old furl_file if changing security modes."""
     if os.path.isfile(furl_file):
@@ -69,7 +73,7 @@ def is_secure(furl):
         elif furl.startswith("pbu://"):
             return False
     else:
-        raise ValueError("invalid FURL: %s" % furl)
+        raise FURLError("invalid FURL: %s" % furl)
 
 
 def is_valid(furl):
@@ -91,7 +95,30 @@ def find_furl(furl_or_file):
             furl = f.read().strip()
         if is_valid(furl):
             return furl
-    raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
+    raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
+
+
+def is_valid_furl_or_file(furl_or_file):
+    """Validate a FURL or a FURL file.
+
+    If ``furl_or_file`` looks like a file, we simply make sure its directory
+    exists and that it has a ``.furl`` file extension.  We don't try to see
+    if the FURL file exists or to read its contents. This is useful for
+    cases where auto re-connection is being used.
+    """
+    if isinstance(furl_or_file, str):
+        if is_valid(furl_or_file):
+            return True
+    if isinstance(furl_or_file, (str, unicode)):
+        path, furl_filename = os.path.split(furl_or_file)
+        if os.path.isdir(path) and furl_filename.endswith('.furl'):
+            return True
+    return False
+
+
+def validate_furl_or_file(furl_or_file):
+    if not is_valid_furl_or_file(furl_or_file):
+        raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
 
 
 def get_temp_furlfile(filename):
diff --git a/IPython/kernel/twistedutil.py b/IPython/kernel/twistedutil.py
index 2d0798a..712a83b 100644
--- a/IPython/kernel/twistedutil.py
+++ b/IPython/kernel/twistedutil.py
@@ -40,6 +40,15 @@ class ReactorInThread(threading.Thread):
     """
     
     def run(self):
+        """Run the twisted reactor in a thread.
+
+        This runs the reactor with installSignalHandlers=0, which prevents
+        twisted from installing any of its own signal handlers. This needs to
+        be disabled because signal.signal can't be called in a thread. The
+        only problem with this is that SIGCHLD events won't be detected so
+        spawnProcess won't detect that its processes have been killed by
+        an external factor.
+        """
         reactor.run(installSignalHandlers=0)
         # self.join()
         
@@ -260,4 +269,6 @@ def make_deferred(func):
     def _wrapper(*args, **kwargs):
         return defer.maybeDeferred(func, *args, **kwargs)
 
-    return _wrapper
\ No newline at end of file
+    return _wrapper
+
+