# encoding: utf-8 # -*- test-case-name: IPython.kernel.test.test_controllerservice -*- """A Twisted Service for the IPython Controller. The IPython Controller: * Listens for Engines to connect and then manages access to those engines. * Listens for clients and passes commands from client to the Engines. * Exposes an asynchronous interfaces to the Engines which themselves can block. * Acts as a gateway to the Engines. The design of the controller is somewhat abstract to allow flexibility in how the controller is presented to clients. This idea is that there is a basic ControllerService class that allows engines to connect to it. But, this basic class has no client interfaces. To expose client interfaces developers provide an adapter that makes the ControllerService look like something. For example, one client interface might support task farming and another might support interactive usage. The important thing is that by using interfaces and adapters, a single controller can be accessed from multiple interfaces. Furthermore, by adapting various client interfaces to various network protocols, each client interface can be exposed to multiple network protocols. See multiengine.py for an example of how to adapt the ControllerService to a client interface. """ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- import os, sys from twisted.application import service from twisted.internet import defer, reactor from twisted.python import log, components from zope.interface import Interface, implements, Attribute import zope.interface as zi from IPython.kernel.engineservice import \ IEngineCore, \ IEngineSerialized, \ IEngineQueued from IPython.config import cutils from IPython.kernel import codeutil #------------------------------------------------------------------------------- # Interfaces for the Controller #------------------------------------------------------------------------------- class IControllerCore(Interface): """Basic methods any controller must have. This is basically the aspect of the controller relevant to the engines and does not assume anything about how the engines will be presented to a client. """ engines = Attribute("A dict of engine ids and engine instances.") def register_engine(remoteEngine, id=None, ip=None, port=None, pid=None): """Register new remote engine. The controller can use the ip, port, pid of the engine to do useful things like kill the engines. :Parameters: remoteEngine An implementer of IEngineCore, IEngineSerialized and IEngineQueued. id : int Requested id. ip : str IP address the engine is running on. port : int Port the engine is on. pid : int pid of the running engine. :Returns: A dict of {'id':id} and possibly other key, value pairs. """ def unregister_engine(id): """Handle a disconnecting engine. :Parameters: id The integer engine id of the engine to unregister. """ def on_register_engine_do(f, includeID, *args, **kwargs): """Call ``f(*args, **kwargs)`` when an engine is registered. :Parameters: includeID : int If True the first argument to f will be the id of the engine. """ def on_unregister_engine_do(f, includeID, *args, **kwargs): """Call ``f(*args, **kwargs)`` when an engine is unregistered. :Parameters: includeID : int If True the first argument to f will be the id of the engine. """ def on_register_engine_do_not(f): """Stop calling f on engine registration""" def on_unregister_engine_do_not(f): """Stop calling f on engine unregistration""" def on_n_engines_registered_do(n, f, *arg, **kwargs): """Call f(*args, **kwargs) the first time the nth engine registers.""" class IControllerBase(IControllerCore): """The basic controller interface.""" pass #------------------------------------------------------------------------------- # Implementation of the ControllerService #------------------------------------------------------------------------------- class ControllerService(object, service.Service): """A basic Controller represented as a Twisted Service. This class doesn't implement any client notification mechanism. That is up to adapted subclasses. """ # I also pick up the IService interface by inheritance from service.Service implements(IControllerBase) name = 'ControllerService' def __init__(self, maxEngines=511, saveIDs=False): self.saveIDs = saveIDs self.engines = {} self.availableIDs = range(maxEngines,-1,-1) # [511,...,0] self._onRegister = [] self._onUnregister = [] self._onNRegistered = [] #--------------------------------------------------------------------------- # Methods used to save the engine info to a log file #--------------------------------------------------------------------------- def _buildEngineInfoString(self, id, ip, port, pid): if id is None: id = -99 if ip is None: ip = "-99" if port is None: port = -99 if pid is None: pid = -99 return "Engine Info: %d %s %d %d" % (id, ip , port, pid) def _logEngineInfo(self, id, ip, port, pid): log.msg(self._buildEngineInfoString(id,ip,port,pid)) def _getEngineInfoLogFile(self): # Store all logs inside the ipython directory ipdir = cutils.get_ipython_dir() pjoin = os.path.join logdir_base = pjoin(ipdir,'log') if not os.path.isdir(logdir_base): os.makedirs(logdir_base) logfile = os.path.join(logdir_base,'ipcontroller-%s-engine-info.log' % os.getpid()) return logfile def _logEngineInfoToFile(self, id, ip, port, pid): """Log info about an engine to a log file. When an engine registers with a ControllerService, the ControllerService saves information about the engine to a log file. That information can be useful for various purposes, such as killing hung engines, etc. This method takes the assigned id, ip/port and pid of the engine and saves it to a file of the form: ~/.ipython/log/ipcontroller-###-engine-info.log where ### is the pid of the controller. Each line of this file has the form: Engine Info: ip ip port pid If any of the entries are not known, they are replaced by -99. """ fname = self._getEngineInfoLogFile() f = open(fname, 'a') s = self._buildEngineInfoString(id,ip,port,pid) f.write(s + '\n') f.close() #--------------------------------------------------------------------------- # IControllerCore methods #--------------------------------------------------------------------------- def register_engine(self, remoteEngine, id=None, ip=None, port=None, pid=None): """Register new engine connection""" # What happens if these assertions fail? assert IEngineCore.providedBy(remoteEngine), \ "engine passed to register_engine doesn't provide IEngineCore" assert IEngineSerialized.providedBy(remoteEngine), \ "engine passed to register_engine doesn't provide IEngineSerialized" assert IEngineQueued.providedBy(remoteEngine), \ "engine passed to register_engine doesn't provide IEngineQueued" assert isinstance(id, int) or id is None, \ "id to register_engine must be an integer or None" assert isinstance(ip, str) or ip is None, \ "ip to register_engine must be a string or None" assert isinstance(port, int) or port is None, \ "port to register_engine must be an integer or None" assert isinstance(pid, int) or pid is None, \ "pid to register_engine must be an integer or None" desiredID = id if desiredID in self.engines.keys(): desiredID = None if desiredID in self.availableIDs: getID = desiredID self.availableIDs.remove(desiredID) else: getID = self.availableIDs.pop() remoteEngine.id = getID remoteEngine.service = self self.engines[getID] = remoteEngine # Log the Engine Information for monitoring purposes self._logEngineInfoToFile(getID, ip, port, pid) msg = "registered engine with id: %i" %getID log.msg(msg) for i in range(len(self._onRegister)): (f,args,kwargs,ifid) = self._onRegister[i] try: if ifid: f(getID, *args, **kwargs) else: f(*args, **kwargs) except: self._onRegister.pop(i) # Call functions when the nth engine is registered and them remove them for i, (n, f, args, kwargs) in enumerate(self._onNRegistered): if len(self.engines.keys()) == n: try: try: f(*args, **kwargs) except: log.msg("Function %r failed when the %ith engine registered" % (f, n)) finally: self._onNRegistered.pop(i) return {'id':getID} def unregister_engine(self, id): """Unregister engine by id.""" assert isinstance(id, int) or id is None, \ "id to unregister_engine must be an integer or None" msg = "unregistered engine with id: %i" %id log.msg(msg) try: del self.engines[id] except KeyError: log.msg("engine with id %i was not registered" % id) else: if not self.saveIDs: self.availableIDs.append(id) # Sort to assign lower ids first self.availableIDs.sort(reverse=True) else: log.msg("preserving id %i" %id) for i in range(len(self._onUnregister)): (f,args,kwargs,ifid) = self._onUnregister[i] try: if ifid: f(id, *args, **kwargs) else: f(*args, **kwargs) except: self._onUnregister.pop(i) def on_register_engine_do(self, f, includeID, *args, **kwargs): assert callable(f), "f must be callable" self._onRegister.append((f,args,kwargs,includeID)) def on_unregister_engine_do(self, f, includeID, *args, **kwargs): assert callable(f), "f must be callable" self._onUnregister.append((f,args,kwargs,includeID)) def on_register_engine_do_not(self, f): for i in range(len(self._onRegister)): g = self._onRegister[i][0] if f == g: self._onRegister.pop(i) return def on_unregister_engine_do_not(self, f): for i in range(len(self._onUnregister)): g = self._onUnregister[i][0] if f == g: self._onUnregister.pop(i) return def on_n_engines_registered_do(self, n, f, *args, **kwargs): if len(self.engines.keys()) >= n: f(*args, **kwargs) else: self._onNRegistered.append((n,f,args,kwargs)) #------------------------------------------------------------------------------- # Base class for adapting controller to different client APIs #------------------------------------------------------------------------------- class ControllerAdapterBase(object): """All Controller adapters should inherit from this class. This class provides a wrapped version of the IControllerBase interface that can be used to easily create new custom controllers. Subclasses of this will provide a full implementation of IControllerBase. This class doesn't implement any client notification mechanism. That is up to subclasses. """ implements(IControllerBase) def __init__(self, controller): self.controller = controller # Needed for IControllerCore self.engines = self.controller.engines def register_engine(self, remoteEngine, id=None, ip=None, port=None, pid=None): return self.controller.register_engine(remoteEngine, id, ip, port, pid) def unregister_engine(self, id): return self.controller.unregister_engine(id) def on_register_engine_do(self, f, includeID, *args, **kwargs): return self.controller.on_register_engine_do(f, includeID, *args, **kwargs) def on_unregister_engine_do(self, f, includeID, *args, **kwargs): return self.controller.on_unregister_engine_do(f, includeID, *args, **kwargs) def on_register_engine_do_not(self, f): return self.controller.on_register_engine_do_not(f) def on_unregister_engine_do_not(self, f): return self.controller.on_unregister_engine_do_not(f) def on_n_engines_registered_do(self, n, f, *args, **kwargs): return self.controller.on_n_engines_registered_do(n, f, *args, **kwargs)