engineconnector.py
139 lines
| 5.4 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
r2306 | #!/usr/bin/env python | ||
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
"""A class that manages the engine's connection to the controller."""
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2009 The IPython Development Team | ||||
Brian E Granger
|
r1234 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Imports | ||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
import os | ||||
import cPickle as pickle | ||||
Brian Granger
|
r1769 | from twisted.python import log, failure | ||
from twisted.internet import defer | ||||
Brian Granger
|
r2302 | from twisted.internet.defer import inlineCallbacks, returnValue | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2309 | from IPython.kernel.fcutil import find_furl, validate_furl_or_file | ||
Brian E Granger
|
r1234 | from IPython.kernel.enginefc import IFCEngine | ||
Brian Granger
|
r2309 | from IPython.kernel.twistedutil import sleep_deferred, make_deferred | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
# The EngineConnector class
Brian Granger
|
r2306 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian Granger
|
r2302 | |||
class EngineConnectorError(Exception):
    """Error raised by `EngineConnector` when a controller connection fails."""
Brian E Granger
|
class EngineConnector(object):
    """Manage an engine's connection to a controller.

    This class takes a foolscap `Tub` and provides a `connect_to_controller`
    method that will use the `Tub` to connect to a controller and register
    the engine with the controller.
    """

    def __init__(self, tub):
        """Store the foolscap `Tub` used for all controller connections."""
        self.tub = tub

    @make_deferred
    def connect_to_controller(self, engine_service, furl_or_file,
                              delay=0.1, max_tries=10):
        """
        Make a connection to a controller specified by a furl.

        This method takes an `IEngineBase` instance and a foolscap URL and
        uses the `tub` attribute to make a connection to the controller. The
        foolscap URL contains all the information needed to connect to the
        controller, including the ip and port as well as any encryption and
        authentication information needed for the connection.

        After getting a reference to the controller, this method calls the
        `register_engine` method of the controller to actually register the
        engine.

        This method will try to connect to the controller multiple times with
        a delay in between. Each time the FURL file is read anew.

        Parameters
        ----------
        engine_service : IEngineBase
            An instance of an `IEngineBase` implementer
        furl_or_file : str
            A furl or a filename containing a furl
        delay : float
            The initial time to wait between connection attempts. Subsequent
            attempts have increasing delays (each one is 1.5 times the
            previous one).
        max_tries : int
            The maximum number of connection attempts.

        Returns
        -------
        A deferred that fires with the id the controller assigned to the
        engine, or a failure wrapping an error like :exc:`FURLError`.
        """
        # Start the Tub lazily so callers may hand in a stopped one.
        if not self.tub.running:
            self.tub.startService()
        self.engine_service = engine_service
        # Presumably adapts the service to the foolscap-facing engine
        # interface; this is the object handed to the controller in
        # `_register`.
        self.engine_reference = IFCEngine(self.engine_service)

        # Check the argument once, up front, before any connection attempt.
        validate_furl_or_file(furl_or_file)
        d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
        d.addCallback(self._register)
        return d

    @inlineCallbacks
    def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
        """Try to connect to the controller with retry logic.

        Parameters
        ----------
        furl_or_file : str
            A furl or a filename containing a furl; it is resolved again
            with `find_furl` on every attempt.
        delay : float
            Time to sleep before the next attempt if this one fails.
        max_tries : int
            The maximum number of connection attempts.
        attempt : int
            Zero-based index of the current attempt.

        Returns
        -------
        A deferred that fires with the remote reference returned by
        `tub.getReference`.

        Raises
        ------
        EngineConnectorError
            Only if `attempt >= max_tries` on entry (e.g. `max_tries <= 0`).
            When the final permitted attempt fails, the underlying exception
            is re-raised unchanged instead.
        """
        if attempt >= max_tries:
            raise EngineConnectorError(
                'Could not connect to controller, max_tries (%r) exceeded. '
                'This usually means that i) the controller was not started, '
                'or ii) a firewall was blocking the engine from connecting '
                'to the controller.' % max_tries
            )

        log.msg("Attempting to connect to controller [%r]: %s" %
                (attempt, furl_or_file))
        try:
            self.furl = find_furl(furl_or_file)
            # Uncomment this to see the FURL being tried.
            # log.msg("FURL: %s" % self.furl)
            rr = yield self.tub.getReference(self.furl)
        except Exception:
            # Deliberately not a bare ``except:``. This handler yields, so
            # trapping GeneratorExit would break generator cleanup, and
            # trapping KeyboardInterrupt/SystemExit would turn a shutdown
            # request into another retry.
            if attempt == max_tries - 1:
                # This will propagate the exception all the way to the top
                # where it can be handled.
                raise
            yield sleep_deferred(delay)
            rr = yield self._try_to_connect(
                furl_or_file, 1.5*delay, max_tries, attempt+1
            )
            # NOTE (original author): rr becomes an int when there is a
            # connection!!!

        # Kept outside the ``try`` body so that the control-flow exception
        # used by `returnValue` never passes through the handler above.
        returnValue(rr)

    def _register(self, rr):
        """Register this engine with the controller behind `rr`.

        Stores `rr` as `self.remote_ref` and calls the remote
        `register_engine` method with the engine reference, the currently
        desired engine id, this process's pid and the pickled (protocol 2)
        engine properties.

        Returns a deferred that fires with the result of `_reference_sent`.
        """
        self.remote_ref = rr
        # Now register myself with the controller
        desired_id = self.engine_service.id
        d = self.remote_ref.callRemote(
            'register_engine',
            self.engine_reference,
            desired_id,
            os.getpid(),
            pickle.dumps(self.engine_service.properties, 2),
        )
        return d.addCallback(self._reference_sent)

    def _reference_sent(self, registration_dict):
        """Record the id the controller assigned and return it.

        `registration_dict` is the result of the remote `register_engine`
        call; only its ``'id'`` key is read here.
        """
        self.engine_service.id = registration_dict['id']
        log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
        return self.engine_service.id