#!/usr/bin/env python # encoding: utf-8 """A class that manages the engines connection to the controller.""" #----------------------------------------------------------------------------- # Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import os import cPickle as pickle from twisted.python import log, failure from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, returnValue from IPython.kernel.fcutil import find_furl, validate_furl_or_file from IPython.kernel.enginefc import IFCEngine from IPython.kernel.twistedutil import sleep_deferred, make_deferred #----------------------------------------------------------------------------- # The ClientConnector class #----------------------------------------------------------------------------- class EngineConnectorError(Exception): pass class EngineConnector(object): """Manage an engines connection to a controller. This class takes a foolscap `Tub` and provides a `connect_to_controller` method that will use the `Tub` to connect to a controller and register the engine with the controller. """ def __init__(self, tub): self.tub = tub @make_deferred def connect_to_controller(self, engine_service, furl_or_file, delay=0.1, max_tries=10): """ Make a connection to a controller specified by a furl. This method takes an `IEngineBase` instance and a foolcap URL and uses the `tub` attribute to make a connection to the controller. The foolscap URL contains all the information needed to connect to the controller, including the ip and port as well as any encryption and authentication information needed for the connection. After getting a reference to the controller, this method calls the `register_engine` method of the controller to actually register the engine. This method will try to connect to the controller multiple times with a delay in between. Each time the FURL file is read anew. Parameters __________ engine_service : IEngineBase An instance of an `IEngineBase` implementer furl_or_file : str A furl or a filename containing a furl delay : float The intial time to wait between connection attempts. Subsequent attempts have increasing delays. max_tries : int The maximum number of connection attempts. Returns ------- A deferred to the registered client or a failure to an error like :exc:`FURLError`. """ if not self.tub.running: self.tub.startService() self.engine_service = engine_service self.engine_reference = IFCEngine(self.engine_service) validate_furl_or_file(furl_or_file) d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0) d.addCallback(self._register) return d @inlineCallbacks def _try_to_connect(self, furl_or_file, delay, max_tries, attempt): """Try to connect to the controller with retry logic.""" if attempt < max_tries: log.msg("Attempting to connect to controller [%r]: %s" % \ (attempt, furl_or_file)) try: self.furl = find_furl(furl_or_file) # Uncomment this to see the FURL being tried. # log.msg("FURL: %s" % self.furl) rr = yield self.tub.getReference(self.furl) except: if attempt==max_tries-1: # This will propagate the exception all the way to the top # where it can be handled. raise else: yield sleep_deferred(delay) rr = yield self._try_to_connect( furl_or_file, 1.5*delay, max_tries, attempt+1 ) # rr becomes an int when there is a connection!!! returnValue(rr) else: returnValue(rr) else: raise EngineConnectorError( 'Could not connect to controller, max_tries (%r) exceeded. ' 'This usually means that i) the controller was not started, ' 'or ii) a firewall was blocking the engine from connecting ' 'to the controller.' % max_tries ) def _register(self, rr): self.remote_ref = rr # Now register myself with the controller desired_id = self.engine_service.id d = self.remote_ref.callRemote('register_engine', self.engine_reference, desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2)) return d.addCallback(self._reference_sent) def _reference_sent(self, registration_dict): self.engine_service.id = registration_dict['id'] log.msg("engine registration succeeded, got id: %r" % self.engine_service.id) return self.engine_service.id