engineservice.py
864 lines
| 27.8 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.tests.test_engineservice -*- | ||||
"""A Twisted Service Representation of the IPython core. | ||||
The IPython Core exposed to the network is called the Engine. Its | ||||
representation in Twisted in the EngineService. Interfaces and adapters | ||||
are used to abstract out the details of the actual network protocol used. | ||||
The EngineService is an Engine that knows nothing about the actual protocol | ||||
used. | ||||
The EngineService is exposed with various network protocols in modules like: | ||||
enginepb.py | ||||
enginevanilla.py | ||||
As of 12/12/06 the classes in this module have been simplified greatly. It was | ||||
felt that we had over-engineered things. To improve the maintainability of the | ||||
code we have taken out the ICompleteEngine interface and the completeEngine | ||||
method that automatically added methods to engines. | ||||
""" | ||||
__docformat__ = "restructuredtext en" | ||||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import os, sys, copy | ||||
import cPickle as pickle | ||||
from new import instancemethod | ||||
from twisted.application import service | ||||
from twisted.internet import defer, reactor | ||||
from twisted.python import log, failure, components | ||||
import zope.interface as zi | ||||
from IPython.kernel.core.interpreter import Interpreter | ||||
from IPython.kernel import newserialized, error, util | ||||
from IPython.kernel.util import printer | ||||
from IPython.kernel.twistedutil import gatherBoth, DeferredList | ||||
from IPython.kernel import codeutil | ||||
#------------------------------------------------------------------------------- | ||||
# Interface specification for the Engine | ||||
#------------------------------------------------------------------------------- | ||||
class IEngineCore(zi.Interface): | ||||
"""The minimal required interface for the IPython Engine. | ||||
This interface provides a formal specification of the IPython core. | ||||
All these methods should return deferreds regardless of what side of a | ||||
network connection they are on. | ||||
In general, this class simply wraps a shell class and wraps its return | ||||
values as Deferred objects. If the underlying shell class method raises | ||||
an exception, this class should convert it to a twisted.failure.Failure | ||||
that will be propagated along the Deferred's errback chain. | ||||
In addition, Failures are aggressive. By this, we mean that if a method | ||||
is performing multiple actions (like pulling multiple object) if any | ||||
single one fails, the entire method will fail with that Failure. It is | ||||
all or nothing. | ||||
""" | ||||
id = zi.interface.Attribute("the id of the Engine object") | ||||
properties = zi.interface.Attribute("A dict of properties of the Engine") | ||||
def execute(lines): | ||||
"""Execute lines of Python code. | ||||
Returns a dictionary with keys (id, number, stdin, stdout, stderr) | ||||
upon success. | ||||
Returns a failure object if the execution of lines raises an exception. | ||||
""" | ||||
def push(namespace): | ||||
"""Push dict namespace into the user's namespace. | ||||
Returns a deferred to None or a failure. | ||||
""" | ||||
def pull(keys): | ||||
"""Pulls values out of the user's namespace by keys. | ||||
Returns a deferred to a tuple objects or a single object. | ||||
Raises NameError if any one of objects doess not exist. | ||||
""" | ||||
def push_function(namespace): | ||||
"""Push a dict of key, function pairs into the user's namespace. | ||||
Returns a deferred to None or a failure.""" | ||||
def pull_function(keys): | ||||
"""Pulls functions out of the user's namespace by keys. | ||||
Returns a deferred to a tuple of functions or a single function. | ||||
Raises NameError if any one of the functions does not exist. | ||||
""" | ||||
def get_result(i=None): | ||||
"""Get the stdin/stdout/stderr of command i. | ||||
Returns a deferred to a dict with keys | ||||
(id, number, stdin, stdout, stderr). | ||||
Raises IndexError if command i does not exist. | ||||
Raises TypeError if i in not an int. | ||||
""" | ||||
def reset(): | ||||
"""Reset the shell. | ||||
This clears the users namespace. Won't cause modules to be | ||||
reloaded. Should also re-initialize certain variables like id. | ||||
""" | ||||
def kill(): | ||||
"""Kill the engine by stopping the reactor.""" | ||||
def keys(): | ||||
"""Return the top level variables in the users namspace. | ||||
Returns a deferred to a dict.""" | ||||
class IEngineSerialized(zi.Interface): | ||||
"""Push/Pull methods that take Serialized objects. | ||||
All methods should return deferreds. | ||||
""" | ||||
def push_serialized(namespace): | ||||
"""Push a dict of keys and Serialized objects into the user's namespace.""" | ||||
def pull_serialized(keys): | ||||
"""Pull objects by key from the user's namespace as Serialized. | ||||
Returns a list of or one Serialized. | ||||
Raises NameError is any one of the objects does not exist. | ||||
""" | ||||
class IEngineProperties(zi.Interface): | ||||
"""Methods for access to the properties object of an Engine""" | ||||
properties = zi.Attribute("A StrictDict object, containing the properties") | ||||
def set_properties(properties): | ||||
"""set properties by key and value""" | ||||
def get_properties(keys=None): | ||||
"""get a list of properties by `keys`, if no keys specified, get all""" | ||||
def del_properties(keys): | ||||
"""delete properties by `keys`""" | ||||
def has_properties(keys): | ||||
"""get a list of bool values for whether `properties` has `keys`""" | ||||
def clear_properties(): | ||||
"""clear the properties dict""" | ||||
class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties): | ||||
"""The basic engine interface that EngineService will implement. | ||||
This exists so it is easy to specify adapters that adapt to and from the | ||||
API that the basic EngineService implements. | ||||
""" | ||||
pass | ||||
class IEngineQueued(IEngineBase): | ||||
"""Interface for adding a queue to an IEngineBase. | ||||
This interface extends the IEngineBase interface to add methods for managing | ||||
the engine's queue. The implicit details of this interface are that the | ||||
execution of all methods declared in IEngineBase should appropriately be | ||||
put through a queue before execution. | ||||
All methods should return deferreds. | ||||
""" | ||||
def clear_queue(): | ||||
"""Clear the queue.""" | ||||
def queue_status(): | ||||
"""Get the queued and pending commands in the queue.""" | ||||
def register_failure_observer(obs): | ||||
"""Register an observer of pending Failures. | ||||
The observer must implement IFailureObserver. | ||||
""" | ||||
def unregister_failure_observer(obs): | ||||
"""Unregister an observer of pending Failures.""" | ||||
class IEngineThreaded(zi.Interface): | ||||
"""A place holder for threaded commands. | ||||
All methods should return deferreds. | ||||
""" | ||||
pass | ||||
#------------------------------------------------------------------------------- | ||||
# Functions and classes to implement the EngineService | ||||
#------------------------------------------------------------------------------- | ||||
class StrictDict(dict): | ||||
"""This is a strict copying dictionary for use as the interface to the | ||||
properties of an Engine. | ||||
:IMPORTANT: | ||||
This object copies the values you set to it, and returns copies to you | ||||
when you request them. The only way to change properties os explicitly | ||||
through the setitem and getitem of the dictionary interface. | ||||
Example: | ||||
>>> e = kernel.get_engine(id) | ||||
>>> L = someList | ||||
>>> e.properties['L'] = L | ||||
>>> L == e.properties['L'] | ||||
... True | ||||
>>> L.append(something Else) | ||||
>>> L == e.properties['L'] | ||||
... False | ||||
getitem copies, so calls to methods of objects do not affect the | ||||
properties, as in the following example: | ||||
>>> e.properties[1] = range(2) | ||||
>>> print e.properties[1] | ||||
... [0, 1] | ||||
>>> e.properties[1].append(2) | ||||
>>> print e.properties[1] | ||||
... [0, 1] | ||||
""" | ||||
def __init__(self, *args, **kwargs): | ||||
dict.__init__(self, *args, **kwargs) | ||||
self.modified = True | ||||
def __getitem__(self, key): | ||||
return copy.deepcopy(dict.__getitem__(self, key)) | ||||
def __setitem__(self, key, value): | ||||
# check if this entry is valid for transport around the network | ||||
# and copying | ||||
try: | ||||
pickle.dumps(key, 2) | ||||
pickle.dumps(value, 2) | ||||
newvalue = copy.deepcopy(value) | ||||
except: | ||||
raise error.InvalidProperty(value) | ||||
dict.__setitem__(self, key, newvalue) | ||||
self.modified = True | ||||
def __delitem__(self, key): | ||||
dict.__delitem__(self, key) | ||||
self.modified = True | ||||
def update(self, dikt): | ||||
for k,v in dikt.iteritems(): | ||||
self[k] = v | ||||
def pop(self, key): | ||||
self.modified = True | ||||
return dict.pop(self, key) | ||||
def popitem(self): | ||||
self.modified = True | ||||
return dict.popitem(self) | ||||
def clear(self): | ||||
self.modified = True | ||||
dict.clear(self) | ||||
def subDict(self, *keys): | ||||
d = {} | ||||
for key in keys: | ||||
d[key] = self[key] | ||||
return d | ||||
class EngineAPI(object): | ||||
"""This is the object through which the user can edit the `properties` | ||||
attribute of an Engine. | ||||
The Engine Properties object copies all object in and out of itself. | ||||
See the EngineProperties object for details. | ||||
""" | ||||
_fix=False | ||||
def __init__(self, id): | ||||
self.id = id | ||||
self.properties = StrictDict() | ||||
self._fix=True | ||||
def __setattr__(self, k,v): | ||||
if self._fix: | ||||
raise error.KernelError("I am protected!") | ||||
else: | ||||
object.__setattr__(self, k, v) | ||||
def __delattr__(self, key): | ||||
raise error.KernelError("I am protected!") | ||||
_apiDict = {} | ||||
def get_engine(id): | ||||
"""Get the Engine API object, whcih currently just provides the properties | ||||
object, by ID""" | ||||
global _apiDict | ||||
if not _apiDict.get(id): | ||||
_apiDict[id] = EngineAPI(id) | ||||
return _apiDict[id] | ||||
def drop_engine(id): | ||||
"""remove an engine""" | ||||
global _apiDict | ||||
if _apiDict.has_key(id): | ||||
del _apiDict[id] | ||||
class EngineService(object, service.Service): | ||||
"""Adapt a IPython shell into a IEngine implementing Twisted Service.""" | ||||
zi.implements(IEngineBase) | ||||
name = 'EngineService' | ||||
def __init__(self, shellClass=Interpreter, mpi=None): | ||||
"""Create an EngineService. | ||||
shellClass: something that implements IInterpreter or core1 | ||||
mpi: an mpi module that has rank and size attributes | ||||
""" | ||||
self.shellClass = shellClass | ||||
self.shell = self.shellClass() | ||||
self.mpi = mpi | ||||
self.id = None | ||||
self.properties = get_engine(self.id).properties | ||||
if self.mpi is not None: | ||||
log.msg("MPI started with rank = %i and size = %i" % | ||||
(self.mpi.rank, self.mpi.size)) | ||||
self.id = self.mpi.rank | ||||
self._seedNamespace() | ||||
# Make id a property so that the shell can get the updated id | ||||
def _setID(self, id): | ||||
self._id = id | ||||
self.properties = get_engine(id).properties | ||||
self.shell.push({'id': id}) | ||||
def _getID(self): | ||||
return self._id | ||||
id = property(_getID, _setID) | ||||
def _seedNamespace(self): | ||||
self.shell.push({'mpi': self.mpi, 'id' : self.id}) | ||||
def executeAndRaise(self, msg, callable, *args, **kwargs): | ||||
"""Call a method of self.shell and wrap any exception.""" | ||||
d = defer.Deferred() | ||||
try: | ||||
result = callable(*args, **kwargs) | ||||
except: | ||||
# This gives the following: | ||||
# et=exception class | ||||
# ev=exception class instance | ||||
# tb=traceback object | ||||
et,ev,tb = sys.exc_info() | ||||
# This call adds attributes to the exception value | ||||
et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) | ||||
# Add another attribute | ||||
ev._ipython_engine_info = msg | ||||
f = failure.Failure(ev,et,None) | ||||
d.errback(f) | ||||
else: | ||||
d.callback(result) | ||||
return d | ||||
# The IEngine methods. See the interface for documentation. | ||||
def execute(self, lines): | ||||
msg = {'engineid':self.id, | ||||
'method':'execute', | ||||
'args':[lines]} | ||||
d = self.executeAndRaise(msg, self.shell.execute, lines) | ||||
d.addCallback(self.addIDToResult) | ||||
return d | ||||
def addIDToResult(self, result): | ||||
result['id'] = self.id | ||||
return result | ||||
def push(self, namespace): | ||||
msg = {'engineid':self.id, | ||||
'method':'push', | ||||
'args':[repr(namespace.keys())]} | ||||
d = self.executeAndRaise(msg, self.shell.push, namespace) | ||||
return d | ||||
def pull(self, keys): | ||||
msg = {'engineid':self.id, | ||||
'method':'pull', | ||||
'args':[repr(keys)]} | ||||
d = self.executeAndRaise(msg, self.shell.pull, keys) | ||||
return d | ||||
def push_function(self, namespace): | ||||
msg = {'engineid':self.id, | ||||
'method':'push_function', | ||||
'args':[repr(namespace.keys())]} | ||||
d = self.executeAndRaise(msg, self.shell.push_function, namespace) | ||||
return d | ||||
def pull_function(self, keys): | ||||
msg = {'engineid':self.id, | ||||
'method':'pull_function', | ||||
'args':[repr(keys)]} | ||||
d = self.executeAndRaise(msg, self.shell.pull_function, keys) | ||||
return d | ||||
def get_result(self, i=None): | ||||
msg = {'engineid':self.id, | ||||
'method':'get_result', | ||||
'args':[repr(i)]} | ||||
d = self.executeAndRaise(msg, self.shell.getCommand, i) | ||||
d.addCallback(self.addIDToResult) | ||||
return d | ||||
def reset(self): | ||||
msg = {'engineid':self.id, | ||||
'method':'reset', | ||||
'args':[]} | ||||
del self.shell | ||||
self.shell = self.shellClass() | ||||
self.properties.clear() | ||||
d = self.executeAndRaise(msg, self._seedNamespace) | ||||
return d | ||||
def kill(self): | ||||
drop_engine(self.id) | ||||
try: | ||||
reactor.stop() | ||||
except RuntimeError: | ||||
log.msg('The reactor was not running apparently.') | ||||
return defer.fail() | ||||
else: | ||||
return defer.succeed(None) | ||||
def keys(self): | ||||
"""Return a list of variables names in the users top level namespace. | ||||
This used to return a dict of all the keys/repr(values) in the | ||||
user's namespace. This was too much info for the ControllerService | ||||
to handle so it is now just a list of keys. | ||||
""" | ||||
remotes = [] | ||||
for k in self.shell.user_ns.iterkeys(): | ||||
if k not in ['__name__', '_ih', '_oh', '__builtins__', | ||||
'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']: | ||||
remotes.append(k) | ||||
return defer.succeed(remotes) | ||||
def set_properties(self, properties): | ||||
msg = {'engineid':self.id, | ||||
'method':'set_properties', | ||||
'args':[repr(properties.keys())]} | ||||
return self.executeAndRaise(msg, self.properties.update, properties) | ||||
def get_properties(self, keys=None): | ||||
msg = {'engineid':self.id, | ||||
'method':'get_properties', | ||||
'args':[repr(keys)]} | ||||
if keys is None: | ||||
keys = self.properties.keys() | ||||
return self.executeAndRaise(msg, self.properties.subDict, *keys) | ||||
def _doDel(self, keys): | ||||
for key in keys: | ||||
del self.properties[key] | ||||
def del_properties(self, keys): | ||||
msg = {'engineid':self.id, | ||||
'method':'del_properties', | ||||
'args':[repr(keys)]} | ||||
return self.executeAndRaise(msg, self._doDel, keys) | ||||
def _doHas(self, keys): | ||||
return [self.properties.has_key(key) for key in keys] | ||||
def has_properties(self, keys): | ||||
msg = {'engineid':self.id, | ||||
'method':'has_properties', | ||||
'args':[repr(keys)]} | ||||
return self.executeAndRaise(msg, self._doHas, keys) | ||||
def clear_properties(self): | ||||
msg = {'engineid':self.id, | ||||
'method':'clear_properties', | ||||
'args':[]} | ||||
return self.executeAndRaise(msg, self.properties.clear) | ||||
def push_serialized(self, sNamespace): | ||||
msg = {'engineid':self.id, | ||||
'method':'push_serialized', | ||||
'args':[repr(sNamespace.keys())]} | ||||
ns = {} | ||||
for k,v in sNamespace.iteritems(): | ||||
try: | ||||
unserialized = newserialized.IUnSerialized(v) | ||||
ns[k] = unserialized.getObject() | ||||
except: | ||||
return defer.fail() | ||||
return self.executeAndRaise(msg, self.shell.push, ns) | ||||
def pull_serialized(self, keys): | ||||
msg = {'engineid':self.id, | ||||
'method':'pull_serialized', | ||||
'args':[repr(keys)]} | ||||
if isinstance(keys, str): | ||||
keys = [keys] | ||||
if len(keys)==1: | ||||
d = self.executeAndRaise(msg, self.shell.pull, keys) | ||||
d.addCallback(newserialized.serialize) | ||||
return d | ||||
elif len(keys)>1: | ||||
d = self.executeAndRaise(msg, self.shell.pull, keys) | ||||
@d.addCallback | ||||
def packThemUp(values): | ||||
serials = [] | ||||
for v in values: | ||||
try: | ||||
serials.append(newserialized.serialize(v)) | ||||
except: | ||||
return defer.fail(failure.Failure()) | ||||
return serials | ||||
return packThemUp | ||||
def queue(methodToQueue): | ||||
def queuedMethod(this, *args, **kwargs): | ||||
name = methodToQueue.__name__ | ||||
return this.submitCommand(Command(name, *args, **kwargs)) | ||||
return queuedMethod | ||||
class QueuedEngine(object): | ||||
"""Adapt an IEngineBase to an IEngineQueued by wrapping it. | ||||
The resulting object will implement IEngineQueued which extends | ||||
IEngineCore which extends (IEngineBase, IEngineSerialized). | ||||
This seems like the best way of handling it, but I am not sure. The | ||||
other option is to have the various base interfaces be used like | ||||
mix-in intefaces. The problem I have with this is adpatation is | ||||
more difficult and complicated because there can be can multiple | ||||
original and final Interfaces. | ||||
""" | ||||
zi.implements(IEngineQueued) | ||||
def __init__(self, engine): | ||||
"""Create a QueuedEngine object from an engine | ||||
engine: An implementor of IEngineCore and IEngineSerialized | ||||
keepUpToDate: whether to update the remote status when the | ||||
queue is empty. Defaults to False. | ||||
""" | ||||
# This is the right way to do these tests rather than | ||||
# IEngineCore in list(zi.providedBy(engine)) which will only | ||||
# picks of the interfaces that are directly declared by engine. | ||||
assert IEngineBase.providedBy(engine), \ | ||||
"engine passed to QueuedEngine doesn't provide IEngineBase" | ||||
self.engine = engine | ||||
self.id = engine.id | ||||
self.queued = [] | ||||
self.history = {} | ||||
self.engineStatus = {} | ||||
self.currentCommand = None | ||||
self.failureObservers = [] | ||||
def _get_properties(self): | ||||
return self.engine.properties | ||||
properties = property(_get_properties, lambda self, _: None) | ||||
# Queue management methods. You should not call these directly | ||||
def submitCommand(self, cmd): | ||||
"""Submit command to queue.""" | ||||
d = defer.Deferred() | ||||
cmd.setDeferred(d) | ||||
if self.currentCommand is not None: | ||||
if self.currentCommand.finished: | ||||
# log.msg("Running command immediately: %r" % cmd) | ||||
self.currentCommand = cmd | ||||
self.runCurrentCommand() | ||||
else: # command is still running | ||||
# log.msg("Command is running: %r" % self.currentCommand) | ||||
# log.msg("Queueing: %r" % cmd) | ||||
self.queued.append(cmd) | ||||
else: | ||||
# log.msg("No current commands, running: %r" % cmd) | ||||
self.currentCommand = cmd | ||||
self.runCurrentCommand() | ||||
return d | ||||
def runCurrentCommand(self): | ||||
"""Run current command.""" | ||||
cmd = self.currentCommand | ||||
f = getattr(self.engine, cmd.remoteMethod, None) | ||||
if f: | ||||
d = f(*cmd.args, **cmd.kwargs) | ||||
if cmd.remoteMethod is 'execute': | ||||
d.addCallback(self.saveResult) | ||||
d.addCallback(self.finishCommand) | ||||
d.addErrback(self.abortCommand) | ||||
else: | ||||
return defer.fail(AttributeError(cmd.remoteMethod)) | ||||
def _flushQueue(self): | ||||
"""Pop next command in queue and run it.""" | ||||
if len(self.queued) > 0: | ||||
self.currentCommand = self.queued.pop(0) | ||||
self.runCurrentCommand() | ||||
def saveResult(self, result): | ||||
"""Put the result in the history.""" | ||||
self.history[result['number']] = result | ||||
return result | ||||
def finishCommand(self, result): | ||||
"""Finish currrent command.""" | ||||
# The order of these commands is absolutely critical. | ||||
self.currentCommand.handleResult(result) | ||||
self.currentCommand.finished = True | ||||
self._flushQueue() | ||||
return result | ||||
def abortCommand(self, reason): | ||||
"""Abort current command. | ||||
This eats the Failure but first passes it onto the Deferred that the | ||||
user has. | ||||
It also clear out the queue so subsequence commands don't run. | ||||
""" | ||||
# The order of these 3 commands is absolutely critical. The currentCommand | ||||
# must first be marked as finished BEFORE the queue is cleared and before | ||||
# the current command is sent the failure. | ||||
# Also, the queue must be cleared BEFORE the current command is sent the Failure | ||||
# otherwise the errback chain could trigger new commands to be added to the | ||||
# queue before we clear it. We should clear ONLY the commands that were in | ||||
# the queue when the error occured. | ||||
self.currentCommand.finished = True | ||||
s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs) | ||||
self.clear_queue(msg=s) | ||||
self.currentCommand.handleError(reason) | ||||
return None | ||||
#--------------------------------------------------------------------------- | ||||
# IEngineCore methods | ||||
#--------------------------------------------------------------------------- | ||||
@queue | ||||
def execute(self, lines): | ||||
pass | ||||
@queue | ||||
def push(self, namespace): | ||||
pass | ||||
@queue | ||||
def pull(self, keys): | ||||
pass | ||||
@queue | ||||
def push_function(self, namespace): | ||||
pass | ||||
@queue | ||||
def pull_function(self, keys): | ||||
pass | ||||
def get_result(self, i=None): | ||||
if i is None: | ||||
i = max(self.history.keys()+[None]) | ||||
cmd = self.history.get(i, None) | ||||
# Uncomment this line to disable chaching of results | ||||
#cmd = None | ||||
if cmd is None: | ||||
return self.submitCommand(Command('get_result', i)) | ||||
else: | ||||
return defer.succeed(cmd) | ||||
def reset(self): | ||||
self.clear_queue() | ||||
self.history = {} # reset the cache - I am not sure we should do this | ||||
return self.submitCommand(Command('reset')) | ||||
def kill(self): | ||||
self.clear_queue() | ||||
return self.submitCommand(Command('kill')) | ||||
@queue | ||||
def keys(self): | ||||
pass | ||||
#--------------------------------------------------------------------------- | ||||
# IEngineSerialized methods | ||||
#--------------------------------------------------------------------------- | ||||
@queue | ||||
def push_serialized(self, namespace): | ||||
pass | ||||
@queue | ||||
def pull_serialized(self, keys): | ||||
pass | ||||
#--------------------------------------------------------------------------- | ||||
# IEngineProperties methods | ||||
#--------------------------------------------------------------------------- | ||||
@queue | ||||
def set_properties(self, namespace): | ||||
pass | ||||
@queue | ||||
def get_properties(self, keys=None): | ||||
pass | ||||
@queue | ||||
def del_properties(self, keys): | ||||
pass | ||||
@queue | ||||
def has_properties(self, keys): | ||||
pass | ||||
@queue | ||||
def clear_properties(self): | ||||
pass | ||||
#--------------------------------------------------------------------------- | ||||
# IQueuedEngine methods | ||||
#--------------------------------------------------------------------------- | ||||
def clear_queue(self, msg=''): | ||||
"""Clear the queue, but doesn't cancel the currently running commmand.""" | ||||
for cmd in self.queued: | ||||
cmd.deferred.errback(failure.Failure(error.QueueCleared(msg))) | ||||
self.queued = [] | ||||
return defer.succeed(None) | ||||
def queue_status(self): | ||||
if self.currentCommand is not None: | ||||
if self.currentCommand.finished: | ||||
pending = repr(None) | ||||
else: | ||||
pending = repr(self.currentCommand) | ||||
else: | ||||
pending = repr(None) | ||||
dikt = {'queue':map(repr,self.queued), 'pending':pending} | ||||
return defer.succeed(dikt) | ||||
def register_failure_observer(self, obs): | ||||
self.failureObservers.append(obs) | ||||
def unregister_failure_observer(self, obs): | ||||
self.failureObservers.remove(obs) | ||||
# Now register QueuedEngine as an adpater class that makes an IEngineBase into a | ||||
# IEngineQueued. | ||||
components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued) | ||||
class Command(object): | ||||
"""A command object that encapslates queued commands. | ||||
This class basically keeps track of a command that has been queued | ||||
in a QueuedEngine. It manages the deferreds and hold the method to be called | ||||
and the arguments to that method. | ||||
""" | ||||
def __init__(self, remoteMethod, *args, **kwargs): | ||||
"""Build a new Command object.""" | ||||
self.remoteMethod = remoteMethod | ||||
self.args = args | ||||
self.kwargs = kwargs | ||||
self.finished = False | ||||
def setDeferred(self, d): | ||||
"""Sets the deferred attribute of the Command.""" | ||||
self.deferred = d | ||||
def __repr__(self): | ||||
if not self.args: | ||||
args = '' | ||||
else: | ||||
args = str(self.args)[1:-2] #cut off (...,) | ||||
for k,v in self.kwargs.iteritems(): | ||||
if args: | ||||
args += ', ' | ||||
args += '%s=%r' %(k,v) | ||||
return "%s(%s)" %(self.remoteMethod, args) | ||||
def handleResult(self, result): | ||||
"""When the result is ready, relay it to self.deferred.""" | ||||
self.deferred.callback(result) | ||||
def handleError(self, reason): | ||||
"""When an error has occured, relay it to self.deferred.""" | ||||
self.deferred.errback(reason) | ||||
class ThreadedEngineService(EngineService): | ||||
zi.implements(IEngineBase) | ||||
def __init__(self, shellClass=Interpreter, mpi=None): | ||||
EngineService.__init__(self, shellClass, mpi) | ||||
# Only import this if we are going to use this class | ||||
from twisted.internet import threads | ||||
def execute(self, lines): | ||||
msg = """engine: %r | ||||
method: execute(lines) | ||||
lines = %s""" % (self.id, lines) | ||||
d = threads.deferToThread(self.executeAndRaise, msg, self.shell.execute, lines) | ||||
d.addCallback(self.addIDToResult) | ||||
return d | ||||