##// END OF EJS Templates
Messing with newlines.
Brian Granger -
Show More
@@ -1,903 +1,904 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
2 # -*- test-case-name: IPython.kernel.tests.test_engineservice -*-
3
3
4 """A Twisted Service Representation of the IPython core.
4 """A Twisted Service Representation of the IPython core.
5
5
6 The IPython Core exposed to the network is called the Engine. Its
6 The IPython Core exposed to the network is called the Engine. Its
7 representation in Twisted in the EngineService. Interfaces and adapters
7 representation in Twisted in the EngineService. Interfaces and adapters
8 are used to abstract out the details of the actual network protocol used.
8 are used to abstract out the details of the actual network protocol used.
9 The EngineService is an Engine that knows nothing about the actual protocol
9 The EngineService is an Engine that knows nothing about the actual protocol
10 used.
10 used.
11
11
12 The EngineService is exposed with various network protocols in modules like:
12 The EngineService is exposed with various network protocols in modules like:
13
13
14 enginepb.py
14 enginepb.py
15 enginevanilla.py
15 enginevanilla.py
16
16
17 As of 12/12/06 the classes in this module have been simplified greatly. It was
17 As of 12/12/06 the classes in this module have been simplified greatly. It was
18 felt that we had over-engineered things. To improve the maintainability of the
18 felt that we had over-engineered things. To improve the maintainability of the
19 code we have taken out the ICompleteEngine interface and the completeEngine
19 code we have taken out the ICompleteEngine interface and the completeEngine
20 method that automatically added methods to engines.
20 method that automatically added methods to engines.
21
21
22 """
22 """
23
23
24 __docformat__ = "restructuredtext en"
24 __docformat__ = "restructuredtext en"
25
25
26 #-------------------------------------------------------------------------------
26 #-------------------------------------------------------------------------------
27 # Copyright (C) 2008 The IPython Development Team
27 # Copyright (C) 2008 The IPython Development Team
28 #
28 #
29 # Distributed under the terms of the BSD License. The full license is in
29 # Distributed under the terms of the BSD License. The full license is in
30 # the file COPYING, distributed as part of this software.
30 # the file COPYING, distributed as part of this software.
31 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
32
32
33 #-------------------------------------------------------------------------------
33 #-------------------------------------------------------------------------------
34 # Imports
34 # Imports
35 #-------------------------------------------------------------------------------
35 #-------------------------------------------------------------------------------
36
36
37 import os, sys, copy
37 import os, sys, copy
38 import cPickle as pickle
38 import cPickle as pickle
39 from new import instancemethod
39 from new import instancemethod
40
40
41 from twisted.application import service
41 from twisted.application import service
42 from twisted.internet import defer, reactor
42 from twisted.internet import defer, reactor
43 from twisted.python import log, failure, components
43 from twisted.python import log, failure, components
44 import zope.interface as zi
44 import zope.interface as zi
45
45
46 from IPython.kernel.core.interpreter import Interpreter
46 from IPython.kernel.core.interpreter import Interpreter
47 from IPython.kernel import newserialized, error, util
47 from IPython.kernel import newserialized, error, util
48 from IPython.kernel.util import printer
48 from IPython.kernel.util import printer
49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
49 from IPython.kernel.twistedutil import gatherBoth, DeferredList
50 from IPython.kernel import codeutil
50 from IPython.kernel import codeutil
51
51
52
52
53 #-------------------------------------------------------------------------------
53 #-------------------------------------------------------------------------------
54 # Interface specification for the Engine
54 # Interface specification for the Engine
55 #-------------------------------------------------------------------------------
55 #-------------------------------------------------------------------------------
56
56
57 class IEngineCore(zi.Interface):
57 class IEngineCore(zi.Interface):
58 """The minimal required interface for the IPython Engine.
58 """The minimal required interface for the IPython Engine.
59
59
60 This interface provides a formal specification of the IPython core.
60 This interface provides a formal specification of the IPython core.
61 All these methods should return deferreds regardless of what side of a
61 All these methods should return deferreds regardless of what side of a
62 network connection they are on.
62 network connection they are on.
63
63
64 In general, this class simply wraps a shell class and wraps its return
64 In general, this class simply wraps a shell class and wraps its return
65 values as Deferred objects. If the underlying shell class method raises
65 values as Deferred objects. If the underlying shell class method raises
66 an exception, this class should convert it to a twisted.failure.Failure
66 an exception, this class should convert it to a twisted.failure.Failure
67 that will be propagated along the Deferred's errback chain.
67 that will be propagated along the Deferred's errback chain.
68
68
69 In addition, Failures are aggressive. By this, we mean that if a method
69 In addition, Failures are aggressive. By this, we mean that if a method
70 is performing multiple actions (like pulling multiple object) if any
70 is performing multiple actions (like pulling multiple object) if any
71 single one fails, the entire method will fail with that Failure. It is
71 single one fails, the entire method will fail with that Failure. It is
72 all or nothing.
72 all or nothing.
73 """
73 """
74
74
75 id = zi.interface.Attribute("the id of the Engine object")
75 id = zi.interface.Attribute("the id of the Engine object")
76 properties = zi.interface.Attribute("A dict of properties of the Engine")
76 properties = zi.interface.Attribute("A dict of properties of the Engine")
77
77
78 def execute(lines):
78 def execute(lines):
79 """Execute lines of Python code.
79 """Execute lines of Python code.
80
80
81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
81 Returns a dictionary with keys (id, number, stdin, stdout, stderr)
82 upon success.
82 upon success.
83
83
84 Returns a failure object if the execution of lines raises an exception.
84 Returns a failure object if the execution of lines raises an exception.
85 """
85 """
86
86
87 def push(namespace):
87 def push(namespace):
88 """Push dict namespace into the user's namespace.
88 """Push dict namespace into the user's namespace.
89
89
90 Returns a deferred to None or a failure.
90 Returns a deferred to None or a failure.
91 """
91 """
92
92
93 def pull(keys):
93 def pull(keys):
94 """Pulls values out of the user's namespace by keys.
94 """Pulls values out of the user's namespace by keys.
95
95
96 Returns a deferred to a tuple objects or a single object.
96 Returns a deferred to a tuple objects or a single object.
97
97
98 Raises NameError if any one of objects doess not exist.
98 Raises NameError if any one of objects doess not exist.
99 """
99 """
100
100
101 def push_function(namespace):
101 def push_function(namespace):
102 """Push a dict of key, function pairs into the user's namespace.
102 """Push a dict of key, function pairs into the user's namespace.
103
103
104 Returns a deferred to None or a failure."""
104 Returns a deferred to None or a failure."""
105
105
106 def pull_function(keys):
106 def pull_function(keys):
107 """Pulls functions out of the user's namespace by keys.
107 """Pulls functions out of the user's namespace by keys.
108
108
109 Returns a deferred to a tuple of functions or a single function.
109 Returns a deferred to a tuple of functions or a single function.
110
110
111 Raises NameError if any one of the functions does not exist.
111 Raises NameError if any one of the functions does not exist.
112 """
112 """
113
113
114 def get_result(i=None):
114 def get_result(i=None):
115 """Get the stdin/stdout/stderr of command i.
115 """Get the stdin/stdout/stderr of command i.
116
116
117 Returns a deferred to a dict with keys
117 Returns a deferred to a dict with keys
118 (id, number, stdin, stdout, stderr).
118 (id, number, stdin, stdout, stderr).
119
119
120 Raises IndexError if command i does not exist.
120 Raises IndexError if command i does not exist.
121 Raises TypeError if i in not an int.
121 Raises TypeError if i in not an int.
122 """
122 """
123
123
124 def reset():
124 def reset():
125 """Reset the shell.
125 """Reset the shell.
126
126
127 This clears the users namespace. Won't cause modules to be
127 This clears the users namespace. Won't cause modules to be
128 reloaded. Should also re-initialize certain variables like id.
128 reloaded. Should also re-initialize certain variables like id.
129 """
129 """
130
130
131 def kill():
131 def kill():
132 """Kill the engine by stopping the reactor."""
132 """Kill the engine by stopping the reactor."""
133
133
134 def keys():
134 def keys():
135 """Return the top level variables in the users namspace.
135 """Return the top level variables in the users namspace.
136
136
137 Returns a deferred to a dict."""
137 Returns a deferred to a dict."""
138
138
139
139
140 class IEngineSerialized(zi.Interface):
140 class IEngineSerialized(zi.Interface):
141 """Push/Pull methods that take Serialized objects.
141 """Push/Pull methods that take Serialized objects.
142
142
143 All methods should return deferreds.
143 All methods should return deferreds.
144 """
144 """
145
145
146 def push_serialized(namespace):
146 def push_serialized(namespace):
147 """Push a dict of keys and Serialized objects into the user's namespace."""
147 """Push a dict of keys and Serialized objects into the user's namespace."""
148
148
149 def pull_serialized(keys):
149 def pull_serialized(keys):
150 """Pull objects by key from the user's namespace as Serialized.
150 """Pull objects by key from the user's namespace as Serialized.
151
151
152 Returns a list of or one Serialized.
152 Returns a list of or one Serialized.
153
153
154 Raises NameError is any one of the objects does not exist.
154 Raises NameError is any one of the objects does not exist.
155 """
155 """
156
156
157
157
158 class IEngineProperties(zi.Interface):
158 class IEngineProperties(zi.Interface):
159 """Methods for access to the properties object of an Engine"""
159 """Methods for access to the properties object of an Engine"""
160
160
161 properties = zi.Attribute("A StrictDict object, containing the properties")
161 properties = zi.Attribute("A StrictDict object, containing the properties")
162
162
163 def set_properties(properties):
163 def set_properties(properties):
164 """set properties by key and value"""
164 """set properties by key and value"""
165
165
166 def get_properties(keys=None):
166 def get_properties(keys=None):
167 """get a list of properties by `keys`, if no keys specified, get all"""
167 """get a list of properties by `keys`, if no keys specified, get all"""
168
168
169 def del_properties(keys):
169 def del_properties(keys):
170 """delete properties by `keys`"""
170 """delete properties by `keys`"""
171
171
172 def has_properties(keys):
172 def has_properties(keys):
173 """get a list of bool values for whether `properties` has `keys`"""
173 """get a list of bool values for whether `properties` has `keys`"""
174
174
175 def clear_properties():
175 def clear_properties():
176 """clear the properties dict"""
176 """clear the properties dict"""
177
177
178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
178 class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
179 """The basic engine interface that EngineService will implement.
179 """The basic engine interface that EngineService will implement.
180
180
181 This exists so it is easy to specify adapters that adapt to and from the
181 This exists so it is easy to specify adapters that adapt to and from the
182 API that the basic EngineService implements.
182 API that the basic EngineService implements.
183 """
183 """
184 pass
184 pass
185
185
186 class IEngineQueued(IEngineBase):
186 class IEngineQueued(IEngineBase):
187 """Interface for adding a queue to an IEngineBase.
187 """Interface for adding a queue to an IEngineBase.
188
188
189 This interface extends the IEngineBase interface to add methods for managing
189 This interface extends the IEngineBase interface to add methods for managing
190 the engine's queue. The implicit details of this interface are that the
190 the engine's queue. The implicit details of this interface are that the
191 execution of all methods declared in IEngineBase should appropriately be
191 execution of all methods declared in IEngineBase should appropriately be
192 put through a queue before execution.
192 put through a queue before execution.
193
193
194 All methods should return deferreds.
194 All methods should return deferreds.
195 """
195 """
196
196
197 def clear_queue():
197 def clear_queue():
198 """Clear the queue."""
198 """Clear the queue."""
199
199
200 def queue_status():
200 def queue_status():
201 """Get the queued and pending commands in the queue."""
201 """Get the queued and pending commands in the queue."""
202
202
203 def register_failure_observer(obs):
203 def register_failure_observer(obs):
204 """Register an observer of pending Failures.
204 """Register an observer of pending Failures.
205
205
206 The observer must implement IFailureObserver.
206 The observer must implement IFailureObserver.
207 """
207 """
208
208
209 def unregister_failure_observer(obs):
209 def unregister_failure_observer(obs):
210 """Unregister an observer of pending Failures."""
210 """Unregister an observer of pending Failures."""
211
211
212
212
213 class IEngineThreaded(zi.Interface):
213 class IEngineThreaded(zi.Interface):
214 """A place holder for threaded commands.
214 """A place holder for threaded commands.
215
215
216 All methods should return deferreds.
216 All methods should return deferreds.
217 """
217 """
218 pass
218 pass
219
219
220
220
221 #-------------------------------------------------------------------------------
221 #-------------------------------------------------------------------------------
222 # Functions and classes to implement the EngineService
222 # Functions and classes to implement the EngineService
223 #-------------------------------------------------------------------------------
223 #-------------------------------------------------------------------------------
224
224
225
225
226 class StrictDict(dict):
226 class StrictDict(dict):
227 """This is a strict copying dictionary for use as the interface to the
227 """This is a strict copying dictionary for use as the interface to the
228 properties of an Engine.
228 properties of an Engine.
229
229
230 :IMPORTANT:
230 :IMPORTANT:
231 This object copies the values you set to it, and returns copies to you
231 This object copies the values you set to it, and returns copies to you
232 when you request them. The only way to change properties os explicitly
232 when you request them. The only way to change properties os explicitly
233 through the setitem and getitem of the dictionary interface.
233 through the setitem and getitem of the dictionary interface.
234
234
235 Example:
235 Example:
236 >>> e = get_engine(id)
236 >>> e = get_engine(id)
237 >>> L = [1,2,3]
237 >>> L = [1,2,3]
238 >>> e.properties['L'] = L
238 >>> e.properties['L'] = L
239 >>> L == e.properties['L']
239 >>> L == e.properties['L']
240 True
240 True
241 >>> L.append(99)
241 >>> L.append(99)
242 >>> L == e.properties['L']
242 >>> L == e.properties['L']
243 False
243 False
244
244
245 Note that getitem copies, so calls to methods of objects do not affect
245 Note that getitem copies, so calls to methods of objects do not affect
246 the properties, as seen here:
246 the properties, as seen here:
247
247
248 >>> e.properties[1] = range(2)
248 >>> e.properties[1] = range(2)
249 >>> print e.properties[1]
249 >>> print e.properties[1]
250 [0, 1]
250 [0, 1]
251 >>> e.properties[1].append(2)
251 >>> e.properties[1].append(2)
252 >>> print e.properties[1]
252 >>> print e.properties[1]
253 [0, 1]
253 [0, 1]
254 """
254 """
255 def __init__(self, *args, **kwargs):
255 def __init__(self, *args, **kwargs):
256 dict.__init__(self, *args, **kwargs)
256 dict.__init__(self, *args, **kwargs)
257 self.modified = True
257 self.modified = True
258
258
259 def __getitem__(self, key):
259 def __getitem__(self, key):
260 return copy.deepcopy(dict.__getitem__(self, key))
260 return copy.deepcopy(dict.__getitem__(self, key))
261
261
262 def __setitem__(self, key, value):
262 def __setitem__(self, key, value):
263 # check if this entry is valid for transport around the network
263 # check if this entry is valid for transport around the network
264 # and copying
264 # and copying
265 try:
265 try:
266 pickle.dumps(key, 2)
266 pickle.dumps(key, 2)
267 pickle.dumps(value, 2)
267 pickle.dumps(value, 2)
268 newvalue = copy.deepcopy(value)
268 newvalue = copy.deepcopy(value)
269 except:
269 except:
270 raise error.InvalidProperty(value)
270 raise error.InvalidProperty(value)
271 dict.__setitem__(self, key, newvalue)
271 dict.__setitem__(self, key, newvalue)
272 self.modified = True
272 self.modified = True
273
273
274 def __delitem__(self, key):
274 def __delitem__(self, key):
275 dict.__delitem__(self, key)
275 dict.__delitem__(self, key)
276 self.modified = True
276 self.modified = True
277
277
278 def update(self, dikt):
278 def update(self, dikt):
279 for k,v in dikt.iteritems():
279 for k,v in dikt.iteritems():
280 self[k] = v
280 self[k] = v
281
281
282 def pop(self, key):
282 def pop(self, key):
283 self.modified = True
283 self.modified = True
284 return dict.pop(self, key)
284 return dict.pop(self, key)
285
285
286 def popitem(self):
286 def popitem(self):
287 self.modified = True
287 self.modified = True
288 return dict.popitem(self)
288 return dict.popitem(self)
289
289
290 def clear(self):
290 def clear(self):
291 self.modified = True
291 self.modified = True
292 dict.clear(self)
292 dict.clear(self)
293
293
294 def subDict(self, *keys):
294 def subDict(self, *keys):
295 d = {}
295 d = {}
296 for key in keys:
296 for key in keys:
297 d[key] = self[key]
297 d[key] = self[key]
298 return d
298 return d
299
299
300
300
301
301
302 class EngineAPI(object):
302 class EngineAPI(object):
303 """This is the object through which the user can edit the `properties`
303 """This is the object through which the user can edit the `properties`
304 attribute of an Engine.
304 attribute of an Engine.
305 The Engine Properties object copies all object in and out of itself.
305 The Engine Properties object copies all object in and out of itself.
306 See the EngineProperties object for details.
306 See the EngineProperties object for details.
307 """
307 """
308 _fix=False
308 _fix=False
309 def __init__(self, id):
309 def __init__(self, id):
310 self.id = id
310 self.id = id
311 self.properties = StrictDict()
311 self.properties = StrictDict()
312 self._fix=True
312 self._fix=True
313
313
314 def __setattr__(self, k,v):
314 def __setattr__(self, k,v):
315 if self._fix:
315 if self._fix:
316 raise error.KernelError("I am protected!")
316 raise error.KernelError("I am protected!")
317 else:
317 else:
318 object.__setattr__(self, k, v)
318 object.__setattr__(self, k, v)
319
319
320 def __delattr__(self, key):
320 def __delattr__(self, key):
321 raise error.KernelError("I am protected!")
321 raise error.KernelError("I am protected!")
322
322
323
323
324 _apiDict = {}
324 _apiDict = {}
325
325
326 def get_engine(id):
326 def get_engine(id):
327 """Get the Engine API object, whcih currently just provides the properties
327 """Get the Engine API object, whcih currently just provides the properties
328 object, by ID"""
328 object, by ID"""
329 global _apiDict
329 global _apiDict
330 if not _apiDict.get(id):
330 if not _apiDict.get(id):
331 _apiDict[id] = EngineAPI(id)
331 _apiDict[id] = EngineAPI(id)
332 return _apiDict[id]
332 return _apiDict[id]
333
333
334 def drop_engine(id):
334 def drop_engine(id):
335 """remove an engine"""
335 """remove an engine"""
336 global _apiDict
336 global _apiDict
337 if _apiDict.has_key(id):
337 if _apiDict.has_key(id):
338 del _apiDict[id]
338 del _apiDict[id]
339
339
340 class EngineService(object, service.Service):
340 class EngineService(object, service.Service):
341 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
341 """Adapt a IPython shell into a IEngine implementing Twisted Service."""
342
342
343 zi.implements(IEngineBase)
343 zi.implements(IEngineBase)
344 name = 'EngineService'
344 name = 'EngineService'
345
345
346 def __init__(self, shellClass=Interpreter, mpi=None):
346 def __init__(self, shellClass=Interpreter, mpi=None):
347 """Create an EngineService.
347 """Create an EngineService.
348
348
349 shellClass: something that implements IInterpreter or core1
349 shellClass: something that implements IInterpreter or core1
350 mpi: an mpi module that has rank and size attributes
350 mpi: an mpi module that has rank and size attributes
351 """
351 """
352 self.shellClass = shellClass
352 self.shellClass = shellClass
353 self.shell = self.shellClass()
353 self.shell = self.shellClass()
354 self.mpi = mpi
354 self.mpi = mpi
355 self.id = None
355 self.id = None
356 self.properties = get_engine(self.id).properties
356 self.properties = get_engine(self.id).properties
357 if self.mpi is not None:
357 if self.mpi is not None:
358 log.msg("MPI started with rank = %i and size = %i" %
358 log.msg("MPI started with rank = %i and size = %i" %
359 (self.mpi.rank, self.mpi.size))
359 (self.mpi.rank, self.mpi.size))
360 self.id = self.mpi.rank
360 self.id = self.mpi.rank
361 self._seedNamespace()
361 self._seedNamespace()
362
362
363 # Make id a property so that the shell can get the updated id
363 # Make id a property so that the shell can get the updated id
364
364
365 def _setID(self, id):
365 def _setID(self, id):
366 self._id = id
366 self._id = id
367 self.properties = get_engine(id).properties
367 self.properties = get_engine(id).properties
368 self.shell.push({'id': id})
368 self.shell.push({'id': id})
369
369
370 def _getID(self):
370 def _getID(self):
371 return self._id
371 return self._id
372
372
373 id = property(_getID, _setID)
373 id = property(_getID, _setID)
374
374
375 def _seedNamespace(self):
375 def _seedNamespace(self):
376 self.shell.push({'mpi': self.mpi, 'id' : self.id})
376 self.shell.push({'mpi': self.mpi, 'id' : self.id})
377
377
378 def executeAndRaise(self, msg, callable, *args, **kwargs):
378 def executeAndRaise(self, msg, callable, *args, **kwargs):
379 """Call a method of self.shell and wrap any exception."""
379 """Call a method of self.shell and wrap any exception."""
380 d = defer.Deferred()
380 d = defer.Deferred()
381 try:
381 try:
382 result = callable(*args, **kwargs)
382 result = callable(*args, **kwargs)
383 except:
383 except:
384 # This gives the following:
384 # This gives the following:
385 # et=exception class
385 # et=exception class
386 # ev=exception class instance
386 # ev=exception class instance
387 # tb=traceback object
387 # tb=traceback object
388 et,ev,tb = sys.exc_info()
388 et,ev,tb = sys.exc_info()
389 # This call adds attributes to the exception value
389 # This call adds attributes to the exception value
390 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
390 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
391 # Add another attribute
391 # Add another attribute
392 ev._ipython_engine_info = msg
392 ev._ipython_engine_info = msg
393 f = failure.Failure(ev,et,None)
393 f = failure.Failure(ev,et,None)
394 d.errback(f)
394 d.errback(f)
395 else:
395 else:
396 d.callback(result)
396 d.callback(result)
397
397
398 return d
398 return d
399
399
400
400
401 # The IEngine methods. See the interface for documentation.
401 # The IEngine methods. See the interface for documentation.
402
402
403 @profile
403 def execute(self, lines):
404 def execute(self, lines):
404 msg = {'engineid':self.id,
405 msg = {'engineid':self.id,
405 'method':'execute',
406 'method':'execute',
406 'args':[lines]}
407 'args':[lines]}
407 d = self.executeAndRaise(msg, self.shell.execute, lines)
408 d = self.executeAndRaise(msg, self.shell.execute, lines)
408 d.addCallback(self.addIDToResult)
409 d.addCallback(self.addIDToResult)
409 return d
410 return d
410
411
411 def addIDToResult(self, result):
412 def addIDToResult(self, result):
412 result['id'] = self.id
413 result['id'] = self.id
413 return result
414 return result
414
415
415 def push(self, namespace):
416 def push(self, namespace):
416 msg = {'engineid':self.id,
417 msg = {'engineid':self.id,
417 'method':'push',
418 'method':'push',
418 'args':[repr(namespace.keys())]}
419 'args':[repr(namespace.keys())]}
419 d = self.executeAndRaise(msg, self.shell.push, namespace)
420 d = self.executeAndRaise(msg, self.shell.push, namespace)
420 return d
421 return d
421
422
422 def pull(self, keys):
423 def pull(self, keys):
423 msg = {'engineid':self.id,
424 msg = {'engineid':self.id,
424 'method':'pull',
425 'method':'pull',
425 'args':[repr(keys)]}
426 'args':[repr(keys)]}
426 d = self.executeAndRaise(msg, self.shell.pull, keys)
427 d = self.executeAndRaise(msg, self.shell.pull, keys)
427 return d
428 return d
428
429
429 def push_function(self, namespace):
430 def push_function(self, namespace):
430 msg = {'engineid':self.id,
431 msg = {'engineid':self.id,
431 'method':'push_function',
432 'method':'push_function',
432 'args':[repr(namespace.keys())]}
433 'args':[repr(namespace.keys())]}
433 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
434 d = self.executeAndRaise(msg, self.shell.push_function, namespace)
434 return d
435 return d
435
436
436 def pull_function(self, keys):
437 def pull_function(self, keys):
437 msg = {'engineid':self.id,
438 msg = {'engineid':self.id,
438 'method':'pull_function',
439 'method':'pull_function',
439 'args':[repr(keys)]}
440 'args':[repr(keys)]}
440 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
441 d = self.executeAndRaise(msg, self.shell.pull_function, keys)
441 return d
442 return d
442
443
443 def get_result(self, i=None):
444 def get_result(self, i=None):
444 msg = {'engineid':self.id,
445 msg = {'engineid':self.id,
445 'method':'get_result',
446 'method':'get_result',
446 'args':[repr(i)]}
447 'args':[repr(i)]}
447 d = self.executeAndRaise(msg, self.shell.getCommand, i)
448 d = self.executeAndRaise(msg, self.shell.getCommand, i)
448 d.addCallback(self.addIDToResult)
449 d.addCallback(self.addIDToResult)
449 return d
450 return d
450
451
451 def reset(self):
452 def reset(self):
452 msg = {'engineid':self.id,
453 msg = {'engineid':self.id,
453 'method':'reset',
454 'method':'reset',
454 'args':[]}
455 'args':[]}
455 del self.shell
456 del self.shell
456 self.shell = self.shellClass()
457 self.shell = self.shellClass()
457 self.properties.clear()
458 self.properties.clear()
458 d = self.executeAndRaise(msg, self._seedNamespace)
459 d = self.executeAndRaise(msg, self._seedNamespace)
459 return d
460 return d
460
461
461 def kill(self):
462 def kill(self):
462 drop_engine(self.id)
463 drop_engine(self.id)
463 try:
464 try:
464 reactor.stop()
465 reactor.stop()
465 except RuntimeError:
466 except RuntimeError:
466 log.msg('The reactor was not running apparently.')
467 log.msg('The reactor was not running apparently.')
467 return defer.fail()
468 return defer.fail()
468 else:
469 else:
469 return defer.succeed(None)
470 return defer.succeed(None)
470
471
471 def keys(self):
472 def keys(self):
472 """Return a list of variables names in the users top level namespace.
473 """Return a list of variables names in the users top level namespace.
473
474
474 This used to return a dict of all the keys/repr(values) in the
475 This used to return a dict of all the keys/repr(values) in the
475 user's namespace. This was too much info for the ControllerService
476 user's namespace. This was too much info for the ControllerService
476 to handle so it is now just a list of keys.
477 to handle so it is now just a list of keys.
477 """
478 """
478
479
479 remotes = []
480 remotes = []
480 for k in self.shell.user_ns.iterkeys():
481 for k in self.shell.user_ns.iterkeys():
481 if k not in ['__name__', '_ih', '_oh', '__builtins__',
482 if k not in ['__name__', '_ih', '_oh', '__builtins__',
482 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
483 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']:
483 remotes.append(k)
484 remotes.append(k)
484 return defer.succeed(remotes)
485 return defer.succeed(remotes)
485
486
486 def set_properties(self, properties):
487 def set_properties(self, properties):
487 msg = {'engineid':self.id,
488 msg = {'engineid':self.id,
488 'method':'set_properties',
489 'method':'set_properties',
489 'args':[repr(properties.keys())]}
490 'args':[repr(properties.keys())]}
490 return self.executeAndRaise(msg, self.properties.update, properties)
491 return self.executeAndRaise(msg, self.properties.update, properties)
491
492
492 def get_properties(self, keys=None):
493 def get_properties(self, keys=None):
493 msg = {'engineid':self.id,
494 msg = {'engineid':self.id,
494 'method':'get_properties',
495 'method':'get_properties',
495 'args':[repr(keys)]}
496 'args':[repr(keys)]}
496 if keys is None:
497 if keys is None:
497 keys = self.properties.keys()
498 keys = self.properties.keys()
498 return self.executeAndRaise(msg, self.properties.subDict, *keys)
499 return self.executeAndRaise(msg, self.properties.subDict, *keys)
499
500
500 def _doDel(self, keys):
501 def _doDel(self, keys):
501 for key in keys:
502 for key in keys:
502 del self.properties[key]
503 del self.properties[key]
503
504
504 def del_properties(self, keys):
505 def del_properties(self, keys):
505 msg = {'engineid':self.id,
506 msg = {'engineid':self.id,
506 'method':'del_properties',
507 'method':'del_properties',
507 'args':[repr(keys)]}
508 'args':[repr(keys)]}
508 return self.executeAndRaise(msg, self._doDel, keys)
509 return self.executeAndRaise(msg, self._doDel, keys)
509
510
510 def _doHas(self, keys):
511 def _doHas(self, keys):
511 return [self.properties.has_key(key) for key in keys]
512 return [self.properties.has_key(key) for key in keys]
512
513
513 def has_properties(self, keys):
514 def has_properties(self, keys):
514 msg = {'engineid':self.id,
515 msg = {'engineid':self.id,
515 'method':'has_properties',
516 'method':'has_properties',
516 'args':[repr(keys)]}
517 'args':[repr(keys)]}
517 return self.executeAndRaise(msg, self._doHas, keys)
518 return self.executeAndRaise(msg, self._doHas, keys)
518
519
519 def clear_properties(self):
520 def clear_properties(self):
520 msg = {'engineid':self.id,
521 msg = {'engineid':self.id,
521 'method':'clear_properties',
522 'method':'clear_properties',
522 'args':[]}
523 'args':[]}
523 return self.executeAndRaise(msg, self.properties.clear)
524 return self.executeAndRaise(msg, self.properties.clear)
524
525
525 def push_serialized(self, sNamespace):
526 def push_serialized(self, sNamespace):
526 msg = {'engineid':self.id,
527 msg = {'engineid':self.id,
527 'method':'push_serialized',
528 'method':'push_serialized',
528 'args':[repr(sNamespace.keys())]}
529 'args':[repr(sNamespace.keys())]}
529 ns = {}
530 ns = {}
530 for k,v in sNamespace.iteritems():
531 for k,v in sNamespace.iteritems():
531 try:
532 try:
532 unserialized = newserialized.IUnSerialized(v)
533 unserialized = newserialized.IUnSerialized(v)
533 ns[k] = unserialized.getObject()
534 ns[k] = unserialized.getObject()
534 except:
535 except:
535 return defer.fail()
536 return defer.fail()
536 return self.executeAndRaise(msg, self.shell.push, ns)
537 return self.executeAndRaise(msg, self.shell.push, ns)
537
538
538 def pull_serialized(self, keys):
539 def pull_serialized(self, keys):
539 msg = {'engineid':self.id,
540 msg = {'engineid':self.id,
540 'method':'pull_serialized',
541 'method':'pull_serialized',
541 'args':[repr(keys)]}
542 'args':[repr(keys)]}
542 if isinstance(keys, str):
543 if isinstance(keys, str):
543 keys = [keys]
544 keys = [keys]
544 if len(keys)==1:
545 if len(keys)==1:
545 d = self.executeAndRaise(msg, self.shell.pull, keys)
546 d = self.executeAndRaise(msg, self.shell.pull, keys)
546 d.addCallback(newserialized.serialize)
547 d.addCallback(newserialized.serialize)
547 return d
548 return d
548 elif len(keys)>1:
549 elif len(keys)>1:
549 d = self.executeAndRaise(msg, self.shell.pull, keys)
550 d = self.executeAndRaise(msg, self.shell.pull, keys)
550 @d.addCallback
551 @d.addCallback
551 def packThemUp(values):
552 def packThemUp(values):
552 serials = []
553 serials = []
553 for v in values:
554 for v in values:
554 try:
555 try:
555 serials.append(newserialized.serialize(v))
556 serials.append(newserialized.serialize(v))
556 except:
557 except:
557 return defer.fail(failure.Failure())
558 return defer.fail(failure.Failure())
558 return serials
559 return serials
559 return packThemUp
560 return packThemUp
560
561
561
562
562 def queue(methodToQueue):
563 def queue(methodToQueue):
563 def queuedMethod(this, *args, **kwargs):
564 def queuedMethod(this, *args, **kwargs):
564 name = methodToQueue.__name__
565 name = methodToQueue.__name__
565 return this.submitCommand(Command(name, *args, **kwargs))
566 return this.submitCommand(Command(name, *args, **kwargs))
566 return queuedMethod
567 return queuedMethod
567
568
568 class QueuedEngine(object):
569 class QueuedEngine(object):
569 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
570 """Adapt an IEngineBase to an IEngineQueued by wrapping it.
570
571
571 The resulting object will implement IEngineQueued which extends
572 The resulting object will implement IEngineQueued which extends
572 IEngineCore which extends (IEngineBase, IEngineSerialized).
573 IEngineCore which extends (IEngineBase, IEngineSerialized).
573
574
574 This seems like the best way of handling it, but I am not sure. The
575 This seems like the best way of handling it, but I am not sure. The
575 other option is to have the various base interfaces be used like
576 other option is to have the various base interfaces be used like
576 mix-in intefaces. The problem I have with this is adpatation is
577 mix-in intefaces. The problem I have with this is adpatation is
577 more difficult and complicated because there can be can multiple
578 more difficult and complicated because there can be can multiple
578 original and final Interfaces.
579 original and final Interfaces.
579 """
580 """
580
581
581 zi.implements(IEngineQueued)
582 zi.implements(IEngineQueued)
582
583
583 def __init__(self, engine):
584 def __init__(self, engine):
584 """Create a QueuedEngine object from an engine
585 """Create a QueuedEngine object from an engine
585
586
586 engine: An implementor of IEngineCore and IEngineSerialized
587 engine: An implementor of IEngineCore and IEngineSerialized
587 keepUpToDate: whether to update the remote status when the
588 keepUpToDate: whether to update the remote status when the
588 queue is empty. Defaults to False.
589 queue is empty. Defaults to False.
589 """
590 """
590
591
591 # This is the right way to do these tests rather than
592 # This is the right way to do these tests rather than
592 # IEngineCore in list(zi.providedBy(engine)) which will only
593 # IEngineCore in list(zi.providedBy(engine)) which will only
593 # picks of the interfaces that are directly declared by engine.
594 # picks of the interfaces that are directly declared by engine.
594 assert IEngineBase.providedBy(engine), \
595 assert IEngineBase.providedBy(engine), \
595 "engine passed to QueuedEngine doesn't provide IEngineBase"
596 "engine passed to QueuedEngine doesn't provide IEngineBase"
596
597
597 self.engine = engine
598 self.engine = engine
598 self.id = engine.id
599 self.id = engine.id
599 self.queued = []
600 self.queued = []
600 self.history = {}
601 self.history = {}
601 self.engineStatus = {}
602 self.engineStatus = {}
602 self.currentCommand = None
603 self.currentCommand = None
603 self.failureObservers = []
604 self.failureObservers = []
604
605
605 def _get_properties(self):
606 def _get_properties(self):
606 return self.engine.properties
607 return self.engine.properties
607
608
608 properties = property(_get_properties, lambda self, _: None)
609 properties = property(_get_properties, lambda self, _: None)
609 # Queue management methods. You should not call these directly
610 # Queue management methods. You should not call these directly
610
611
611 def submitCommand(self, cmd):
612 def submitCommand(self, cmd):
612 """Submit command to queue."""
613 """Submit command to queue."""
613
614
614 d = defer.Deferred()
615 d = defer.Deferred()
615 cmd.setDeferred(d)
616 cmd.setDeferred(d)
616 if self.currentCommand is not None:
617 if self.currentCommand is not None:
617 if self.currentCommand.finished:
618 if self.currentCommand.finished:
618 # log.msg("Running command immediately: %r" % cmd)
619 # log.msg("Running command immediately: %r" % cmd)
619 self.currentCommand = cmd
620 self.currentCommand = cmd
620 self.runCurrentCommand()
621 self.runCurrentCommand()
621 else: # command is still running
622 else: # command is still running
622 # log.msg("Command is running: %r" % self.currentCommand)
623 # log.msg("Command is running: %r" % self.currentCommand)
623 # log.msg("Queueing: %r" % cmd)
624 # log.msg("Queueing: %r" % cmd)
624 self.queued.append(cmd)
625 self.queued.append(cmd)
625 else:
626 else:
626 # log.msg("No current commands, running: %r" % cmd)
627 # log.msg("No current commands, running: %r" % cmd)
627 self.currentCommand = cmd
628 self.currentCommand = cmd
628 self.runCurrentCommand()
629 self.runCurrentCommand()
629 return d
630 return d
630
631
631 def runCurrentCommand(self):
632 def runCurrentCommand(self):
632 """Run current command."""
633 """Run current command."""
633
634
634 cmd = self.currentCommand
635 cmd = self.currentCommand
635 f = getattr(self.engine, cmd.remoteMethod, None)
636 f = getattr(self.engine, cmd.remoteMethod, None)
636 if f:
637 if f:
637 d = f(*cmd.args, **cmd.kwargs)
638 d = f(*cmd.args, **cmd.kwargs)
638 if cmd.remoteMethod is 'execute':
639 if cmd.remoteMethod is 'execute':
639 d.addCallback(self.saveResult)
640 d.addCallback(self.saveResult)
640 d.addCallback(self.finishCommand)
641 d.addCallback(self.finishCommand)
641 d.addErrback(self.abortCommand)
642 d.addErrback(self.abortCommand)
642 else:
643 else:
643 return defer.fail(AttributeError(cmd.remoteMethod))
644 return defer.fail(AttributeError(cmd.remoteMethod))
644
645
645 def _flushQueue(self):
646 def _flushQueue(self):
646 """Pop next command in queue and run it."""
647 """Pop next command in queue and run it."""
647
648
648 if len(self.queued) > 0:
649 if len(self.queued) > 0:
649 self.currentCommand = self.queued.pop(0)
650 self.currentCommand = self.queued.pop(0)
650 self.runCurrentCommand()
651 self.runCurrentCommand()
651
652
652 def saveResult(self, result):
653 def saveResult(self, result):
653 """Put the result in the history."""
654 """Put the result in the history."""
654 self.history[result['number']] = result
655 self.history[result['number']] = result
655 return result
656 return result
656
657
657 def finishCommand(self, result):
658 def finishCommand(self, result):
658 """Finish currrent command."""
659 """Finish currrent command."""
659
660
660 # The order of these commands is absolutely critical.
661 # The order of these commands is absolutely critical.
661 self.currentCommand.handleResult(result)
662 self.currentCommand.handleResult(result)
662 self.currentCommand.finished = True
663 self.currentCommand.finished = True
663 self._flushQueue()
664 self._flushQueue()
664 return result
665 return result
665
666
666 def abortCommand(self, reason):
667 def abortCommand(self, reason):
667 """Abort current command.
668 """Abort current command.
668
669
669 This eats the Failure but first passes it onto the Deferred that the
670 This eats the Failure but first passes it onto the Deferred that the
670 user has.
671 user has.
671
672
672 It also clear out the queue so subsequence commands don't run.
673 It also clear out the queue so subsequence commands don't run.
673 """
674 """
674
675
675 # The order of these 3 commands is absolutely critical. The currentCommand
676 # The order of these 3 commands is absolutely critical. The currentCommand
676 # must first be marked as finished BEFORE the queue is cleared and before
677 # must first be marked as finished BEFORE the queue is cleared and before
677 # the current command is sent the failure.
678 # the current command is sent the failure.
678 # Also, the queue must be cleared BEFORE the current command is sent the Failure
679 # Also, the queue must be cleared BEFORE the current command is sent the Failure
679 # otherwise the errback chain could trigger new commands to be added to the
680 # otherwise the errback chain could trigger new commands to be added to the
680 # queue before we clear it. We should clear ONLY the commands that were in
681 # queue before we clear it. We should clear ONLY the commands that were in
681 # the queue when the error occured.
682 # the queue when the error occured.
682 self.currentCommand.finished = True
683 self.currentCommand.finished = True
683 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
684 s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
684 self.clear_queue(msg=s)
685 self.clear_queue(msg=s)
685 self.currentCommand.handleError(reason)
686 self.currentCommand.handleError(reason)
686
687
687 return None
688 return None
688
689
689 #---------------------------------------------------------------------------
690 #---------------------------------------------------------------------------
690 # IEngineCore methods
691 # IEngineCore methods
691 #---------------------------------------------------------------------------
692 #---------------------------------------------------------------------------
692
693
693 @queue
694 @queue
694 def execute(self, lines):
695 def execute(self, lines):
695 pass
696 pass
696
697
697 @queue
698 @queue
698 def push(self, namespace):
699 def push(self, namespace):
699 pass
700 pass
700
701
701 @queue
702 @queue
702 def pull(self, keys):
703 def pull(self, keys):
703 pass
704 pass
704
705
705 @queue
706 @queue
706 def push_function(self, namespace):
707 def push_function(self, namespace):
707 pass
708 pass
708
709
709 @queue
710 @queue
710 def pull_function(self, keys):
711 def pull_function(self, keys):
711 pass
712 pass
712
713
713 def get_result(self, i=None):
714 def get_result(self, i=None):
714 if i is None:
715 if i is None:
715 i = max(self.history.keys()+[None])
716 i = max(self.history.keys()+[None])
716
717
717 cmd = self.history.get(i, None)
718 cmd = self.history.get(i, None)
718 # Uncomment this line to disable chaching of results
719 # Uncomment this line to disable chaching of results
719 #cmd = None
720 #cmd = None
720 if cmd is None:
721 if cmd is None:
721 return self.submitCommand(Command('get_result', i))
722 return self.submitCommand(Command('get_result', i))
722 else:
723 else:
723 return defer.succeed(cmd)
724 return defer.succeed(cmd)
724
725
725 def reset(self):
726 def reset(self):
726 self.clear_queue()
727 self.clear_queue()
727 self.history = {} # reset the cache - I am not sure we should do this
728 self.history = {} # reset the cache - I am not sure we should do this
728 return self.submitCommand(Command('reset'))
729 return self.submitCommand(Command('reset'))
729
730
730 def kill(self):
731 def kill(self):
731 self.clear_queue()
732 self.clear_queue()
732 return self.submitCommand(Command('kill'))
733 return self.submitCommand(Command('kill'))
733
734
734 @queue
735 @queue
735 def keys(self):
736 def keys(self):
736 pass
737 pass
737
738
738 #---------------------------------------------------------------------------
739 #---------------------------------------------------------------------------
739 # IEngineSerialized methods
740 # IEngineSerialized methods
740 #---------------------------------------------------------------------------
741 #---------------------------------------------------------------------------
741
742
742 @queue
743 @queue
743 def push_serialized(self, namespace):
744 def push_serialized(self, namespace):
744 pass
745 pass
745
746
746 @queue
747 @queue
747 def pull_serialized(self, keys):
748 def pull_serialized(self, keys):
748 pass
749 pass
749
750
750 #---------------------------------------------------------------------------
751 #---------------------------------------------------------------------------
751 # IEngineProperties methods
752 # IEngineProperties methods
752 #---------------------------------------------------------------------------
753 #---------------------------------------------------------------------------
753
754
754 @queue
755 @queue
755 def set_properties(self, namespace):
756 def set_properties(self, namespace):
756 pass
757 pass
757
758
758 @queue
759 @queue
759 def get_properties(self, keys=None):
760 def get_properties(self, keys=None):
760 pass
761 pass
761
762
762 @queue
763 @queue
763 def del_properties(self, keys):
764 def del_properties(self, keys):
764 pass
765 pass
765
766
766 @queue
767 @queue
767 def has_properties(self, keys):
768 def has_properties(self, keys):
768 pass
769 pass
769
770
770 @queue
771 @queue
771 def clear_properties(self):
772 def clear_properties(self):
772 pass
773 pass
773
774
774 #---------------------------------------------------------------------------
775 #---------------------------------------------------------------------------
775 # IQueuedEngine methods
776 # IQueuedEngine methods
776 #---------------------------------------------------------------------------
777 #---------------------------------------------------------------------------
777
778
778 def clear_queue(self, msg=''):
779 def clear_queue(self, msg=''):
779 """Clear the queue, but doesn't cancel the currently running commmand."""
780 """Clear the queue, but doesn't cancel the currently running commmand."""
780
781
781 for cmd in self.queued:
782 for cmd in self.queued:
782 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
783 cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
783 self.queued = []
784 self.queued = []
784 return defer.succeed(None)
785 return defer.succeed(None)
785
786
786 def queue_status(self):
787 def queue_status(self):
787 if self.currentCommand is not None:
788 if self.currentCommand is not None:
788 if self.currentCommand.finished:
789 if self.currentCommand.finished:
789 pending = repr(None)
790 pending = repr(None)
790 else:
791 else:
791 pending = repr(self.currentCommand)
792 pending = repr(self.currentCommand)
792 else:
793 else:
793 pending = repr(None)
794 pending = repr(None)
794 dikt = {'queue':map(repr,self.queued), 'pending':pending}
795 dikt = {'queue':map(repr,self.queued), 'pending':pending}
795 return defer.succeed(dikt)
796 return defer.succeed(dikt)
796
797
797 def register_failure_observer(self, obs):
798 def register_failure_observer(self, obs):
798 self.failureObservers.append(obs)
799 self.failureObservers.append(obs)
799
800
800 def unregister_failure_observer(self, obs):
801 def unregister_failure_observer(self, obs):
801 self.failureObservers.remove(obs)
802 self.failureObservers.remove(obs)
802
803
803
804
804 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
805 # Now register QueuedEngine as an adpater class that makes an IEngineBase into a
805 # IEngineQueued.
806 # IEngineQueued.
806 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
807 components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
807
808
808
809
809 class Command(object):
810 class Command(object):
810 """A command object that encapslates queued commands.
811 """A command object that encapslates queued commands.
811
812
812 This class basically keeps track of a command that has been queued
813 This class basically keeps track of a command that has been queued
813 in a QueuedEngine. It manages the deferreds and hold the method to be called
814 in a QueuedEngine. It manages the deferreds and hold the method to be called
814 and the arguments to that method.
815 and the arguments to that method.
815 """
816 """
816
817
817
818
818 def __init__(self, remoteMethod, *args, **kwargs):
819 def __init__(self, remoteMethod, *args, **kwargs):
819 """Build a new Command object."""
820 """Build a new Command object."""
820
821
821 self.remoteMethod = remoteMethod
822 self.remoteMethod = remoteMethod
822 self.args = args
823 self.args = args
823 self.kwargs = kwargs
824 self.kwargs = kwargs
824 self.finished = False
825 self.finished = False
825
826
826 def setDeferred(self, d):
827 def setDeferred(self, d):
827 """Sets the deferred attribute of the Command."""
828 """Sets the deferred attribute of the Command."""
828
829
829 self.deferred = d
830 self.deferred = d
830
831
831 def __repr__(self):
832 def __repr__(self):
832 if not self.args:
833 if not self.args:
833 args = ''
834 args = ''
834 else:
835 else:
835 args = str(self.args)[1:-2] #cut off (...,)
836 args = str(self.args)[1:-2] #cut off (...,)
836 for k,v in self.kwargs.iteritems():
837 for k,v in self.kwargs.iteritems():
837 if args:
838 if args:
838 args += ', '
839 args += ', '
839 args += '%s=%r' %(k,v)
840 args += '%s=%r' %(k,v)
840 return "%s(%s)" %(self.remoteMethod, args)
841 return "%s(%s)" %(self.remoteMethod, args)
841
842
842 def handleResult(self, result):
843 def handleResult(self, result):
843 """When the result is ready, relay it to self.deferred."""
844 """When the result is ready, relay it to self.deferred."""
844
845
845 self.deferred.callback(result)
846 self.deferred.callback(result)
846
847
847 def handleError(self, reason):
848 def handleError(self, reason):
848 """When an error has occured, relay it to self.deferred."""
849 """When an error has occured, relay it to self.deferred."""
849
850
850 self.deferred.errback(reason)
851 self.deferred.errback(reason)
851
852
852 class ThreadedEngineService(EngineService):
853 class ThreadedEngineService(EngineService):
853 """An EngineService subclass that defers execute commands to a separate
854 """An EngineService subclass that defers execute commands to a separate
854 thread.
855 thread.
855
856
856 ThreadedEngineService uses twisted.internet.threads.deferToThread to
857 ThreadedEngineService uses twisted.internet.threads.deferToThread to
857 defer execute requests to a separate thread. GUI frontends may want to
858 defer execute requests to a separate thread. GUI frontends may want to
858 use ThreadedEngineService as the engine in an
859 use ThreadedEngineService as the engine in an
859 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
860 IPython.frontend.frontendbase.FrontEndBase subclass to prevent
860 block execution from blocking the GUI thread.
861 block execution from blocking the GUI thread.
861 """
862 """
862
863
863 zi.implements(IEngineBase)
864 zi.implements(IEngineBase)
864
865
865 def __init__(self, shellClass=Interpreter, mpi=None):
866 def __init__(self, shellClass=Interpreter, mpi=None):
866 EngineService.__init__(self, shellClass, mpi)
867 EngineService.__init__(self, shellClass, mpi)
867
868
868 def wrapped_execute(self, msg, lines):
869 def wrapped_execute(self, msg, lines):
869 """Wrap self.shell.execute to add extra information to tracebacks"""
870 """Wrap self.shell.execute to add extra information to tracebacks"""
870
871
871 try:
872 try:
872 result = self.shell.execute(lines)
873 result = self.shell.execute(lines)
873 except Exception,e:
874 except Exception,e:
874 # This gives the following:
875 # This gives the following:
875 # et=exception class
876 # et=exception class
876 # ev=exception class instance
877 # ev=exception class instance
877 # tb=traceback object
878 # tb=traceback object
878 et,ev,tb = sys.exc_info()
879 et,ev,tb = sys.exc_info()
879 # This call adds attributes to the exception value
880 # This call adds attributes to the exception value
880 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
881 et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg)
881 # Add another attribute
882 # Add another attribute
882
883
883 # Create a new exception with the new attributes
884 # Create a new exception with the new attributes
884 e = et(ev._ipython_traceback_text)
885 e = et(ev._ipython_traceback_text)
885 e._ipython_engine_info = msg
886 e._ipython_engine_info = msg
886
887
887 # Re-raise
888 # Re-raise
888 raise e
889 raise e
889
890
890 return result
891 return result
891
892
892
893
893 def execute(self, lines):
894 def execute(self, lines):
894 # Only import this if we are going to use this class
895 # Only import this if we are going to use this class
895 from twisted.internet import threads
896 from twisted.internet import threads
896
897
897 msg = {'engineid':self.id,
898 msg = {'engineid':self.id,
898 'method':'execute',
899 'method':'execute',
899 'args':[lines]}
900 'args':[lines]}
900
901
901 d = threads.deferToThread(self.wrapped_execute, msg, lines)
902 d = threads.deferToThread(self.wrapped_execute, msg, lines)
902 d.addCallback(self.addIDToResult)
903 d.addCallback(self.addIDToResult)
903 return d
904 return d
@@ -1,753 +1,754 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3
3
4 """Adapt the IPython ControllerServer to IMultiEngine.
4 """Adapt the IPython ControllerServer to IMultiEngine.
5
5
6 This module provides classes that adapt a ControllerService to the
6 This module provides classes that adapt a ControllerService to the
7 IMultiEngine interface. This interface is a basic interactive interface
7 IMultiEngine interface. This interface is a basic interactive interface
8 for working with a set of engines where it is desired to have explicit
8 for working with a set of engines where it is desired to have explicit
9 access to each registered engine.
9 access to each registered engine.
10
10
11 The classes here are exposed to the network in files like:
11 The classes here are exposed to the network in files like:
12
12
13 * multienginevanilla.py
13 * multienginevanilla.py
14 * multienginepb.py
14 * multienginepb.py
15 """
15 """
16
16
17 __docformat__ = "restructuredtext en"
17 __docformat__ = "restructuredtext en"
18
18
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20 # Copyright (C) 2008 The IPython Development Team
20 # Copyright (C) 2008 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-------------------------------------------------------------------------------
24 #-------------------------------------------------------------------------------
25
25
26 #-------------------------------------------------------------------------------
26 #-------------------------------------------------------------------------------
27 # Imports
27 # Imports
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29
29
30 from new import instancemethod
30 from new import instancemethod
31 from types import FunctionType
31 from types import FunctionType
32
32
33 from twisted.application import service
33 from twisted.application import service
34 from twisted.internet import defer, reactor
34 from twisted.internet import defer, reactor
35 from twisted.python import log, components, failure
35 from twisted.python import log, components, failure
36 from zope.interface import Interface, implements, Attribute
36 from zope.interface import Interface, implements, Attribute
37
37
38 from IPython.tools import growl
38 from IPython.tools import growl
39 from IPython.kernel.util import printer
39 from IPython.kernel.util import printer
40 from IPython.kernel.twistedutil import gatherBoth
40 from IPython.kernel.twistedutil import gatherBoth
41 from IPython.kernel import map as Map
41 from IPython.kernel import map as Map
42 from IPython.kernel import error
42 from IPython.kernel import error
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 from IPython.kernel.controllerservice import \
44 from IPython.kernel.controllerservice import \
45 ControllerAdapterBase, \
45 ControllerAdapterBase, \
46 ControllerService, \
46 ControllerService, \
47 IControllerBase
47 IControllerBase
48
48
49
49
50 #-------------------------------------------------------------------------------
50 #-------------------------------------------------------------------------------
51 # Interfaces for the MultiEngine representation of a controller
51 # Interfaces for the MultiEngine representation of a controller
52 #-------------------------------------------------------------------------------
52 #-------------------------------------------------------------------------------
53
53
54 class IEngineMultiplexer(Interface):
54 class IEngineMultiplexer(Interface):
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56
56
57 This class simply acts as a multiplexer of methods that are in the
57 This class simply acts as a multiplexer of methods that are in the
58 various IEngines* interfaces. Thus the methods here are jut like those
58 various IEngines* interfaces. Thus the methods here are jut like those
59 in the IEngine* interfaces, but with an extra first argument, targets.
59 in the IEngine* interfaces, but with an extra first argument, targets.
60 The targets argument can have the following forms:
60 The targets argument can have the following forms:
61
61
62 * targets = 10 # Engines are indexed by ints
62 * targets = 10 # Engines are indexed by ints
63 * targets = [0,1,2,3] # A list of ints
63 * targets = [0,1,2,3] # A list of ints
64 * targets = 'all' # A string to indicate all targets
64 * targets = 'all' # A string to indicate all targets
65
65
66 If targets is bad in any way, an InvalidEngineID will be raised. This
66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 includes engines not being registered.
67 includes engines not being registered.
68
68
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 with length equal to the number of targets. The elements of the list will
70 with length equal to the number of targets. The elements of the list will
71 correspond to the return of the corresponding IEngine method.
71 correspond to the return of the corresponding IEngine method.
72
72
73 Failures are aggressive, meaning that if an action fails for any target,
73 Failures are aggressive, meaning that if an action fails for any target,
74 the overall action will fail immediately with that Failure.
74 the overall action will fail immediately with that Failure.
75
75
76 :Parameters:
76 :Parameters:
77 targets : int, list of ints, or 'all'
77 targets : int, list of ints, or 'all'
78 Engine ids the action will apply to.
78 Engine ids the action will apply to.
79
79
80 :Returns: Deferred to a list of results for each engine.
80 :Returns: Deferred to a list of results for each engine.
81
81
82 :Exception:
82 :Exception:
83 InvalidEngineID
83 InvalidEngineID
84 If the targets argument is bad or engines aren't registered.
84 If the targets argument is bad or engines aren't registered.
85 NoEnginesRegistered
85 NoEnginesRegistered
86 If there are no engines registered and targets='all'
86 If there are no engines registered and targets='all'
87 """
87 """
88
88
89 #---------------------------------------------------------------------------
89 #---------------------------------------------------------------------------
90 # Mutiplexed methods
90 # Mutiplexed methods
91 #---------------------------------------------------------------------------
91 #---------------------------------------------------------------------------
92
92
93 def execute(lines, targets='all'):
93 def execute(lines, targets='all'):
94 """Execute lines of Python code on targets.
94 """Execute lines of Python code on targets.
95
95
96 See the class docstring for information about targets and possible
96 See the class docstring for information about targets and possible
97 exceptions this method can raise.
97 exceptions this method can raise.
98
98
99 :Parameters:
99 :Parameters:
100 lines : str
100 lines : str
101 String of python code to be executed on targets.
101 String of python code to be executed on targets.
102 """
102 """
103
103
104 def push(namespace, targets='all'):
104 def push(namespace, targets='all'):
105 """Push dict namespace into the user's namespace on targets.
105 """Push dict namespace into the user's namespace on targets.
106
106
107 See the class docstring for information about targets and possible
107 See the class docstring for information about targets and possible
108 exceptions this method can raise.
108 exceptions this method can raise.
109
109
110 :Parameters:
110 :Parameters:
111 namspace : dict
111 namspace : dict
112 Dict of key value pairs to be put into the users namspace.
112 Dict of key value pairs to be put into the users namspace.
113 """
113 """
114
114
115 def pull(keys, targets='all'):
115 def pull(keys, targets='all'):
116 """Pull values out of the user's namespace on targets by keys.
116 """Pull values out of the user's namespace on targets by keys.
117
117
118 See the class docstring for information about targets and possible
118 See the class docstring for information about targets and possible
119 exceptions this method can raise.
119 exceptions this method can raise.
120
120
121 :Parameters:
121 :Parameters:
122 keys : tuple of strings
122 keys : tuple of strings
123 Sequence of keys to be pulled from user's namespace.
123 Sequence of keys to be pulled from user's namespace.
124 """
124 """
125
125
126 def push_function(namespace, targets='all'):
126 def push_function(namespace, targets='all'):
127 """"""
127 """"""
128
128
129 def pull_function(keys, targets='all'):
129 def pull_function(keys, targets='all'):
130 """"""
130 """"""
131
131
132 def get_result(i=None, targets='all'):
132 def get_result(i=None, targets='all'):
133 """Get the result for command i from targets.
133 """Get the result for command i from targets.
134
134
135 See the class docstring for information about targets and possible
135 See the class docstring for information about targets and possible
136 exceptions this method can raise.
136 exceptions this method can raise.
137
137
138 :Parameters:
138 :Parameters:
139 i : int or None
139 i : int or None
140 Command index or None to indicate most recent command.
140 Command index or None to indicate most recent command.
141 """
141 """
142
142
143 def reset(targets='all'):
143 def reset(targets='all'):
144 """Reset targets.
144 """Reset targets.
145
145
146 This clears the users namespace of the Engines, but won't cause
146 This clears the users namespace of the Engines, but won't cause
147 modules to be reloaded.
147 modules to be reloaded.
148 """
148 """
149
149
150 def keys(targets='all'):
150 def keys(targets='all'):
151 """Get variable names defined in user's namespace on targets."""
151 """Get variable names defined in user's namespace on targets."""
152
152
153 def kill(controller=False, targets='all'):
153 def kill(controller=False, targets='all'):
154 """Kill the targets Engines and possibly the controller.
154 """Kill the targets Engines and possibly the controller.
155
155
156 :Parameters:
156 :Parameters:
157 controller : boolean
157 controller : boolean
158 Should the controller be killed as well. If so all the
158 Should the controller be killed as well. If so all the
159 engines will be killed first no matter what targets is.
159 engines will be killed first no matter what targets is.
160 """
160 """
161
161
162 def push_serialized(namespace, targets='all'):
162 def push_serialized(namespace, targets='all'):
163 """Push a namespace of Serialized objects to targets.
163 """Push a namespace of Serialized objects to targets.
164
164
165 :Parameters:
165 :Parameters:
166 namespace : dict
166 namespace : dict
167 A dict whose keys are the variable names and whose values
167 A dict whose keys are the variable names and whose values
168 are serialized version of the objects.
168 are serialized version of the objects.
169 """
169 """
170
170
171 def pull_serialized(keys, targets='all'):
171 def pull_serialized(keys, targets='all'):
172 """Pull Serialized objects by keys from targets.
172 """Pull Serialized objects by keys from targets.
173
173
174 :Parameters:
174 :Parameters:
175 keys : tuple of strings
175 keys : tuple of strings
176 Sequence of variable names to pull as serialized objects.
176 Sequence of variable names to pull as serialized objects.
177 """
177 """
178
178
179 def clear_queue(targets='all'):
179 def clear_queue(targets='all'):
180 """Clear the queue of pending command for targets."""
180 """Clear the queue of pending command for targets."""
181
181
182 def queue_status(targets='all'):
182 def queue_status(targets='all'):
183 """Get the status of the queue on the targets."""
183 """Get the status of the queue on the targets."""
184
184
185 def set_properties(properties, targets='all'):
185 def set_properties(properties, targets='all'):
186 """set properties by key and value"""
186 """set properties by key and value"""
187
187
188 def get_properties(keys=None, targets='all'):
188 def get_properties(keys=None, targets='all'):
189 """get a list of properties by `keys`, if no keys specified, get all"""
189 """get a list of properties by `keys`, if no keys specified, get all"""
190
190
191 def del_properties(keys, targets='all'):
191 def del_properties(keys, targets='all'):
192 """delete properties by `keys`"""
192 """delete properties by `keys`"""
193
193
194 def has_properties(keys, targets='all'):
194 def has_properties(keys, targets='all'):
195 """get a list of bool values for whether `properties` has `keys`"""
195 """get a list of bool values for whether `properties` has `keys`"""
196
196
197 def clear_properties(targets='all'):
197 def clear_properties(targets='all'):
198 """clear the properties dict"""
198 """clear the properties dict"""
199
199
200
200
201 class IMultiEngine(IEngineMultiplexer):
201 class IMultiEngine(IEngineMultiplexer):
202 """A controller that exposes an explicit interface to all of its engines.
202 """A controller that exposes an explicit interface to all of its engines.
203
203
204 This is the primary inteface for interactive usage.
204 This is the primary inteface for interactive usage.
205 """
205 """
206
206
207 def get_ids():
207 def get_ids():
208 """Return list of currently registered ids.
208 """Return list of currently registered ids.
209
209
210 :Returns: A Deferred to a list of registered engine ids.
210 :Returns: A Deferred to a list of registered engine ids.
211 """
211 """
212
212
213
213
214
214
215 #-------------------------------------------------------------------------------
215 #-------------------------------------------------------------------------------
216 # Implementation of the core MultiEngine classes
216 # Implementation of the core MultiEngine classes
217 #-------------------------------------------------------------------------------
217 #-------------------------------------------------------------------------------
218
218
219 class MultiEngine(ControllerAdapterBase):
219 class MultiEngine(ControllerAdapterBase):
220 """The representation of a ControllerService as a IMultiEngine.
220 """The representation of a ControllerService as a IMultiEngine.
221
221
222 Although it is not implemented currently, this class would be where a
222 Although it is not implemented currently, this class would be where a
223 client/notification API is implemented. It could inherit from something
223 client/notification API is implemented. It could inherit from something
224 like results.NotifierParent and then use the notify method to send
224 like results.NotifierParent and then use the notify method to send
225 notifications.
225 notifications.
226 """
226 """
227
227
228 implements(IMultiEngine)
228 implements(IMultiEngine)
229
229
230 def __init(self, controller):
230 def __init(self, controller):
231 ControllerAdapterBase.__init__(self, controller)
231 ControllerAdapterBase.__init__(self, controller)
232
232
233 #---------------------------------------------------------------------------
233 #---------------------------------------------------------------------------
234 # Helper methods
234 # Helper methods
235 #---------------------------------------------------------------------------
235 #---------------------------------------------------------------------------
236
236
237 def engineList(self, targets):
237 def engineList(self, targets):
238 """Parse the targets argument into a list of valid engine objects.
238 """Parse the targets argument into a list of valid engine objects.
239
239
240 :Parameters:
240 :Parameters:
241 targets : int, list of ints or 'all'
241 targets : int, list of ints or 'all'
242 The targets argument to be parsed.
242 The targets argument to be parsed.
243
243
244 :Returns: List of engine objects.
244 :Returns: List of engine objects.
245
245
246 :Exception:
246 :Exception:
247 InvalidEngineID
247 InvalidEngineID
248 If targets is not valid or if an engine is not registered.
248 If targets is not valid or if an engine is not registered.
249 """
249 """
250 if isinstance(targets, int):
250 if isinstance(targets, int):
251 if targets not in self.engines.keys():
251 if targets not in self.engines.keys():
252 log.msg("Engine with id %i is not registered" % targets)
252 log.msg("Engine with id %i is not registered" % targets)
253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 else:
254 else:
255 return [self.engines[targets]]
255 return [self.engines[targets]]
256 elif isinstance(targets, (list, tuple)):
256 elif isinstance(targets, (list, tuple)):
257 for id in targets:
257 for id in targets:
258 if id not in self.engines.keys():
258 if id not in self.engines.keys():
259 log.msg("Engine with id %r is not registered" % id)
259 log.msg("Engine with id %r is not registered" % id)
260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 return map(self.engines.get, targets)
261 return map(self.engines.get, targets)
262 elif targets == 'all':
262 elif targets == 'all':
263 eList = self.engines.values()
263 eList = self.engines.values()
264 if len(eList) == 0:
264 if len(eList) == 0:
265 msg = """There are no engines registered.
265 msg = """There are no engines registered.
266 Check the logs in ~/.ipython/log if you think there should have been."""
266 Check the logs in ~/.ipython/log if you think there should have been."""
267 raise error.NoEnginesRegistered(msg)
267 raise error.NoEnginesRegistered(msg)
268 else:
268 else:
269 return eList
269 return eList
270 else:
270 else:
271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272
272
273 def _performOnEngines(self, methodName, *args, **kwargs):
273 def _performOnEngines(self, methodName, *args, **kwargs):
274 """Calls a method on engines and returns deferred to list of results.
274 """Calls a method on engines and returns deferred to list of results.
275
275
276 :Parameters:
276 :Parameters:
277 methodName : str
277 methodName : str
278 Name of the method to be called.
278 Name of the method to be called.
279 targets : int, list of ints, 'all'
279 targets : int, list of ints, 'all'
280 The targets argument to be parsed into a list of engine objects.
280 The targets argument to be parsed into a list of engine objects.
281 args
281 args
282 The positional keyword arguments to be passed to the engines.
282 The positional keyword arguments to be passed to the engines.
283 kwargs
283 kwargs
284 The keyword arguments passed to the method
284 The keyword arguments passed to the method
285
285
286 :Returns: List of deferreds to the results on each engine
286 :Returns: List of deferreds to the results on each engine
287
287
288 :Exception:
288 :Exception:
289 InvalidEngineID
289 InvalidEngineID
290 If the targets argument is bad in any way.
290 If the targets argument is bad in any way.
291 AttributeError
291 AttributeError
292 If the method doesn't exist on one of the engines.
292 If the method doesn't exist on one of the engines.
293 """
293 """
294 targets = kwargs.pop('targets')
294 targets = kwargs.pop('targets')
295 log.msg("Performing %s on %r" % (methodName, targets))
295 log.msg("Performing %s on %r" % (methodName, targets))
296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 # This will and should raise if targets is not valid!
297 # This will and should raise if targets is not valid!
298 engines = self.engineList(targets)
298 engines = self.engineList(targets)
299 dList = []
299 dList = []
300 for e in engines:
300 for e in engines:
301 meth = getattr(e, methodName, None)
301 meth = getattr(e, methodName, None)
302 if meth is not None:
302 if meth is not None:
303 dList.append(meth(*args, **kwargs))
303 dList.append(meth(*args, **kwargs))
304 else:
304 else:
305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 return dList
306 return dList
307
307
308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 """Called _performOnEngines and wraps result/exception into deferred."""
309 """Called _performOnEngines and wraps result/exception into deferred."""
310 try:
310 try:
311 dList = self._performOnEngines(methodName, *args, **kwargs)
311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 return defer.fail(failure.Failure())
313 return defer.fail(failure.Failure())
314 else:
314 else:
315 # Having fireOnOneErrback is causing problems with the determinacy
315 # Having fireOnOneErrback is causing problems with the determinacy
316 # of the system. Basically, once a single engine has errbacked, this
316 # of the system. Basically, once a single engine has errbacked, this
317 # method returns. In some cases, this will cause client to submit
317 # method returns. In some cases, this will cause client to submit
318 # another command. Because the previous command is still running
318 # another command. Because the previous command is still running
319 # on some engines, this command will be queued. When those commands
319 # on some engines, this command will be queued. When those commands
320 # then errback, the second command will raise QueueCleared. Ahhh!
320 # then errback, the second command will raise QueueCleared. Ahhh!
321 d = gatherBoth(dList,
321 d = gatherBoth(dList,
322 fireOnOneErrback=0,
322 fireOnOneErrback=0,
323 consumeErrors=1,
323 consumeErrors=1,
324 logErrors=0)
324 logErrors=0)
325 d.addCallback(error.collect_exceptions, methodName)
325 d.addCallback(error.collect_exceptions, methodName)
326 return d
326 return d
327
327
328 #---------------------------------------------------------------------------
328 #---------------------------------------------------------------------------
329 # General IMultiEngine methods
329 # General IMultiEngine methods
330 #---------------------------------------------------------------------------
330 #---------------------------------------------------------------------------
331
331
332 def get_ids(self):
332 def get_ids(self):
333 return defer.succeed(self.engines.keys())
333 return defer.succeed(self.engines.keys())
334
334
335 #---------------------------------------------------------------------------
335 #---------------------------------------------------------------------------
336 # IEngineMultiplexer methods
336 # IEngineMultiplexer methods
337 #---------------------------------------------------------------------------
337 #---------------------------------------------------------------------------
338
338
339 def execute(self, lines, targets='all'):
339 def execute(self, lines, targets='all'):
340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341
341
342 def push(self, ns, targets='all'):
342 def push(self, ns, targets='all'):
343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344
344
345 def pull(self, keys, targets='all'):
345 def pull(self, keys, targets='all'):
346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347
347
348 def push_function(self, ns, targets='all'):
348 def push_function(self, ns, targets='all'):
349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350
350
351 def pull_function(self, keys, targets='all'):
351 def pull_function(self, keys, targets='all'):
352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353
353
354 def get_result(self, i=None, targets='all'):
354 def get_result(self, i=None, targets='all'):
355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356
356
357 def reset(self, targets='all'):
357 def reset(self, targets='all'):
358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359
359
360 def keys(self, targets='all'):
360 def keys(self, targets='all'):
361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362
362
363 def kill(self, controller=False, targets='all'):
363 def kill(self, controller=False, targets='all'):
364 if controller:
364 if controller:
365 targets = 'all'
365 targets = 'all'
366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 if controller:
367 if controller:
368 log.msg("Killing controller")
368 log.msg("Killing controller")
369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 # Consume any weird stuff coming back
370 # Consume any weird stuff coming back
371 d.addBoth(lambda _: None)
371 d.addBoth(lambda _: None)
372 return d
372 return d
373
373
374 def push_serialized(self, namespace, targets='all'):
374 def push_serialized(self, namespace, targets='all'):
375 for k, v in namespace.iteritems():
375 for k, v in namespace.iteritems():
376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 return d
378 return d
379
379
380 def pull_serialized(self, keys, targets='all'):
380 def pull_serialized(self, keys, targets='all'):
381 try:
381 try:
382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 return defer.fail(failure.Failure())
384 return defer.fail(failure.Failure())
385 else:
385 else:
386 for d in dList:
386 for d in dList:
387 d.addCallback(self._logSizes)
387 d.addCallback(self._logSizes)
388 d = gatherBoth(dList,
388 d = gatherBoth(dList,
389 fireOnOneErrback=0,
389 fireOnOneErrback=0,
390 consumeErrors=1,
390 consumeErrors=1,
391 logErrors=0)
391 logErrors=0)
392 d.addCallback(error.collect_exceptions, 'pull_serialized')
392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 return d
393 return d
394
394
395 def _logSizes(self, listOfSerialized):
395 def _logSizes(self, listOfSerialized):
396 if isinstance(listOfSerialized, (list, tuple)):
396 if isinstance(listOfSerialized, (list, tuple)):
397 for s in listOfSerialized:
397 for s in listOfSerialized:
398 log.msg("Pulled object is %f MB" % s.getDataSize())
398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 else:
399 else:
400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 return listOfSerialized
401 return listOfSerialized
402
402
403 def clear_queue(self, targets='all'):
403 def clear_queue(self, targets='all'):
404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405
405
406 def queue_status(self, targets='all'):
406 def queue_status(self, targets='all'):
407 log.msg("Getting queue status on %r" % targets)
407 log.msg("Getting queue status on %r" % targets)
408 try:
408 try:
409 engines = self.engineList(targets)
409 engines = self.engineList(targets)
410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 return defer.fail(failure.Failure())
411 return defer.fail(failure.Failure())
412 else:
412 else:
413 dList = []
413 dList = []
414 for e in engines:
414 for e in engines:
415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 d = gatherBoth(dList,
416 d = gatherBoth(dList,
417 fireOnOneErrback=0,
417 fireOnOneErrback=0,
418 consumeErrors=1,
418 consumeErrors=1,
419 logErrors=0)
419 logErrors=0)
420 d.addCallback(error.collect_exceptions, 'queue_status')
420 d.addCallback(error.collect_exceptions, 'queue_status')
421 return d
421 return d
422
422
423 def get_properties(self, keys=None, targets='all'):
423 def get_properties(self, keys=None, targets='all'):
424 log.msg("Getting properties on %r" % targets)
424 log.msg("Getting properties on %r" % targets)
425 try:
425 try:
426 engines = self.engineList(targets)
426 engines = self.engineList(targets)
427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 return defer.fail(failure.Failure())
428 return defer.fail(failure.Failure())
429 else:
429 else:
430 dList = [e.get_properties(keys) for e in engines]
430 dList = [e.get_properties(keys) for e in engines]
431 d = gatherBoth(dList,
431 d = gatherBoth(dList,
432 fireOnOneErrback=0,
432 fireOnOneErrback=0,
433 consumeErrors=1,
433 consumeErrors=1,
434 logErrors=0)
434 logErrors=0)
435 d.addCallback(error.collect_exceptions, 'get_properties')
435 d.addCallback(error.collect_exceptions, 'get_properties')
436 return d
436 return d
437
437
438 def set_properties(self, properties, targets='all'):
438 def set_properties(self, properties, targets='all'):
439 log.msg("Setting properties on %r" % targets)
439 log.msg("Setting properties on %r" % targets)
440 try:
440 try:
441 engines = self.engineList(targets)
441 engines = self.engineList(targets)
442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 return defer.fail(failure.Failure())
443 return defer.fail(failure.Failure())
444 else:
444 else:
445 dList = [e.set_properties(properties) for e in engines]
445 dList = [e.set_properties(properties) for e in engines]
446 d = gatherBoth(dList,
446 d = gatherBoth(dList,
447 fireOnOneErrback=0,
447 fireOnOneErrback=0,
448 consumeErrors=1,
448 consumeErrors=1,
449 logErrors=0)
449 logErrors=0)
450 d.addCallback(error.collect_exceptions, 'set_properties')
450 d.addCallback(error.collect_exceptions, 'set_properties')
451 return d
451 return d
452
452
453 def has_properties(self, keys, targets='all'):
453 def has_properties(self, keys, targets='all'):
454 log.msg("Checking properties on %r" % targets)
454 log.msg("Checking properties on %r" % targets)
455 try:
455 try:
456 engines = self.engineList(targets)
456 engines = self.engineList(targets)
457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 return defer.fail(failure.Failure())
458 return defer.fail(failure.Failure())
459 else:
459 else:
460 dList = [e.has_properties(keys) for e in engines]
460 dList = [e.has_properties(keys) for e in engines]
461 d = gatherBoth(dList,
461 d = gatherBoth(dList,
462 fireOnOneErrback=0,
462 fireOnOneErrback=0,
463 consumeErrors=1,
463 consumeErrors=1,
464 logErrors=0)
464 logErrors=0)
465 d.addCallback(error.collect_exceptions, 'has_properties')
465 d.addCallback(error.collect_exceptions, 'has_properties')
466 return d
466 return d
467
467
468 def del_properties(self, keys, targets='all'):
468 def del_properties(self, keys, targets='all'):
469 log.msg("Deleting properties on %r" % targets)
469 log.msg("Deleting properties on %r" % targets)
470 try:
470 try:
471 engines = self.engineList(targets)
471 engines = self.engineList(targets)
472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 return defer.fail(failure.Failure())
473 return defer.fail(failure.Failure())
474 else:
474 else:
475 dList = [e.del_properties(keys) for e in engines]
475 dList = [e.del_properties(keys) for e in engines]
476 d = gatherBoth(dList,
476 d = gatherBoth(dList,
477 fireOnOneErrback=0,
477 fireOnOneErrback=0,
478 consumeErrors=1,
478 consumeErrors=1,
479 logErrors=0)
479 logErrors=0)
480 d.addCallback(error.collect_exceptions, 'del_properties')
480 d.addCallback(error.collect_exceptions, 'del_properties')
481 return d
481 return d
482
482
483 def clear_properties(self, targets='all'):
483 def clear_properties(self, targets='all'):
484 log.msg("Clearing properties on %r" % targets)
484 log.msg("Clearing properties on %r" % targets)
485 try:
485 try:
486 engines = self.engineList(targets)
486 engines = self.engineList(targets)
487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 return defer.fail(failure.Failure())
488 return defer.fail(failure.Failure())
489 else:
489 else:
490 dList = [e.clear_properties() for e in engines]
490 dList = [e.clear_properties() for e in engines]
491 d = gatherBoth(dList,
491 d = gatherBoth(dList,
492 fireOnOneErrback=0,
492 fireOnOneErrback=0,
493 consumeErrors=1,
493 consumeErrors=1,
494 logErrors=0)
494 logErrors=0)
495 d.addCallback(error.collect_exceptions, 'clear_properties')
495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 return d
496 return d
497
497
498
498
499 components.registerAdapter(MultiEngine,
499 components.registerAdapter(MultiEngine,
500 IControllerBase,
500 IControllerBase,
501 IMultiEngine)
501 IMultiEngine)
502
502
503
503
504 #-------------------------------------------------------------------------------
504 #-------------------------------------------------------------------------------
505 # Interfaces for the Synchronous MultiEngine
505 # Interfaces for the Synchronous MultiEngine
506 #-------------------------------------------------------------------------------
506 #-------------------------------------------------------------------------------
507
507
508 class ISynchronousEngineMultiplexer(Interface):
508 class ISynchronousEngineMultiplexer(Interface):
509 pass
509 pass
510
510
511
511
512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 """Synchronous, two-phase version of IMultiEngine.
513 """Synchronous, two-phase version of IMultiEngine.
514
514
515 Methods in this interface are identical to those of IMultiEngine, but they
515 Methods in this interface are identical to those of IMultiEngine, but they
516 take one additional argument:
516 take one additional argument:
517
517
518 execute(lines, targets='all') -> execute(lines, targets='all, block=True)
518 execute(lines, targets='all') -> execute(lines, targets='all, block=True)
519
519
520 :Parameters:
520 :Parameters:
521 block : boolean
521 block : boolean
522 Should the method return a deferred to a deferredID or the
522 Should the method return a deferred to a deferredID or the
523 actual result. If block=False a deferred to a deferredID is
523 actual result. If block=False a deferred to a deferredID is
524 returned and the user must call `get_pending_deferred` at a later
524 returned and the user must call `get_pending_deferred` at a later
525 point. If block=True, a deferred to the actual result comes back.
525 point. If block=True, a deferred to the actual result comes back.
526 """
526 """
527 def get_pending_deferred(deferredID, block=True):
527 def get_pending_deferred(deferredID, block=True):
528 """"""
528 """"""
529
529
530 def clear_pending_deferreds():
530 def clear_pending_deferreds():
531 """"""
531 """"""
532
532
533
533
534 #-------------------------------------------------------------------------------
534 #-------------------------------------------------------------------------------
535 # Implementation of the Synchronous MultiEngine
535 # Implementation of the Synchronous MultiEngine
536 #-------------------------------------------------------------------------------
536 #-------------------------------------------------------------------------------
537
537
538 class SynchronousMultiEngine(PendingDeferredManager):
538 class SynchronousMultiEngine(PendingDeferredManager):
539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540
540
541 Warning, this class uses a decorator that currently uses **kwargs.
541 Warning, this class uses a decorator that currently uses **kwargs.
542 Because of this block must be passed as a kwarg, not positionally.
542 Because of this block must be passed as a kwarg, not positionally.
543 """
543 """
544
544
545 implements(ISynchronousMultiEngine)
545 implements(ISynchronousMultiEngine)
546
546
547 def __init__(self, multiengine):
547 def __init__(self, multiengine):
548 self.multiengine = multiengine
548 self.multiengine = multiengine
549 PendingDeferredManager.__init__(self)
549 PendingDeferredManager.__init__(self)
550
550
551 #---------------------------------------------------------------------------
551 #---------------------------------------------------------------------------
552 # Decorated pending deferred methods
552 # Decorated pending deferred methods
553 #---------------------------------------------------------------------------
553 #---------------------------------------------------------------------------
554
554
555 @profile
555 @two_phase
556 @two_phase
556 def execute(self, lines, targets='all'):
557 def execute(self, lines, targets='all'):
557 d = self.multiengine.execute(lines, targets)
558 d = self.multiengine.execute(lines, targets)
558 return d
559 return d
559
560
560 @two_phase
561 @two_phase
561 def push(self, namespace, targets='all'):
562 def push(self, namespace, targets='all'):
562 return self.multiengine.push(namespace, targets)
563 return self.multiengine.push(namespace, targets)
563
564
564 @two_phase
565 @two_phase
565 def pull(self, keys, targets='all'):
566 def pull(self, keys, targets='all'):
566 d = self.multiengine.pull(keys, targets)
567 d = self.multiengine.pull(keys, targets)
567 return d
568 return d
568
569
569 @two_phase
570 @two_phase
570 def push_function(self, namespace, targets='all'):
571 def push_function(self, namespace, targets='all'):
571 return self.multiengine.push_function(namespace, targets)
572 return self.multiengine.push_function(namespace, targets)
572
573
573 @two_phase
574 @two_phase
574 def pull_function(self, keys, targets='all'):
575 def pull_function(self, keys, targets='all'):
575 d = self.multiengine.pull_function(keys, targets)
576 d = self.multiengine.pull_function(keys, targets)
576 return d
577 return d
577
578
578 @two_phase
579 @two_phase
579 def get_result(self, i=None, targets='all'):
580 def get_result(self, i=None, targets='all'):
580 return self.multiengine.get_result(i, targets='all')
581 return self.multiengine.get_result(i, targets='all')
581
582
582 @two_phase
583 @two_phase
583 def reset(self, targets='all'):
584 def reset(self, targets='all'):
584 return self.multiengine.reset(targets)
585 return self.multiengine.reset(targets)
585
586
586 @two_phase
587 @two_phase
587 def keys(self, targets='all'):
588 def keys(self, targets='all'):
588 return self.multiengine.keys(targets)
589 return self.multiengine.keys(targets)
589
590
590 @two_phase
591 @two_phase
591 def kill(self, controller=False, targets='all'):
592 def kill(self, controller=False, targets='all'):
592 return self.multiengine.kill(controller, targets)
593 return self.multiengine.kill(controller, targets)
593
594
594 @two_phase
595 @two_phase
595 def push_serialized(self, namespace, targets='all'):
596 def push_serialized(self, namespace, targets='all'):
596 return self.multiengine.push_serialized(namespace, targets)
597 return self.multiengine.push_serialized(namespace, targets)
597
598
598 @two_phase
599 @two_phase
599 def pull_serialized(self, keys, targets='all'):
600 def pull_serialized(self, keys, targets='all'):
600 return self.multiengine.pull_serialized(keys, targets)
601 return self.multiengine.pull_serialized(keys, targets)
601
602
602 @two_phase
603 @two_phase
603 def clear_queue(self, targets='all'):
604 def clear_queue(self, targets='all'):
604 return self.multiengine.clear_queue(targets)
605 return self.multiengine.clear_queue(targets)
605
606
606 @two_phase
607 @two_phase
607 def queue_status(self, targets='all'):
608 def queue_status(self, targets='all'):
608 return self.multiengine.queue_status(targets)
609 return self.multiengine.queue_status(targets)
609
610
610 @two_phase
611 @two_phase
611 def set_properties(self, properties, targets='all'):
612 def set_properties(self, properties, targets='all'):
612 return self.multiengine.set_properties(properties, targets)
613 return self.multiengine.set_properties(properties, targets)
613
614
614 @two_phase
615 @two_phase
615 def get_properties(self, keys=None, targets='all'):
616 def get_properties(self, keys=None, targets='all'):
616 return self.multiengine.get_properties(keys, targets)
617 return self.multiengine.get_properties(keys, targets)
617
618
618 @two_phase
619 @two_phase
619 def has_properties(self, keys, targets='all'):
620 def has_properties(self, keys, targets='all'):
620 return self.multiengine.has_properties(keys, targets)
621 return self.multiengine.has_properties(keys, targets)
621
622
622 @two_phase
623 @two_phase
623 def del_properties(self, keys, targets='all'):
624 def del_properties(self, keys, targets='all'):
624 return self.multiengine.del_properties(keys, targets)
625 return self.multiengine.del_properties(keys, targets)
625
626
626 @two_phase
627 @two_phase
627 def clear_properties(self, targets='all'):
628 def clear_properties(self, targets='all'):
628 return self.multiengine.clear_properties(targets)
629 return self.multiengine.clear_properties(targets)
629
630
630 #---------------------------------------------------------------------------
631 #---------------------------------------------------------------------------
631 # IMultiEngine methods
632 # IMultiEngine methods
632 #---------------------------------------------------------------------------
633 #---------------------------------------------------------------------------
633
634
634 def get_ids(self):
635 def get_ids(self):
635 """Return a list of registered engine ids.
636 """Return a list of registered engine ids.
636
637
637 Never use the two phase block/non-block stuff for this.
638 Never use the two phase block/non-block stuff for this.
638 """
639 """
639 return self.multiengine.get_ids()
640 return self.multiengine.get_ids()
640
641
641
642
642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643
644
644
645
645 #-------------------------------------------------------------------------------
646 #-------------------------------------------------------------------------------
646 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 #-------------------------------------------------------------------------------
648 #-------------------------------------------------------------------------------
648
649
649 #-------------------------------------------------------------------------------
650 #-------------------------------------------------------------------------------
650 # IMultiEngineCoordinator
651 # IMultiEngineCoordinator
651 #-------------------------------------------------------------------------------
652 #-------------------------------------------------------------------------------
652
653
653 class IMultiEngineCoordinator(Interface):
654 class IMultiEngineCoordinator(Interface):
654 """Methods that work on multiple engines explicitly."""
655 """Methods that work on multiple engines explicitly."""
655
656
656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 """Partition and distribute a sequence to targets."""
658 """Partition and distribute a sequence to targets."""
658
659
659 def gather(key, dist='b', targets='all'):
660 def gather(key, dist='b', targets='all'):
660 """Gather object key from targets."""
661 """Gather object key from targets."""
661
662
662 def raw_map(func, seqs, dist='b', targets='all'):
663 def raw_map(func, seqs, dist='b', targets='all'):
663 """
664 """
664 A parallelized version of Python's builtin `map` function.
665 A parallelized version of Python's builtin `map` function.
665
666
666 This has a slightly different syntax than the builtin `map`.
667 This has a slightly different syntax than the builtin `map`.
667 This is needed because we need to have keyword arguments and thus
668 This is needed because we need to have keyword arguments and thus
668 can't use *args to capture all the sequences. Instead, they must
669 can't use *args to capture all the sequences. Instead, they must
669 be passed in a list or tuple.
670 be passed in a list or tuple.
670
671
671 The equivalence is:
672 The equivalence is:
672
673
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674
675
675 Most users will want to use parallel functions or the `mapper`
676 Most users will want to use parallel functions or the `mapper`
676 and `map` methods for an API that follows that of the builtin
677 and `map` methods for an API that follows that of the builtin
677 `map`.
678 `map`.
678 """
679 """
679
680
680
681
681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
682 """Methods that work on multiple engines explicitly."""
683 """Methods that work on multiple engines explicitly."""
683
684
684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
685 """Partition and distribute a sequence to targets."""
686 """Partition and distribute a sequence to targets."""
686
687
687 def gather(key, dist='b', targets='all', block=True):
688 def gather(key, dist='b', targets='all', block=True):
688 """Gather object key from targets"""
689 """Gather object key from targets"""
689
690
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 """
692 """
692 A parallelized version of Python's builtin map.
693 A parallelized version of Python's builtin map.
693
694
694 This has a slightly different syntax than the builtin `map`.
695 This has a slightly different syntax than the builtin `map`.
695 This is needed because we need to have keyword arguments and thus
696 This is needed because we need to have keyword arguments and thus
696 can't use *args to capture all the sequences. Instead, they must
697 can't use *args to capture all the sequences. Instead, they must
697 be passed in a list or tuple.
698 be passed in a list or tuple.
698
699
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700
701
701 Most users will want to use parallel functions or the `mapper`
702 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
703 and `map` methods for an API that follows that of the builtin
703 `map`.
704 `map`.
704 """
705 """
705
706
706
707
707 #-------------------------------------------------------------------------------
708 #-------------------------------------------------------------------------------
708 # IMultiEngineExtras
709 # IMultiEngineExtras
709 #-------------------------------------------------------------------------------
710 #-------------------------------------------------------------------------------
710
711
711 class IMultiEngineExtras(Interface):
712 class IMultiEngineExtras(Interface):
712
713
713 def zip_pull(targets, keys):
714 def zip_pull(targets, keys):
714 """
715 """
715 Pull, but return results in a different format from `pull`.
716 Pull, but return results in a different format from `pull`.
716
717
717 This method basically returns zip(pull(targets, *keys)), with a few
718 This method basically returns zip(pull(targets, *keys)), with a few
718 edge cases handled differently. Users of chainsaw will find this format
719 edge cases handled differently. Users of chainsaw will find this format
719 familiar.
720 familiar.
720 """
721 """
721
722
722 def run(targets, fname):
723 def run(targets, fname):
723 """Run a .py file on targets."""
724 """Run a .py file on targets."""
724
725
725
726
726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
727 def zip_pull(targets, keys, block=True):
728 def zip_pull(targets, keys, block=True):
728 """
729 """
729 Pull, but return results in a different format from `pull`.
730 Pull, but return results in a different format from `pull`.
730
731
731 This method basically returns zip(pull(targets, *keys)), with a few
732 This method basically returns zip(pull(targets, *keys)), with a few
732 edge cases handled differently. Users of chainsaw will find this format
733 edge cases handled differently. Users of chainsaw will find this format
733 familiar.
734 familiar.
734 """
735 """
735
736
736 def run(targets, fname, block=True):
737 def run(targets, fname, block=True):
737 """Run a .py file on targets."""
738 """Run a .py file on targets."""
738
739
739 #-------------------------------------------------------------------------------
740 #-------------------------------------------------------------------------------
740 # The full MultiEngine interface
741 # The full MultiEngine interface
741 #-------------------------------------------------------------------------------
742 #-------------------------------------------------------------------------------
742
743
743 class IFullMultiEngine(IMultiEngine,
744 class IFullMultiEngine(IMultiEngine,
744 IMultiEngineCoordinator,
745 IMultiEngineCoordinator,
745 IMultiEngineExtras):
746 IMultiEngineExtras):
746 pass
747 pass
747
748
748
749
749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
750 ISynchronousMultiEngineCoordinator,
751 ISynchronousMultiEngineCoordinator,
751 ISynchronousMultiEngineExtras):
752 ISynchronousMultiEngineExtras):
752 pass
753 pass
753
754
1 NO CONTENT: modified file
NO CONTENT: modified file
General Comments 0
You need to be logged in to leave comments. Login now