##// END OF EJS Templates
I.k.engineservice.ThreadedEngineService wrapps calls to self.shell.execute, adding extra information to exceptions, like EngineService.executeAndRaise.
Barry Wark -
r1320:ea3790ac merge
parent child Browse files
Show More
@@ -1,876 +1,901 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3
3
4 """A Twisted Service Representation of the IPython core.
4 """A Twisted Service Representation of the IPython core.
5
5
6 The IPython Core exposed to the network is called the Engine. Its
6 The IPython Core exposed to the network is called the Engine. Its
7 representation in Twisted in the EngineService. Interfaces and adapters
7 representation in Twisted in the EngineService. Interfaces and adapters
8 are used to abstract out the details of the actual network protocol used.
8 are used to abstract out the details of the actual network protocol used.
9 The EngineService is an Engine that knows nothing about the actual protocol
9 The EngineService is an Engine that knows nothing about the actual protocol
10 used.
10 used.
11
11
12 The EngineService is exposed with various network protocols in modules like:
12 The EngineService is exposed with various network protocols in modules like:
13
13
14 enginepb.py
14 enginepb.py
15 enginevanilla.py
15 enginevanilla.py
16
16
17 As of 12/12/06 the classes in this module have been simplified greatly. It was
17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 felt that we had over-engineered things. To improve the maintainability of the
18 felt that we had over-engineered things. To improve the maintainability of the
19 code we have taken out the ICompleteEngine interface and the completeEngine
19 code we have taken out the ICompleteEngine interface and the completeEngine
20 method that automatically added methods to engines.
20 method that automatically added methods to engines.
21
21
22 """
22 """
23
23
24 __docformat__ = "restructuredtext en"
24 __docformat__ = "restructuredtext en"
25
25
26 #-------------------------------------------------------------------------------
26 #-------------------------------------------------------------------------------
27 # Copyright (C) 2008 The IPython Development Team
27 # Copyright (C) 2008 The IPython Development Team
28 #
28 #
29 # Distributed under the terms of the BSD License. The full license is in
29 # Distributed under the terms of the BSD License. The full license is in
30 # the file COPYING, distributed as part of this software.
30 # the file COPYING, distributed as part of this software.
31 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
32
32
33 #-------------------------------------------------------------------------------
33 #-------------------------------------------------------------------------------
34 # Imports
34 # Imports
35 #-------------------------------------------------------------------------------
35 #-------------------------------------------------------------------------------
36
36
37 import os, sys, copy
37 import os, sys, copy
38 import cPickle as pickle
38 import cPickle as pickle
39 from new import instancemethod
39 from new import instancemethod
40
40
41 from twisted.application import service
41 from twisted.application import service
42 from twisted.internet import defer, reactor
42 from twisted.internet import defer, reactor
43 from twisted.python import log, failure, components
43 from twisted.python import log, failure, components
44 import zope.interface as zi
44 import zope.interface as zi
45
45
46 from IPython.kernel.core.interpreter import Interpreter
46 from IPython.kernel.core.interpreter import Interpreter
47 from IPython.kernel import newserialized, error, util
47 from IPython.kernel import newserialized, error, util
48 from IPython.kernel.util import printer
48 from IPython.kernel.util import printer
49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
50 from IPython.kernel import codeutil
50 from IPython.kernel import codeutil
51
51
52
52
53 #-------------------------------------------------------------------------------
53 #-------------------------------------------------------------------------------
54 # Interface specification for the Engine
54 # Interface specification for the Engine
55 #-------------------------------------------------------------------------------
55 #-------------------------------------------------------------------------------
56
56
57 class IEngineCore(zi.Interface):
57 class IEngineCore(zi.Interface):
58 """The minimal required interface for the IPython Engine.
58 """The minimal required interface for the IPython Engine.
59
59
60 This interface provides a formal specification of the IPython core.
60 This interface provides a formal specification of the IPython core.
61 All these methods should return deferreds regardless of what side of a
61 All these methods should return deferreds regardless of what side of a
62 network connection they are on.
62 network connection they are on.
63
63
64 In general, this class simply wraps a shell class and wraps its return
64 In general, this class simply wraps a shell class and wraps its return
65 values as Deferred objects. If the underlying shell class method raises
65 values as Deferred objects. If the underlying shell class method raises
66 an exception, this class should convert it to a twisted.failure.Failure
66 an exception, this class should convert it to a twisted.failure.Failure
67 that will be propagated along the Deferred's errback chain.
67 that will be propagated along the Deferred's errback chain.
68
68
69 In addition, Failures are aggressive. By this, we mean that if a method
69 In addition, Failures are aggressive. By this, we mean that if a method
70 is performing multiple actions (like pulling multiple object) if any
70 is performing multiple actions (like pulling multiple object) if any
71 single one fails, the entire method will fail with that Failure. It is
71 single one fails, the entire method will fail with that Failure. It is
72 all or nothing.
72 all or nothing.
73 """
73 """
74
74
75 id = zi.interface.Attribute("the id of the Engine object")
75 id = zi.interface.Attribute("the id of the Engine object")
76 properties = zi.interface.Attribute("A dict of properties of the Engine")
76 properties = zi.interface.Attribute("A dict of properties of the Engine")
77
77
78 def execute(lines):
78 def execute(lines):
79 """Execute lines of Python code.
79 """Execute lines of Python code.
80
80
81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
82 upon success.
82 upon success.
83
83
84 Returns a failure object if the execution of lines raises an exception.
84 Returns a failure object if the execution of lines raises an exception.
85 """
85 """
86
86
87 def push(namespace):
87 def push(namespace):
88 """Push dict namespace into the user's namespace.
88 """Push dict namespace into the user's namespace.
89
89
90 Returns a deferred to None or a failure.
90 Returns a deferred to None or a failure.
91 """
91 """
92
92
93 def pull(keys):
93 def pull(keys):
94 """Pulls values out of the user's namespace by keys.
94 """Pulls values out of the user's namespace by keys.
95
95
96 Returns a deferred to a tuple objects or a single object.
96 Returns a deferred to a tuple objects or a single object.
97
97
98 Raises NameError if any one of objects doess not exist.
98 Raises NameError if any one of objects doess not exist.
99 """
99 """
100
100
101 def push_function(namespace):
101 def push_function(namespace):
102 """Push a dict of key, function pairs into the user's namespace.
102 """Push a dict of key, function pairs into the user's namespace.
103
103
104 Returns a deferred to None or a failure."""
104 Returns a deferred to None or a failure."""
105
105
106 def pull_function(keys):
106 def pull_function(keys):
107 """Pulls functions out of the user's namespace by keys.
107 """Pulls functions out of the user's namespace by keys.
108
108
109 Returns a deferred to a tuple of functions or a single function.
109 Returns a deferred to a tuple of functions or a single function.
110
110
111 Raises NameError if any one of the functions does not exist.
111 Raises NameError if any one of the functions does not exist.
112 """
112 """
113
113
114 def get_result(i=None):
114 def get_result(i=None):
115 """Get the stdin/stdout/stderr of command i.
115 """Get the stdin/stdout/stderr of command i.
116
116
117 Returns a deferred to a dict with keys
117 Returns a deferred to a dict with keys
118 (id, number, stdin, stdout, stderr).
118 (id, number, stdin, stdout, stderr).
119
119
120 Raises IndexError if command i does not exist.
120 Raises IndexError if command i does not exist.
121 Raises TypeError if i in not an int.
121 Raises TypeError if i in not an int.
122 """
122 """
123
123
124 def reset():
124 def reset():
125 """Reset the shell.
125 """Reset the shell.
126
126
127 This clears the users namespace. Won't cause modules to be
127 This clears the users namespace. Won't cause modules to be
128 reloaded. Should also re-initialize certain variables like id.
128 reloaded. Should also re-initialize certain variables like id.
129 """
129 """
130
130
131 def kill():
131 def kill():
132 """Kill the engine by stopping the reactor."""
132 """Kill the engine by stopping the reactor."""
133
133
134 def keys():
134 def keys():
135 """Return the top level variables in the users namspace.
135 """Return the top level variables in the users namspace.
136
136
137 Returns a deferred to a dict."""
137 Returns a deferred to a dict."""
138
138
139
139
140 class IEngineSerialized(zi.Interface):
140 class IEngineSerialized(zi.Interface):
141 """Push/Pull methods that take Serialized objects.
141 """Push/Pull methods that take Serialized objects.
142
142
143 All methods should return deferreds.
143 All methods should return deferreds.
144 """
144 """
145
145
146 def push_serialized(namespace):
146 def push_serialized(namespace):
147 """Push a dict of keys and Serialized objects into the user's namespace."""
147 """Push a dict of keys and Serialized objects into the user's namespace."""
148
148
149 def pull_serialized(keys):
149 def pull_serialized(keys):
150 """Pull objects by key from the user's namespace as Serialized.
150 """Pull objects by key from the user's namespace as Serialized.
151
151
152 Returns a list of or one Serialized.
152 Returns a list of or one Serialized.
153
153
154 Raises NameError is any one of the objects does not exist.
154 Raises NameError is any one of the objects does not exist.
155 """
155 """
156
156
157
157
158 class IEngineProperties(zi.Interface):
158 class IEngineProperties(zi.Interface):
159 """Methods for access to the properties object of an Engine"""
159 """Methods for access to the properties object of an Engine"""
160
160
161 properties = zi.Attribute("A StrictDict object, containing the properties")
161 properties = zi.Attribute("A StrictDict object, containing the properties")
162
162
163 def set_properties(properties):
163 def set_properties(properties):
164 """set properties by key and value"""
164 """set properties by key and value"""
165
165
166 def get_properties(keys=None):
166 def get_properties(keys=None):
167 """get a list of properties by `keys`, if no keys specified, get all"""
167 """get a list of properties by `keys`, if no keys specified, get all"""
168
168
169 def del_properties(keys):
169 def del_properties(keys):
170 """delete properties by `keys`"""
170 """delete properties by `keys`"""
171
171
172 def has_properties(keys):
172 def has_properties(keys):
173 """get a list of bool values for whether `properties` has `keys`"""
173 """get a list of bool values for whether `properties` has `keys`"""
174
174
175 def clear_properties():
175 def clear_properties():
176 """clear the properties dict"""
176 """clear the properties dict"""
177
177
178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
179 """The basic engine interface that EngineService will implement.
179 """The basic engine interface that EngineService will implement.
180
180
181 This exists so it is easy to specify adapters that adapt to and from the
181 This exists so it is easy to specify adapters that adapt to and from the
182 API that the basic EngineService implements.
182 API that the basic EngineService implements.
183 """
183 """
184 pass
184 pass
185
185
186 class IEngineQueued(IEngineBase):
186 class IEngineQueued(IEngineBase):
187 """Interface for adding a queue to an IEngineBase.
187 """Interface for adding a queue to an IEngineBase.
188
188
189 This interface extends the IEngineBase interface to add methods for managing
189 This interface extends the IEngineBase interface to add methods for managing
190 the engine's queue. The implicit details of this interface are that the
190 the engine's queue. The implicit details of this interface are that the
191 execution of all methods declared in IEngineBase should appropriately be
191 execution of all methods declared in IEngineBase should appropriately be
192 put through a queue before execution.
192 put through a queue before execution.
193
193
194 All methods should return deferreds.
194 All methods should return deferreds.
195 """
195 """
196
196
197 def clear_queue():
197 def clear_queue():
198 """Clear the queue."""
198 """Clear the queue."""
199
199
200 def queue_status():
200 def queue_status():
201 """Get the queued and pending commands in the queue."""
201 """Get the queued and pending commands in the queue."""
202
202
203 def register_failure_observer(obs):
203 def register_failure_observer(obs):
204 """Register an observer of pending Failures.
204 """Register an observer of pending Failures.
205
205
206 The observer must implement IFailureObserver.
206 The observer must implement IFailureObserver.
207 """
207 """
208
208
209 def unregister_failure_observer(obs):
209 def unregister_failure_observer(obs):
210 """Unregister an observer of pending Failures."""
210 """Unregister an observer of pending Failures."""
211
211
212
212
213 class IEngineThreaded(zi.Interface):
213 class IEngineThreaded(zi.Interface):
214 """A place holder for threaded commands.
214 """A place holder for threaded commands.
215
215
216 All methods should return deferreds.
216 All methods should return deferreds.
217 """
217 """
218 pass
218 pass
219
219
220
220
221 #-------------------------------------------------------------------------------
221 #-------------------------------------------------------------------------------
222 # Functions and classes to implement the EngineService
222 # Functions and classes to implement the EngineService
223 #-------------------------------------------------------------------------------
223 #-------------------------------------------------------------------------------
224
224
225
225
226 class StrictDict(dict):
226 class StrictDict(dict):
227 """This is a strict copying dictionary for use as the interface to the
227 """This is a strict copying dictionary for use as the interface to the
228 properties of an Engine.
228 properties of an Engine.
229 :IMPORTANT:
229 :IMPORTANT:
230 This object copies the values you set to it, and returns copies to you
230 This object copies the values you set to it, and returns copies to you
231 when you request them. The only way to change properties os explicitly
231 when you request them. The only way to change properties os explicitly
232 through the setitem and getitem of the dictionary interface.
232 through the setitem and getitem of the dictionary interface.
233 Example:
233 Example:
234 >>> e = kernel.get_engine(id)
234 >>> e = kernel.get_engine(id)
235 >>> L = someList
235 >>> L = someList
236 >>> e.properties['L'] = L
236 >>> e.properties['L'] = L
237 >>> L == e.properties['L']
237 >>> L == e.properties['L']
238 ... True
238 ... True
239 >>> L.append(something Else)
239 >>> L.append(something Else)
240 >>> L == e.properties['L']
240 >>> L == e.properties['L']
241 ... False
241 ... False
242
242
243 getitem copies, so calls to methods of objects do not affect the
243 getitem copies, so calls to methods of objects do not affect the
244 properties, as in the following example:
244 properties, as in the following example:
245 >>> e.properties[1] = range(2)
245 >>> e.properties[1] = range(2)
246 >>> print e.properties[1]
246 >>> print e.properties[1]
247 ... [0, 1]
247 ... [0, 1]
248 >>> e.properties[1].append(2)
248 >>> e.properties[1].append(2)
249 >>> print e.properties[1]
249 >>> print e.properties[1]
250 ... [0, 1]
250 ... [0, 1]
251
251
252 """
252 """
253 def __init__(self, *args, **kwargs):
253 def __init__(self, *args, **kwargs):
254 dict.__init__(self, *args, **kwargs)
254 dict.__init__(self, *args, **kwargs)
255 self.modified = True
255 self.modified = True
256
256
257 def __getitem__(self, key):
257 def __getitem__(self, key):
258 return copy.deepcopy(dict.__getitem__(self, key))
258 return copy.deepcopy(dict.__getitem__(self, key))
259
259
260 def __setitem__(self, key, value):
260 def __setitem__(self, key, value):
261 # check if this entry is valid for transport around the network
261 # check if this entry is valid for transport around the network
262 # and copying
262 # and copying
263 try:
263 try:
264 pickle.dumps(key, 2)
264 pickle.dumps(key, 2)
265 pickle.dumps(value, 2)
265 pickle.dumps(value, 2)
266 newvalue = copy.deepcopy(value)
266 newvalue = copy.deepcopy(value)
267 except:
267 except:
268 raise error.InvalidProperty(value)
268 raise error.InvalidProperty(value)
269 dict.__setitem__(self, key, newvalue)
269 dict.__setitem__(self, key, newvalue)
270 self.modified = True
270 self.modified = True
271
271
272 def __delitem__(self, key):
272 def __delitem__(self, key):
273 dict.__delitem__(self, key)
273 dict.__delitem__(self, key)
274 self.modified = True
274 self.modified = True
275
275
276 def update(self, dikt):
276 def update(self, dikt):
277 for k,v in dikt.iteritems():
277 for k,v in dikt.iteritems():
278 self[k] = v
278 self[k] = v
279
279
280 def pop(self, key):
280 def pop(self, key):
281 self.modified = True
281 self.modified = True
282 return dict.pop(self, key)
282 return dict.pop(self, key)
283
283
284 def popitem(self):
284 def popitem(self):
285 self.modified = True
285 self.modified = True
286 return dict.popitem(self)
286 return dict.popitem(self)
287
287
288 def clear(self):
288 def clear(self):
289 self.modified = True
289 self.modified = True
290 dict.clear(self)
290 dict.clear(self)
291
291
292 def subDict(self, *keys):
292 def subDict(self, *keys):
293 d = {}
293 d = {}
294 for key in keys:
294 for key in keys:
295 d[key] = self[key]
295 d[key] = self[key]
296 return d
296 return d
297
297
298
298
299
299
300 class EngineAPI(object):
300 class EngineAPI(object):
301 """This is the object through which the user can edit the `properties`
301 """This is the object through which the user can edit the `properties`
302 attribute of an Engine.
302 attribute of an Engine.
303 The Engine Properties object copies all object in and out of itself.
303 The Engine Properties object copies all object in and out of itself.
304 See the EngineProperties object for details.
304 See the EngineProperties object for details.
305 """
305 """
306 _fix=False
306 _fix=False
307 def __init__(self, id):
307 def __init__(self, id):
308 self.id = id
308 self.id = id
309 self.properties = StrictDict()
309 self.properties = StrictDict()
310 self._fix=True
310 self._fix=True
311
311
312 def __setattr__(self, k,v):
312 def __setattr__(self, k,v):
313 if self._fix:
313 if self._fix:
314 raise error.KernelError("I am protected!")
314 raise error.KernelError("I am protected!")
315 else:
315 else:
316 object.__setattr__(self, k, v)
316 object.__setattr__(self, k, v)
317
317
318 def __delattr__(self, key):
318 def __delattr__(self, key):
319 raise error.KernelError("I am protected!")
319 raise error.KernelError("I am protected!")
320
320
321
321
322 _apiDict = {}
322 _apiDict = {}
323
323
324 def get_engine(id):
324 def get_engine(id):
325 """Get the Engine API object, whcih currently just provides the properties
325 """Get the Engine API object, whcih currently just provides the properties
326 object, by ID"""
326 object, by ID"""
327 global _apiDict
327 global _apiDict
328 if not _apiDict.get(id):
328 if not _apiDict.get(id):
329 _apiDict[id] = EngineAPI(id)
329 _apiDict[id] = EngineAPI(id)
330 return _apiDict[id]
330 return _apiDict[id]
331
331
332 def drop_engine(id):
332 def drop_engine(id):
333 """remove an engine"""
333 """remove an engine"""
334 global _apiDict
334 global _apiDict
335 if _apiDict.has_key(id):
335 if _apiDict.has_key(id):
336 del _apiDict[id]
336 del _apiDict[id]
337
337
338 class EngineService(object, service.Service):
338 class EngineService(object, service.Service):
339 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
339 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
340
340
341 zi.implements(IEngineBase)
341 zi.implements(IEngineBase)
342 name = 'EngineService'
342 name = 'EngineService'
343
343
344 def __init__(self, shellClass=Interpreter, mpi=None):
344 def __init__(self, shellClass=Interpreter, mpi=None):
345 """Create an EngineService.
345 """Create an EngineService.
346
346
347 shellClass: something that implements IInterpreter or core1
347 shellClass: something that implements IInterpreter or core1
348 mpi: an mpi module that has rank and size attributes
348 mpi: an mpi module that has rank and size attributes
349 """
349 """
350 self.shellClass = shellClass
350 self.shellClass = shellClass
351 self.shell = self.shellClass()
351 self.shell = self.shellClass()
352 self.mpi = mpi
352 self.mpi = mpi
353 self.id = None
353 self.id = None
354 self.properties = get_engine(self.id).properties
354 self.properties = get_engine(self.id).properties
355 if self.mpi is not None:
355 if self.mpi is not None:
356 log.msg("MPI started with rank = %i and size = %i" %
356 log.msg("MPI started with rank = %i and size = %i" %
357 (self.mpi.rank, self.mpi.size))
357 (self.mpi.rank, self.mpi.size))
358 self.id = self.mpi.rank
358 self.id = self.mpi.rank
359 self._seedNamespace()
359 self._seedNamespace()
360
360
361 # Make id a property so that the shell can get the updated id
361 # Make id a property so that the shell can get the updated id
362
362
363 def _setID(self, id):
363 def _setID(self, id):
364 self._id = id
364 self._id = id
365 self.properties = get_engine(id).properties
365 self.properties = get_engine(id).properties
366 self.shell.push({'id': id})
366 self.shell.push({'id': id})
367
367
368 def _getID(self):
368 def _getID(self):
369 return self._id
369 return self._id
370
370
371 id = property(_getID, _setID)
371 id = property(_getID, _setID)
372
372
373 def _seedNamespace(self):
373 def _seedNamespace(self):
374 self.shell.push({'mpi': self.mpi, 'id' : self.id})
374 self.shell.push({'mpi': self.mpi, 'id' : self.id})
375
375
376 def executeAndRaise(self, msg, callable, *args, **kwargs):
376 def executeAndRaise(self, msg, callable, *args, **kwargs):
377 """Call a method of self.shell and wrap any exception."""
377 """Call a method of self.shell and wrap any exception."""
378 d = defer.Deferred()
378 d = defer.Deferred()
379 try:
379 try:
380 result = callable(*args, **kwargs)
380 result = callable(*args, **kwargs)
381 except:
381 except:
382 # This gives the following:
382 # This gives the following:
383 # et=exception class
383 # et=exception class
384 # ev=exception class instance
384 # ev=exception class instance
385 # tb=traceback object
385 # tb=traceback object
386 et,ev,tb = sys.exc_info()
386 et,ev,tb = sys.exc_info()
387 # This call adds attributes to the exception value
387 # This call adds attributes to the exception value
388 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
388 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
389 # Add another attribute
389 # Add another attribute
390 ev._ipython_engine_info = msg
390 ev._ipython_engine_info = msg
391 f = failure.Failure(ev,et,None)
391 f = failure.Failure(ev,et,None)
392 d.errback(f)
392 d.errback(f)
393 else:
393 else:
394 d.callback(result)
394 d.callback(result)
395
395
396 return d
396 return d
397
397
398
398 # The IEngine methods. See the interface for documentation.
399 # The IEngine methods. See the interface for documentation.
399
400
400 def execute(self, lines):
401 def execute(self, lines):
401 msg = {'engineid':self.id,
402 msg = {'engineid':self.id,
402 'method':'execute',
403 'method':'execute',
403 'args':[lines]}
404 'args':[lines]}
404 d = self.executeAndRaise(msg, self.shell.execute, lines)
405 d = self.executeAndRaise(msg, self.shell.execute, lines)
405 d.addCallback(self.addIDToResult)
406 d.addCallback(self.addIDToResult)
406 return d
407 return d
407
408
408 def addIDToResult(self, result):
409 def addIDToResult(self, result):
409 result['id'] = self.id
410 result['id'] = self.id
410 return result
411 return result
411
412
412 def push(self, namespace):
413 def push(self, namespace):
413 msg = {'engineid':self.id,
414 msg = {'engineid':self.id,
414 'method':'push',
415 'method':'push',
415 'args':[repr(namespace.keys())]}
416 'args':[repr(namespace.keys())]}
416 d = self.executeAndRaise(msg, self.shell.push, namespace)
417 d = self.executeAndRaise(msg, self.shell.push, namespace)
417 return d
418 return d
418
419
419 def pull(self, keys):
420 def pull(self, keys):
420 msg = {'engineid':self.id,
421 msg = {'engineid':self.id,
421 'method':'pull',
422 'method':'pull',
422 'args':[repr(keys)]}
423 'args':[repr(keys)]}
423 d = self.executeAndRaise(msg, self.shell.pull, keys)
424 d = self.executeAndRaise(msg, self.shell.pull, keys)
424 return d
425 return d
425
426
426 def push_function(self, namespace):
427 def push_function(self, namespace):
427 msg = {'engineid':self.id,
428 msg = {'engineid':self.id,
428 'method':'push_function',
429 'method':'push_function',
429 'args':[repr(namespace.keys())]}
430 'args':[repr(namespace.keys())]}
430 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
431 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
431 return d
432 return d
432
433
433 def pull_function(self, keys):
434 def pull_function(self, keys):
434 msg = {'engineid':self.id,
435 msg = {'engineid':self.id,
435 'method':'pull_function',
436 'method':'pull_function',
436 'args':[repr(keys)]}
437 'args':[repr(keys)]}
437 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
438 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
438 return d
439 return d
439
440
440 def get_result(self, i=None):
441 def get_result(self, i=None):
441 msg = {'engineid':self.id,
442 msg = {'engineid':self.id,
442 'method':'get_result',
443 'method':'get_result',
443 'args':[repr(i)]}
444 'args':[repr(i)]}
444 d = self.executeAndRaise(msg, self.shell.getCommand, i)
445 d = self.executeAndRaise(msg, self.shell.getCommand, i)
445 d.addCallback(self.addIDToResult)
446 d.addCallback(self.addIDToResult)
446 return d
447 return d
447
448
448 def reset(self):
449 def reset(self):
449 msg = {'engineid':self.id,
450 msg = {'engineid':self.id,
450 'method':'reset',
451 'method':'reset',
451 'args':[]}
452 'args':[]}
452 del self.shell
453 del self.shell
453 self.shell = self.shellClass()
454 self.shell = self.shellClass()
454 self.properties.clear()
455 self.properties.clear()
455 d = self.executeAndRaise(msg, self._seedNamespace)
456 d = self.executeAndRaise(msg, self._seedNamespace)
456 return d
457 return d
457
458
458 def kill(self):
459 def kill(self):
459 drop_engine(self.id)
460 drop_engine(self.id)
460 try:
461 try:
461 reactor.stop()
462 reactor.stop()
462 except RuntimeError:
463 except RuntimeError:
463 log.msg('The reactor was not running apparently.')
464 log.msg('The reactor was not running apparently.')
464 return defer.fail()
465 return defer.fail()
465 else:
466 else:
466 return defer.succeed(None)
467 return defer.succeed(None)
467
468
468 def keys(self):
469 def keys(self):
469 """Return a list of variables names in the users top level namespace.
470 """Return a list of variables names in the users top level namespace.
470
471
471 This used to return a dict of all the keys/repr(values) in the
472 This used to return a dict of all the keys/repr(values) in the
472 user's namespace. This was too much info for the ControllerService
473 user's namespace. This was too much info for the ControllerService
473 to handle so it is now just a list of keys.
474 to handle so it is now just a list of keys.
474 """
475 """
475
476
476 remotes = []
477 remotes = []
477 for k in self.shell.user_ns.iterkeys():
478 for k in self.shell.user_ns.iterkeys():
478 if k not in ['__name__', '_ih', '_oh', '__builtins__',
479 if k not in ['__name__', '_ih', '_oh', '__builtins__',
479 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
480 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
480 remotes.append(k)
481 remotes.append(k)
481 return defer.succeed(remotes)
482 return defer.succeed(remotes)
482
483
483 def set_properties(self, properties):
484 def set_properties(self, properties):
484 msg = {'engineid':self.id,
485 msg = {'engineid':self.id,
485 'method':'set_properties',
486 'method':'set_properties',
486 'args':[repr(properties.keys())]}
487 'args':[repr(properties.keys())]}
487 return self.executeAndRaise(msg, self.properties.update, properties)
488 return self.executeAndRaise(msg, self.properties.update, properties)
488
489
489 def get_properties(self, keys=None):
490 def get_properties(self, keys=None):
490 msg = {'engineid':self.id,
491 msg = {'engineid':self.id,
491 'method':'get_properties',
492 'method':'get_properties',
492 'args':[repr(keys)]}
493 'args':[repr(keys)]}
493 if keys is None:
494 if keys is None:
494 keys = self.properties.keys()
495 keys = self.properties.keys()
495 return self.executeAndRaise(msg, self.properties.subDict, *keys)
496 return self.executeAndRaise(msg, self.properties.subDict, *keys)
496
497
497 def _doDel(self, keys):
498 def _doDel(self, keys):
498 for key in keys:
499 for key in keys:
499 del self.properties[key]
500 del self.properties[key]
500
501
501 def del_properties(self, keys):
502 def del_properties(self, keys):
502 msg = {'engineid':self.id,
503 msg = {'engineid':self.id,
503 'method':'del_properties',
504 'method':'del_properties',
504 'args':[repr(keys)]}
505 'args':[repr(keys)]}
505 return self.executeAndRaise(msg, self._doDel, keys)
506 return self.executeAndRaise(msg, self._doDel, keys)
506
507
507 def _doHas(self, keys):
508 def _doHas(self, keys):
508 return [self.properties.has_key(key) for key in keys]
509 return [self.properties.has_key(key) for key in keys]
509
510
510 def has_properties(self, keys):
511 def has_properties(self, keys):
511 msg = {'engineid':self.id,
512 msg = {'engineid':self.id,
512 'method':'has_properties',
513 'method':'has_properties',
513 'args':[repr(keys)]}
514 'args':[repr(keys)]}
514 return self.executeAndRaise(msg, self._doHas, keys)
515 return self.executeAndRaise(msg, self._doHas, keys)
515
516
516 def clear_properties(self):
517 def clear_properties(self):
517 msg = {'engineid':self.id,
518 msg = {'engineid':self.id,
518 'method':'clear_properties',
519 'method':'clear_properties',
519 'args':[]}
520 'args':[]}
520 return self.executeAndRaise(msg, self.properties.clear)
521 return self.executeAndRaise(msg, self.properties.clear)
521
522
522 def push_serialized(self, sNamespace):
523 def push_serialized(self, sNamespace):
523 msg = {'engineid':self.id,
524 msg = {'engineid':self.id,
524 'method':'push_serialized',
525 'method':'push_serialized',
525 'args':[repr(sNamespace.keys())]}
526 'args':[repr(sNamespace.keys())]}
526 ns = {}
527 ns = {}
527 for k,v in sNamespace.iteritems():
528 for k,v in sNamespace.iteritems():
528 try:
529 try:
529 unserialized = newserialized.IUnSerialized(v)
530 unserialized = newserialized.IUnSerialized(v)
530 ns[k] = unserialized.getObject()
531 ns[k] = unserialized.getObject()
531 except:
532 except:
532 return defer.fail()
533 return defer.fail()
533 return self.executeAndRaise(msg, self.shell.push, ns)
534 return self.executeAndRaise(msg, self.shell.push, ns)
534
535
535 def pull_serialized(self, keys):
536 def pull_serialized(self, keys):
536 msg = {'engineid':self.id,
537 msg = {'engineid':self.id,
537 'method':'pull_serialized',
538 'method':'pull_serialized',
538 'args':[repr(keys)]}
539 'args':[repr(keys)]}
539 if isinstance(keys, str):
540 if isinstance(keys, str):
540 keys = [keys]
541 keys = [keys]
541 if len(keys)==1:
542 if len(keys)==1:
542 d = self.executeAndRaise(msg, self.shell.pull, keys)
543 d = self.executeAndRaise(msg, self.shell.pull, keys)
543 d.addCallback(newserialized.serialize)
544 d.addCallback(newserialized.serialize)
544 return d
545 return d
545 elif len(keys)>1:
546 elif len(keys)>1:
546 d = self.executeAndRaise(msg, self.shell.pull, keys)
547 d = self.executeAndRaise(msg, self.shell.pull, keys)
547 @d.addCallback
548 @d.addCallback
548 def packThemUp(values):
549 def packThemUp(values):
549 serials = []
550 serials = []
550 for v in values:
551 for v in values:
551 try:
552 try:
552 serials.append(newserialized.serialize(v))
553 serials.append(newserialized.serialize(v))
553 except:
554 except:
554 return defer.fail(failure.Failure())
555 return defer.fail(failure.Failure())
555 return serials
556 return serials
556 return packThemUp
557 return packThemUp
557
558
558
559
559 def queue(methodToQueue):
560 def queue(methodToQueue):
560 def queuedMethod(this, *args, **kwargs):
561 def queuedMethod(this, *args, **kwargs):
561 name = methodToQueue.__name__
562 name = methodToQueue.__name__
562 return this.submitCommand(Command(name, *args, **kwargs))
563 return this.submitCommand(Command(name, *args, **kwargs))
563 return queuedMethod
564 return queuedMethod
564
565
565 class QueuedEngine(object):
566 class QueuedEngine(object):
566 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
567 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
567
568
568 The resulting object will implement IEngineQueued which extends
569 The resulting object will implement IEngineQueued which extends
569 IEngineCore which extends (IEngineBase, IEngineSerialized).
570 IEngineCore which extends (IEngineBase, IEngineSerialized).
570
571
571 This seems like the best way of handling it, but I am not sure. The
572 This seems like the best way of handling it, but I am not sure. The
572 other option is to have the various base interfaces be used like
573 other option is to have the various base interfaces be used like
573 mix-in intefaces. The problem I have with this is adpatation is
574 mix-in intefaces. The problem I have with this is adpatation is
574 more difficult and complicated because there can be can multiple
575 more difficult and complicated because there can be can multiple
575 original and final Interfaces.
576 original and final Interfaces.
576 """
577 """
577
578
578 zi.implements(IEngineQueued)
579 zi.implements(IEngineQueued)
579
580
580 def __init__(self, engine):
581 def __init__(self, engine):
581 """Create a QueuedEngine object from an engine
582 """Create a QueuedEngine object from an engine
582
583
583 engine: An implementor of IEngineCore and IEngineSerialized
584 engine: An implementor of IEngineCore and IEngineSerialized
584 keepUpToDate: whether to update the remote status when the
585 keepUpToDate: whether to update the remote status when the
585 queue is empty. Defaults to False.
586 queue is empty. Defaults to False.
586 """
587 """
587
588
588 # This is the right way to do these tests rather than
589 # This is the right way to do these tests rather than
589 # IEngineCore in list(zi.providedBy(engine)) which will only
590 # IEngineCore in list(zi.providedBy(engine)) which will only
590 # picks of the interfaces that are directly declared by engine.
591 # picks of the interfaces that are directly declared by engine.
591 assert IEngineBase.providedBy(engine), \
592 assert IEngineBase.providedBy(engine), \
592 "engine passed to QueuedEngine doesn't provide IEngineBase"
593 "engine passed to QueuedEngine doesn't provide IEngineBase"
593
594
594 self.engine = engine
595 self.engine = engine
595 self.id = engine.id
596 self.id = engine.id
596 self.queued = []
597 self.queued = []
597 self.history = {}
598 self.history = {}
598 self.engineStatus = {}
599 self.engineStatus = {}
599 self.currentCommand = None
600 self.currentCommand = None
600 self.failureObservers = []
601 self.failureObservers = []
601
602
602 def _get_properties(self):
603 def _get_properties(self):
603 return self.engine.properties
604 return self.engine.properties
604
605
605 properties = property(_get_properties, lambda self, _: None)
606 properties = property(_get_properties, lambda self, _: None)
606 # Queue management methods. You should not call these directly
607 # Queue management methods. You should not call these directly
607
608
608 def submitCommand(self, cmd):
609 def submitCommand(self, cmd):
609 """Submit command to queue."""
610 """Submit command to queue."""
610
611
611 d = defer.Deferred()
612 d = defer.Deferred()
612 cmd.setDeferred(d)
613 cmd.setDeferred(d)
613 if self.currentCommand is not None:
614 if self.currentCommand is not None:
614 if self.currentCommand.finished:
615 if self.currentCommand.finished:
615 # log.msg("Running command immediately: %r" % cmd)
616 # log.msg("Running command immediately: %r" % cmd)
616 self.currentCommand = cmd
617 self.currentCommand = cmd
617 self.runCurrentCommand()
618 self.runCurrentCommand()
618 else: # command is still running
619 else: # command is still running
619 # log.msg("Command is running: %r" % self.currentCommand)
620 # log.msg("Command is running: %r" % self.currentCommand)
620 # log.msg("Queueing: %r" % cmd)
621 # log.msg("Queueing: %r" % cmd)
621 self.queued.append(cmd)
622 self.queued.append(cmd)
622 else:
623 else:
623 # log.msg("No current commands, running: %r" % cmd)
624 # log.msg("No current commands, running: %r" % cmd)
624 self.currentCommand = cmd
625 self.currentCommand = cmd
625 self.runCurrentCommand()
626 self.runCurrentCommand()
626 return d
627 return d
627
628
628 def runCurrentCommand(self):
629 def runCurrentCommand(self):
629 """Run current command."""
630 """Run current command."""
630
631
631 cmd = self.currentCommand
632 cmd = self.currentCommand
632 f = getattr(self.engine, cmd.remoteMethod, None)
633 f = getattr(self.engine, cmd.remoteMethod, None)
633 if f:
634 if f:
634 d = f(*cmd.args, **cmd.kwargs)
635 d = f(*cmd.args, **cmd.kwargs)
635 if cmd.remoteMethod is 'execute':
636 if cmd.remoteMethod is 'execute':
636 d.addCallback(self.saveResult)
637 d.addCallback(self.saveResult)
637 d.addCallback(self.finishCommand)
638 d.addCallback(self.finishCommand)
638 d.addErrback(self.abortCommand)
639 d.addErrback(self.abortCommand)
639 else:
640 else:
640 return defer.fail(AttributeError(cmd.remoteMethod))
641 return defer.fail(AttributeError(cmd.remoteMethod))
641
642
642 def _flushQueue(self):
643 def _flushQueue(self):
643 """Pop next command in queue and run it."""
644 """Pop next command in queue and run it."""
644
645
645 if len(self.queued) > 0:
646 if len(self.queued) > 0:
646 self.currentCommand = self.queued.pop(0)
647 self.currentCommand = self.queued.pop(0)
647 self.runCurrentCommand()
648 self.runCurrentCommand()
648
649
649 def saveResult(self, result):
650 def saveResult(self, result):
650 """Put the result in the history."""
651 """Put the result in the history."""
651 self.history[result['number']] = result
652 self.history[result['number']] = result
652 return result
653 return result
653
654
654 def finishCommand(self, result):
655 def finishCommand(self, result):
655 """Finish currrent command."""
656 """Finish currrent command."""
656
657
657 # The order of these commands is absolutely critical.
658 # The order of these commands is absolutely critical.
658 self.currentCommand.handleResult(result)
659 self.currentCommand.handleResult(result)
659 self.currentCommand.finished = True
660 self.currentCommand.finished = True
660 self._flushQueue()
661 self._flushQueue()
661 return result
662 return result
662
663
663 def abortCommand(self, reason):
664 def abortCommand(self, reason):
664 """Abort current command.
665 """Abort current command.
665
666
666 This eats the Failure but first passes it onto the Deferred that the
667 This eats the Failure but first passes it onto the Deferred that the
667 user has.
668 user has.
668
669
669 It also clear out the queue so subsequence commands don't run.
670 It also clear out the queue so subsequence commands don't run.
670 """
671 """
671
672
672 # The order of these 3 commands is absolutely critical. The currentCommand
673 # The order of these 3 commands is absolutely critical. The currentCommand
673 # must first be marked as finished BEFORE the queue is cleared and before
674 # must first be marked as finished BEFORE the queue is cleared and before
674 # the current command is sent the failure.
675 # the current command is sent the failure.
675 # Also, the queue must be cleared BEFORE the current command is sent the Failure
676 # Also, the queue must be cleared BEFORE the current command is sent the Failure
676 # otherwise the errback chain could trigger new commands to be added to the
677 # otherwise the errback chain could trigger new commands to be added to the
677 # queue before we clear it. We should clear ONLY the commands that were in
678 # queue before we clear it. We should clear ONLY the commands that were in
678 # the queue when the error occured.
679 # the queue when the error occured.
679 self.currentCommand.finished = True
680 self.currentCommand.finished = True
680 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
681 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
681 self.clear_queue(msg=s)
682 self.clear_queue(msg=s)
682 self.currentCommand.handleError(reason)
683 self.currentCommand.handleError(reason)
683
684
684 return None
685 return None
685
686
686 #---------------------------------------------------------------------------
687 #---------------------------------------------------------------------------
687 # IEngineCore methods
688 # IEngineCore methods
688 #---------------------------------------------------------------------------
689 #---------------------------------------------------------------------------
689
690
690 @queue
691 @queue
691 def execute(self, lines):
692 def execute(self, lines):
692 pass
693 pass
693
694
694 @queue
695 @queue
695 def push(self, namespace):
696 def push(self, namespace):
696 pass
697 pass
697
698
698 @queue
699 @queue
699 def pull(self, keys):
700 def pull(self, keys):
700 pass
701 pass
701
702
702 @queue
703 @queue
703 def push_function(self, namespace):
704 def push_function(self, namespace):
704 pass
705 pass
705
706
706 @queue
707 @queue
707 def pull_function(self, keys):
708 def pull_function(self, keys):
708 pass
709 pass
709
710
710 def get_result(self, i=None):
711 def get_result(self, i=None):
711 if i is None:
712 if i is None:
712 i = max(self.history.keys()+[None])
713 i = max(self.history.keys()+[None])
713
714
714 cmd = self.history.get(i, None)
715 cmd = self.history.get(i, None)
715 # Uncomment this line to disable chaching of results
716 # Uncomment this line to disable chaching of results
716 #cmd = None
717 #cmd = None
717 if cmd is None:
718 if cmd is None:
718 return self.submitCommand(Command('get_result', i))
719 return self.submitCommand(Command('get_result', i))
719 else:
720 else:
720 return defer.succeed(cmd)
721 return defer.succeed(cmd)
721
722
722 def reset(self):
723 def reset(self):
723 self.clear_queue()
724 self.clear_queue()
724 self.history = {} # reset the cache - I am not sure we should do this
725 self.history = {} # reset the cache - I am not sure we should do this
725 return self.submitCommand(Command('reset'))
726 return self.submitCommand(Command('reset'))
726
727
727 def kill(self):
728 def kill(self):
728 self.clear_queue()
729 self.clear_queue()
729 return self.submitCommand(Command('kill'))
730 return self.submitCommand(Command('kill'))
730
731
731 @queue
732 @queue
732 def keys(self):
733 def keys(self):
733 pass
734 pass
734
735
735 #---------------------------------------------------------------------------
736 #---------------------------------------------------------------------------
736 # IEngineSerialized methods
737 # IEngineSerialized methods
737 #---------------------------------------------------------------------------
738 #---------------------------------------------------------------------------
738
739
739 @queue
740 @queue
740 def push_serialized(self, namespace):
741 def push_serialized(self, namespace):
741 pass
742 pass
742
743
743 @queue
744 @queue
744 def pull_serialized(self, keys):
745 def pull_serialized(self, keys):
745 pass
746 pass
746
747
747 #---------------------------------------------------------------------------
748 #---------------------------------------------------------------------------
748 # IEngineProperties methods
749 # IEngineProperties methods
749 #---------------------------------------------------------------------------
750 #---------------------------------------------------------------------------
750
751
751 @queue
752 @queue
752 def set_properties(self, namespace):
753 def set_properties(self, namespace):
753 pass
754 pass
754
755
755 @queue
756 @queue
756 def get_properties(self, keys=None):
757 def get_properties(self, keys=None):
757 pass
758 pass
758
759
759 @queue
760 @queue
760 def del_properties(self, keys):
761 def del_properties(self, keys):
761 pass
762 pass
762
763
763 @queue
764 @queue
764 def has_properties(self, keys):
765 def has_properties(self, keys):
765 pass
766 pass
766
767
767 @queue
768 @queue
768 def clear_properties(self):
769 def clear_properties(self):
769 pass
770 pass
770
771
771 #---------------------------------------------------------------------------
772 #---------------------------------------------------------------------------
772 # IQueuedEngine methods
773 # IQueuedEngine methods
773 #---------------------------------------------------------------------------
774 #---------------------------------------------------------------------------
774
775
775 def clear_queue(self, msg=''):
776 def clear_queue(self, msg=''):
776 """Clear the queue, but doesn't cancel the currently running commmand."""
777 """Clear the queue, but doesn't cancel the currently running commmand."""
777
778
778 for cmd in self.queued:
779 for cmd in self.queued:
779 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
780 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
780 self.queued = []
781 self.queued = []
781 return defer.succeed(None)
782 return defer.succeed(None)
782
783
783 def queue_status(self):
784 def queue_status(self):
784 if self.currentCommand is not None:
785 if self.currentCommand is not None:
785 if self.currentCommand.finished:
786 if self.currentCommand.finished:
786 pending = repr(None)
787 pending = repr(None)
787 else:
788 else:
788 pending = repr(self.currentCommand)
789 pending = repr(self.currentCommand)
789 else:
790 else:
790 pending = repr(None)
791 pending = repr(None)
791 dikt = {'queue':map(repr,self.queued), 'pending':pending}
792 dikt = {'queue':map(repr,self.queued), 'pending':pending}
792 return defer.succeed(dikt)
793 return defer.succeed(dikt)
793
794
794 def register_failure_observer(self, obs):
795 def register_failure_observer(self, obs):
795 self.failureObservers.append(obs)
796 self.failureObservers.append(obs)
796
797
797 def unregister_failure_observer(self, obs):
798 def unregister_failure_observer(self, obs):
798 self.failureObservers.remove(obs)
799 self.failureObservers.remove(obs)
799
800
800
801
801 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
802 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
802 # IEngineQueued.
803 # IEngineQueued.
803 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
804 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
804
805
805
806
806 class Command(object):
807 class Command(object):
807 """A command object that encapslates queued commands.
808 """A command object that encapslates queued commands.
808
809
809 This class basically keeps track of a command that has been queued
810 This class basically keeps track of a command that has been queued
810 in a QueuedEngine. It manages the deferreds and hold the method to be called
811 in a QueuedEngine. It manages the deferreds and hold the method to be called
811 and the arguments to that method.
812 and the arguments to that method.
812 """
813 """
813
814
814
815
815 def __init__(self, remoteMethod, *args, **kwargs):
816 def __init__(self, remoteMethod, *args, **kwargs):
816 """Build a new Command object."""
817 """Build a new Command object."""
817
818
818 self.remoteMethod = remoteMethod
819 self.remoteMethod = remoteMethod
819 self.args = args
820 self.args = args
820 self.kwargs = kwargs
821 self.kwargs = kwargs
821 self.finished = False
822 self.finished = False
822
823
823 def setDeferred(self, d):
824 def setDeferred(self, d):
824 """Sets the deferred attribute of the Command."""
825 """Sets the deferred attribute of the Command."""
825
826
826 self.deferred = d
827 self.deferred = d
827
828
828 def __repr__(self):
829 def __repr__(self):
829 if not self.args:
830 if not self.args:
830 args = ''
831 args = ''
831 else:
832 else:
832 args = str(self.args)[1:-2] #cut off (...,)
833 args = str(self.args)[1:-2] #cut off (...,)
833 for k,v in self.kwargs.iteritems():
834 for k,v in self.kwargs.iteritems():
834 if args:
835 if args:
835 args += ', '
836 args += ', '
836 args += '%s=%r' %(k,v)
837 args += '%s=%r' %(k,v)
837 return "%s(%s)" %(self.remoteMethod, args)
838 return "%s(%s)" %(self.remoteMethod, args)
838
839
839 def handleResult(self, result):
840 def handleResult(self, result):
840 """When the result is ready, relay it to self.deferred."""
841 """When the result is ready, relay it to self.deferred."""
841
842
842 self.deferred.callback(result)
843 self.deferred.callback(result)
843
844
844 def handleError(self, reason):
845 def handleError(self, reason):
845 """When an error has occured, relay it to self.deferred."""
846 """When an error has occured, relay it to self.deferred."""
846
847
847 self.deferred.errback(reason)
848 self.deferred.errback(reason)
848
849
849 class ThreadedEngineService(EngineService):
850 class ThreadedEngineService(EngineService):
850 """An EngineService subclass that defers execute commands to a separate
851 """An EngineService subclass that defers execute commands to a separate
851 thread.
852 thread.
852
853
853 ThreadedEngineService uses twisted.internet.threads.deferToThread to
854 ThreadedEngineService uses twisted.internet.threads.deferToThread to
854 defer execute requests to a separate thread. GUI frontends may want to
855 defer execute requests to a separate thread. GUI frontends may want to
855 use ThreadedEngineService as the engine in an
856 use ThreadedEngineService as the engine in an
856 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
857 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
857 block execution from blocking the GUI thread.
858 block execution from blocking the GUI thread.
858 """
859 """
859
860
860 zi.implements(IEngineBase)
861 zi.implements(IEngineBase)
861
862
862 def __init__(self, shellClass=Interpreter, mpi=None):
863 def __init__(self, shellClass=Interpreter, mpi=None):
863 EngineService.__init__(self, shellClass, mpi)
864 EngineService.__init__(self, shellClass, mpi)
864
865
866 def wrapped_execute(self, msg, lines):
867 """Wrap self.shell.execute to add extra information to tracebacks"""
868
869 try:
870 result = self.shell.execute(lines)
871 except Exception,e:
872 # This gives the following:
873 # et=exception class
874 # ev=exception class instance
875 # tb=traceback object
876 et,ev,tb = sys.exc_info()
877 # This call adds attributes to the exception value
878 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
879 # Add another attribute
880
881 # Create a new exception with the new attributes
882 e = et(ev._ipython_traceback_text)
883 e._ipython_engine_info = msg
884
885 # Re-raise
886 raise e
887
888 return result
889
865
890
866 def execute(self, lines):
891 def execute(self, lines):
867 # Only import this if we are going to use this class
892 # Only import this if we are going to use this class
868 from twisted.internet import threads
893 from twisted.internet import threads
869
894
870 msg = {'engineid':self.id,
895 msg = {'engineid':self.id,
871 'method':'execute',
896 'method':'execute',
872 'args':[lines]}
897 'args':[lines]}
873
898
874 d = threads.deferToThread(self.shell.execute, lines)
899 d = threads.deferToThread(self.wrapped_execute, msg, lines)
875 d.addCallback(self.addIDToResult)
900 d.addCallback(self.addIDToResult)
876 return d
901 return d
General Comments 0
You need to be logged in to leave comments. Login now