Show More
@@ -1,903 +1,904 b'' | |||||
1 | # encoding: utf-8 |
|
1 | # encoding: utf-8 | |
2 | # -*- test-case-name: IPython.kernel.tests.test_engineservice -*- |
|
2 | # -*- test-case-name: IPython.kernel.tests.test_engineservice -*- | |
3 |
|
3 | |||
4 | """A Twisted Service Representation of the IPython core. |
|
4 | """A Twisted Service Representation of the IPython core. | |
5 |
|
5 | |||
6 | The IPython Core exposed to the network is called the Engine. Its |
|
6 | The IPython Core exposed to the network is called the Engine. Its | |
7 | representation in Twisted in the EngineService. Interfaces and adapters |
|
7 | representation in Twisted in the EngineService. Interfaces and adapters | |
8 | are used to abstract out the details of the actual network protocol used. |
|
8 | are used to abstract out the details of the actual network protocol used. | |
9 | The EngineService is an Engine that knows nothing about the actual protocol |
|
9 | The EngineService is an Engine that knows nothing about the actual protocol | |
10 | used. |
|
10 | used. | |
11 |
|
11 | |||
12 | The EngineService is exposed with various network protocols in modules like: |
|
12 | The EngineService is exposed with various network protocols in modules like: | |
13 |
|
13 | |||
14 | enginepb.py |
|
14 | enginepb.py | |
15 | enginevanilla.py |
|
15 | enginevanilla.py | |
16 |
|
16 | |||
17 | As of 12/12/06 the classes in this module have been simplified greatly. It was |
|
17 | As of 12/12/06 the classes in this module have been simplified greatly. It was | |
18 | felt that we had over-engineered things. To improve the maintainability of the |
|
18 | felt that we had over-engineered things. To improve the maintainability of the | |
19 | code we have taken out the ICompleteEngine interface and the completeEngine |
|
19 | code we have taken out the ICompleteEngine interface and the completeEngine | |
20 | method that automatically added methods to engines. |
|
20 | method that automatically added methods to engines. | |
21 |
|
21 | |||
22 | """ |
|
22 | """ | |
23 |
|
23 | |||
24 | __docformat__ = "restructuredtext en" |
|
24 | __docformat__ = "restructuredtext en" | |
25 |
|
25 | |||
26 | #------------------------------------------------------------------------------- |
|
26 | #------------------------------------------------------------------------------- | |
27 | # Copyright (C) 2008 The IPython Development Team |
|
27 | # Copyright (C) 2008 The IPython Development Team | |
28 | # |
|
28 | # | |
29 | # Distributed under the terms of the BSD License. The full license is in |
|
29 | # Distributed under the terms of the BSD License. The full license is in | |
30 | # the file COPYING, distributed as part of this software. |
|
30 | # the file COPYING, distributed as part of this software. | |
31 | #------------------------------------------------------------------------------- |
|
31 | #------------------------------------------------------------------------------- | |
32 |
|
32 | |||
33 | #------------------------------------------------------------------------------- |
|
33 | #------------------------------------------------------------------------------- | |
34 | # Imports |
|
34 | # Imports | |
35 | #------------------------------------------------------------------------------- |
|
35 | #------------------------------------------------------------------------------- | |
36 |
|
36 | |||
37 | import os, sys, copy |
|
37 | import os, sys, copy | |
38 | import cPickle as pickle |
|
38 | import cPickle as pickle | |
39 | from new import instancemethod |
|
39 | from new import instancemethod | |
40 |
|
40 | |||
41 | from twisted.application import service |
|
41 | from twisted.application import service | |
42 | from twisted.internet import defer, reactor |
|
42 | from twisted.internet import defer, reactor | |
43 | from twisted.python import log, failure, components |
|
43 | from twisted.python import log, failure, components | |
44 | import zope.interface as zi |
|
44 | import zope.interface as zi | |
45 |
|
45 | |||
46 | from IPython.kernel.core.interpreter import Interpreter |
|
46 | from IPython.kernel.core.interpreter import Interpreter | |
47 | from IPython.kernel import newserialized, error, util |
|
47 | from IPython.kernel import newserialized, error, util | |
48 | from IPython.kernel.util import printer |
|
48 | from IPython.kernel.util import printer | |
49 | from IPython.kernel.twistedutil import gatherBoth, DeferredList |
|
49 | from IPython.kernel.twistedutil import gatherBoth, DeferredList | |
50 | from IPython.kernel import codeutil |
|
50 | from IPython.kernel import codeutil | |
51 |
|
51 | |||
52 |
|
52 | |||
53 | #------------------------------------------------------------------------------- |
|
53 | #------------------------------------------------------------------------------- | |
54 | # Interface specification for the Engine |
|
54 | # Interface specification for the Engine | |
55 | #------------------------------------------------------------------------------- |
|
55 | #------------------------------------------------------------------------------- | |
56 |
|
56 | |||
57 | class IEngineCore(zi.Interface): |
|
57 | class IEngineCore(zi.Interface): | |
58 | """The minimal required interface for the IPython Engine. |
|
58 | """The minimal required interface for the IPython Engine. | |
59 |
|
59 | |||
60 | This interface provides a formal specification of the IPython core. |
|
60 | This interface provides a formal specification of the IPython core. | |
61 | All these methods should return deferreds regardless of what side of a |
|
61 | All these methods should return deferreds regardless of what side of a | |
62 | network connection they are on. |
|
62 | network connection they are on. | |
63 |
|
63 | |||
64 | In general, this class simply wraps a shell class and wraps its return |
|
64 | In general, this class simply wraps a shell class and wraps its return | |
65 | values as Deferred objects. If the underlying shell class method raises |
|
65 | values as Deferred objects. If the underlying shell class method raises | |
66 | an exception, this class should convert it to a twisted.failure.Failure |
|
66 | an exception, this class should convert it to a twisted.failure.Failure | |
67 | that will be propagated along the Deferred's errback chain. |
|
67 | that will be propagated along the Deferred's errback chain. | |
68 |
|
68 | |||
69 | In addition, Failures are aggressive. By this, we mean that if a method |
|
69 | In addition, Failures are aggressive. By this, we mean that if a method | |
70 | is performing multiple actions (like pulling multiple object) if any |
|
70 | is performing multiple actions (like pulling multiple object) if any | |
71 | single one fails, the entire method will fail with that Failure. It is |
|
71 | single one fails, the entire method will fail with that Failure. It is | |
72 | all or nothing. |
|
72 | all or nothing. | |
73 | """ |
|
73 | """ | |
74 |
|
74 | |||
75 | id = zi.interface.Attribute("the id of the Engine object") |
|
75 | id = zi.interface.Attribute("the id of the Engine object") | |
76 | properties = zi.interface.Attribute("A dict of properties of the Engine") |
|
76 | properties = zi.interface.Attribute("A dict of properties of the Engine") | |
77 |
|
77 | |||
78 | def execute(lines): |
|
78 | def execute(lines): | |
79 | """Execute lines of Python code. |
|
79 | """Execute lines of Python code. | |
80 |
|
80 | |||
81 | Returns a dictionary with keys (id, number, stdin, stdout, stderr) |
|
81 | Returns a dictionary with keys (id, number, stdin, stdout, stderr) | |
82 | upon success. |
|
82 | upon success. | |
83 |
|
83 | |||
84 | Returns a failure object if the execution of lines raises an exception. |
|
84 | Returns a failure object if the execution of lines raises an exception. | |
85 | """ |
|
85 | """ | |
86 |
|
86 | |||
87 | def push(namespace): |
|
87 | def push(namespace): | |
88 | """Push dict namespace into the user's namespace. |
|
88 | """Push dict namespace into the user's namespace. | |
89 |
|
89 | |||
90 | Returns a deferred to None or a failure. |
|
90 | Returns a deferred to None or a failure. | |
91 | """ |
|
91 | """ | |
92 |
|
92 | |||
93 | def pull(keys): |
|
93 | def pull(keys): | |
94 | """Pulls values out of the user's namespace by keys. |
|
94 | """Pulls values out of the user's namespace by keys. | |
95 |
|
95 | |||
96 | Returns a deferred to a tuple objects or a single object. |
|
96 | Returns a deferred to a tuple objects or a single object. | |
97 |
|
97 | |||
98 | Raises NameError if any one of objects doess not exist. |
|
98 | Raises NameError if any one of objects doess not exist. | |
99 | """ |
|
99 | """ | |
100 |
|
100 | |||
101 | def push_function(namespace): |
|
101 | def push_function(namespace): | |
102 | """Push a dict of key, function pairs into the user's namespace. |
|
102 | """Push a dict of key, function pairs into the user's namespace. | |
103 |
|
103 | |||
104 | Returns a deferred to None or a failure.""" |
|
104 | Returns a deferred to None or a failure.""" | |
105 |
|
105 | |||
106 | def pull_function(keys): |
|
106 | def pull_function(keys): | |
107 | """Pulls functions out of the user's namespace by keys. |
|
107 | """Pulls functions out of the user's namespace by keys. | |
108 |
|
108 | |||
109 | Returns a deferred to a tuple of functions or a single function. |
|
109 | Returns a deferred to a tuple of functions or a single function. | |
110 |
|
110 | |||
111 | Raises NameError if any one of the functions does not exist. |
|
111 | Raises NameError if any one of the functions does not exist. | |
112 | """ |
|
112 | """ | |
113 |
|
113 | |||
114 | def get_result(i=None): |
|
114 | def get_result(i=None): | |
115 | """Get the stdin/stdout/stderr of command i. |
|
115 | """Get the stdin/stdout/stderr of command i. | |
116 |
|
116 | |||
117 | Returns a deferred to a dict with keys |
|
117 | Returns a deferred to a dict with keys | |
118 | (id, number, stdin, stdout, stderr). |
|
118 | (id, number, stdin, stdout, stderr). | |
119 |
|
119 | |||
120 | Raises IndexError if command i does not exist. |
|
120 | Raises IndexError if command i does not exist. | |
121 | Raises TypeError if i in not an int. |
|
121 | Raises TypeError if i in not an int. | |
122 | """ |
|
122 | """ | |
123 |
|
123 | |||
124 | def reset(): |
|
124 | def reset(): | |
125 | """Reset the shell. |
|
125 | """Reset the shell. | |
126 |
|
126 | |||
127 | This clears the users namespace. Won't cause modules to be |
|
127 | This clears the users namespace. Won't cause modules to be | |
128 | reloaded. Should also re-initialize certain variables like id. |
|
128 | reloaded. Should also re-initialize certain variables like id. | |
129 | """ |
|
129 | """ | |
130 |
|
130 | |||
131 | def kill(): |
|
131 | def kill(): | |
132 | """Kill the engine by stopping the reactor.""" |
|
132 | """Kill the engine by stopping the reactor.""" | |
133 |
|
133 | |||
134 | def keys(): |
|
134 | def keys(): | |
135 | """Return the top level variables in the users namspace. |
|
135 | """Return the top level variables in the users namspace. | |
136 |
|
136 | |||
137 | Returns a deferred to a dict.""" |
|
137 | Returns a deferred to a dict.""" | |
138 |
|
138 | |||
139 |
|
139 | |||
140 | class IEngineSerialized(zi.Interface): |
|
140 | class IEngineSerialized(zi.Interface): | |
141 | """Push/Pull methods that take Serialized objects. |
|
141 | """Push/Pull methods that take Serialized objects. | |
142 |
|
142 | |||
143 | All methods should return deferreds. |
|
143 | All methods should return deferreds. | |
144 | """ |
|
144 | """ | |
145 |
|
145 | |||
146 | def push_serialized(namespace): |
|
146 | def push_serialized(namespace): | |
147 | """Push a dict of keys and Serialized objects into the user's namespace.""" |
|
147 | """Push a dict of keys and Serialized objects into the user's namespace.""" | |
148 |
|
148 | |||
149 | def pull_serialized(keys): |
|
149 | def pull_serialized(keys): | |
150 | """Pull objects by key from the user's namespace as Serialized. |
|
150 | """Pull objects by key from the user's namespace as Serialized. | |
151 |
|
151 | |||
152 | Returns a list of or one Serialized. |
|
152 | Returns a list of or one Serialized. | |
153 |
|
153 | |||
154 | Raises NameError is any one of the objects does not exist. |
|
154 | Raises NameError is any one of the objects does not exist. | |
155 | """ |
|
155 | """ | |
156 |
|
156 | |||
157 |
|
157 | |||
158 | class IEngineProperties(zi.Interface): |
|
158 | class IEngineProperties(zi.Interface): | |
159 | """Methods for access to the properties object of an Engine""" |
|
159 | """Methods for access to the properties object of an Engine""" | |
160 |
|
160 | |||
161 | properties = zi.Attribute("A StrictDict object, containing the properties") |
|
161 | properties = zi.Attribute("A StrictDict object, containing the properties") | |
162 |
|
162 | |||
163 | def set_properties(properties): |
|
163 | def set_properties(properties): | |
164 | """set properties by key and value""" |
|
164 | """set properties by key and value""" | |
165 |
|
165 | |||
166 | def get_properties(keys=None): |
|
166 | def get_properties(keys=None): | |
167 | """get a list of properties by `keys`, if no keys specified, get all""" |
|
167 | """get a list of properties by `keys`, if no keys specified, get all""" | |
168 |
|
168 | |||
169 | def del_properties(keys): |
|
169 | def del_properties(keys): | |
170 | """delete properties by `keys`""" |
|
170 | """delete properties by `keys`""" | |
171 |
|
171 | |||
172 | def has_properties(keys): |
|
172 | def has_properties(keys): | |
173 | """get a list of bool values for whether `properties` has `keys`""" |
|
173 | """get a list of bool values for whether `properties` has `keys`""" | |
174 |
|
174 | |||
175 | def clear_properties(): |
|
175 | def clear_properties(): | |
176 | """clear the properties dict""" |
|
176 | """clear the properties dict""" | |
177 |
|
177 | |||
178 | class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties): |
|
178 | class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties): | |
179 | """The basic engine interface that EngineService will implement. |
|
179 | """The basic engine interface that EngineService will implement. | |
180 |
|
180 | |||
181 | This exists so it is easy to specify adapters that adapt to and from the |
|
181 | This exists so it is easy to specify adapters that adapt to and from the | |
182 | API that the basic EngineService implements. |
|
182 | API that the basic EngineService implements. | |
183 | """ |
|
183 | """ | |
184 | pass |
|
184 | pass | |
185 |
|
185 | |||
186 | class IEngineQueued(IEngineBase): |
|
186 | class IEngineQueued(IEngineBase): | |
187 | """Interface for adding a queue to an IEngineBase. |
|
187 | """Interface for adding a queue to an IEngineBase. | |
188 |
|
188 | |||
189 | This interface extends the IEngineBase interface to add methods for managing |
|
189 | This interface extends the IEngineBase interface to add methods for managing | |
190 | the engine's queue. The implicit details of this interface are that the |
|
190 | the engine's queue. The implicit details of this interface are that the | |
191 | execution of all methods declared in IEngineBase should appropriately be |
|
191 | execution of all methods declared in IEngineBase should appropriately be | |
192 | put through a queue before execution. |
|
192 | put through a queue before execution. | |
193 |
|
193 | |||
194 | All methods should return deferreds. |
|
194 | All methods should return deferreds. | |
195 | """ |
|
195 | """ | |
196 |
|
196 | |||
197 | def clear_queue(): |
|
197 | def clear_queue(): | |
198 | """Clear the queue.""" |
|
198 | """Clear the queue.""" | |
199 |
|
199 | |||
200 | def queue_status(): |
|
200 | def queue_status(): | |
201 | """Get the queued and pending commands in the queue.""" |
|
201 | """Get the queued and pending commands in the queue.""" | |
202 |
|
202 | |||
203 | def register_failure_observer(obs): |
|
203 | def register_failure_observer(obs): | |
204 | """Register an observer of pending Failures. |
|
204 | """Register an observer of pending Failures. | |
205 |
|
205 | |||
206 | The observer must implement IFailureObserver. |
|
206 | The observer must implement IFailureObserver. | |
207 | """ |
|
207 | """ | |
208 |
|
208 | |||
209 | def unregister_failure_observer(obs): |
|
209 | def unregister_failure_observer(obs): | |
210 | """Unregister an observer of pending Failures.""" |
|
210 | """Unregister an observer of pending Failures.""" | |
211 |
|
211 | |||
212 |
|
212 | |||
213 | class IEngineThreaded(zi.Interface): |
|
213 | class IEngineThreaded(zi.Interface): | |
214 | """A place holder for threaded commands. |
|
214 | """A place holder for threaded commands. | |
215 |
|
215 | |||
216 | All methods should return deferreds. |
|
216 | All methods should return deferreds. | |
217 | """ |
|
217 | """ | |
218 | pass |
|
218 | pass | |
219 |
|
219 | |||
220 |
|
220 | |||
221 | #------------------------------------------------------------------------------- |
|
221 | #------------------------------------------------------------------------------- | |
222 | # Functions and classes to implement the EngineService |
|
222 | # Functions and classes to implement the EngineService | |
223 | #------------------------------------------------------------------------------- |
|
223 | #------------------------------------------------------------------------------- | |
224 |
|
224 | |||
225 |
|
225 | |||
226 | class StrictDict(dict): |
|
226 | class StrictDict(dict): | |
227 | """This is a strict copying dictionary for use as the interface to the |
|
227 | """This is a strict copying dictionary for use as the interface to the | |
228 | properties of an Engine. |
|
228 | properties of an Engine. | |
229 |
|
229 | |||
230 | :IMPORTANT: |
|
230 | :IMPORTANT: | |
231 | This object copies the values you set to it, and returns copies to you |
|
231 | This object copies the values you set to it, and returns copies to you | |
232 | when you request them. The only way to change properties os explicitly |
|
232 | when you request them. The only way to change properties os explicitly | |
233 | through the setitem and getitem of the dictionary interface. |
|
233 | through the setitem and getitem of the dictionary interface. | |
234 |
|
234 | |||
235 | Example: |
|
235 | Example: | |
236 | >>> e = get_engine(id) |
|
236 | >>> e = get_engine(id) | |
237 | >>> L = [1,2,3] |
|
237 | >>> L = [1,2,3] | |
238 | >>> e.properties['L'] = L |
|
238 | >>> e.properties['L'] = L | |
239 | >>> L == e.properties['L'] |
|
239 | >>> L == e.properties['L'] | |
240 | True |
|
240 | True | |
241 | >>> L.append(99) |
|
241 | >>> L.append(99) | |
242 | >>> L == e.properties['L'] |
|
242 | >>> L == e.properties['L'] | |
243 | False |
|
243 | False | |
244 |
|
244 | |||
245 | Note that getitem copies, so calls to methods of objects do not affect |
|
245 | Note that getitem copies, so calls to methods of objects do not affect | |
246 | the properties, as seen here: |
|
246 | the properties, as seen here: | |
247 |
|
247 | |||
248 | >>> e.properties[1] = range(2) |
|
248 | >>> e.properties[1] = range(2) | |
249 | >>> print e.properties[1] |
|
249 | >>> print e.properties[1] | |
250 | [0, 1] |
|
250 | [0, 1] | |
251 | >>> e.properties[1].append(2) |
|
251 | >>> e.properties[1].append(2) | |
252 | >>> print e.properties[1] |
|
252 | >>> print e.properties[1] | |
253 | [0, 1] |
|
253 | [0, 1] | |
254 | """ |
|
254 | """ | |
255 | def __init__(self, *args, **kwargs): |
|
255 | def __init__(self, *args, **kwargs): | |
256 | dict.__init__(self, *args, **kwargs) |
|
256 | dict.__init__(self, *args, **kwargs) | |
257 | self.modified = True |
|
257 | self.modified = True | |
258 |
|
258 | |||
259 | def __getitem__(self, key): |
|
259 | def __getitem__(self, key): | |
260 | return copy.deepcopy(dict.__getitem__(self, key)) |
|
260 | return copy.deepcopy(dict.__getitem__(self, key)) | |
261 |
|
261 | |||
262 | def __setitem__(self, key, value): |
|
262 | def __setitem__(self, key, value): | |
263 | # check if this entry is valid for transport around the network |
|
263 | # check if this entry is valid for transport around the network | |
264 | # and copying |
|
264 | # and copying | |
265 | try: |
|
265 | try: | |
266 | pickle.dumps(key, 2) |
|
266 | pickle.dumps(key, 2) | |
267 | pickle.dumps(value, 2) |
|
267 | pickle.dumps(value, 2) | |
268 | newvalue = copy.deepcopy(value) |
|
268 | newvalue = copy.deepcopy(value) | |
269 | except: |
|
269 | except: | |
270 | raise error.InvalidProperty(value) |
|
270 | raise error.InvalidProperty(value) | |
271 | dict.__setitem__(self, key, newvalue) |
|
271 | dict.__setitem__(self, key, newvalue) | |
272 | self.modified = True |
|
272 | self.modified = True | |
273 |
|
273 | |||
274 | def __delitem__(self, key): |
|
274 | def __delitem__(self, key): | |
275 | dict.__delitem__(self, key) |
|
275 | dict.__delitem__(self, key) | |
276 | self.modified = True |
|
276 | self.modified = True | |
277 |
|
277 | |||
278 | def update(self, dikt): |
|
278 | def update(self, dikt): | |
279 | for k,v in dikt.iteritems(): |
|
279 | for k,v in dikt.iteritems(): | |
280 | self[k] = v |
|
280 | self[k] = v | |
281 |
|
281 | |||
282 | def pop(self, key): |
|
282 | def pop(self, key): | |
283 | self.modified = True |
|
283 | self.modified = True | |
284 | return dict.pop(self, key) |
|
284 | return dict.pop(self, key) | |
285 |
|
285 | |||
286 | def popitem(self): |
|
286 | def popitem(self): | |
287 | self.modified = True |
|
287 | self.modified = True | |
288 | return dict.popitem(self) |
|
288 | return dict.popitem(self) | |
289 |
|
289 | |||
290 | def clear(self): |
|
290 | def clear(self): | |
291 | self.modified = True |
|
291 | self.modified = True | |
292 | dict.clear(self) |
|
292 | dict.clear(self) | |
293 |
|
293 | |||
294 | def subDict(self, *keys): |
|
294 | def subDict(self, *keys): | |
295 | d = {} |
|
295 | d = {} | |
296 | for key in keys: |
|
296 | for key in keys: | |
297 | d[key] = self[key] |
|
297 | d[key] = self[key] | |
298 | return d |
|
298 | return d | |
299 |
|
299 | |||
300 |
|
300 | |||
301 |
|
301 | |||
302 | class EngineAPI(object): |
|
302 | class EngineAPI(object): | |
303 | """This is the object through which the user can edit the `properties` |
|
303 | """This is the object through which the user can edit the `properties` | |
304 | attribute of an Engine. |
|
304 | attribute of an Engine. | |
305 | The Engine Properties object copies all object in and out of itself. |
|
305 | The Engine Properties object copies all object in and out of itself. | |
306 | See the EngineProperties object for details. |
|
306 | See the EngineProperties object for details. | |
307 | """ |
|
307 | """ | |
308 | _fix=False |
|
308 | _fix=False | |
309 | def __init__(self, id): |
|
309 | def __init__(self, id): | |
310 | self.id = id |
|
310 | self.id = id | |
311 | self.properties = StrictDict() |
|
311 | self.properties = StrictDict() | |
312 | self._fix=True |
|
312 | self._fix=True | |
313 |
|
313 | |||
314 | def __setattr__(self, k,v): |
|
314 | def __setattr__(self, k,v): | |
315 | if self._fix: |
|
315 | if self._fix: | |
316 | raise error.KernelError("I am protected!") |
|
316 | raise error.KernelError("I am protected!") | |
317 | else: |
|
317 | else: | |
318 | object.__setattr__(self, k, v) |
|
318 | object.__setattr__(self, k, v) | |
319 |
|
319 | |||
320 | def __delattr__(self, key): |
|
320 | def __delattr__(self, key): | |
321 | raise error.KernelError("I am protected!") |
|
321 | raise error.KernelError("I am protected!") | |
322 |
|
322 | |||
323 |
|
323 | |||
324 | _apiDict = {} |
|
324 | _apiDict = {} | |
325 |
|
325 | |||
326 | def get_engine(id): |
|
326 | def get_engine(id): | |
327 | """Get the Engine API object, whcih currently just provides the properties |
|
327 | """Get the Engine API object, whcih currently just provides the properties | |
328 | object, by ID""" |
|
328 | object, by ID""" | |
329 | global _apiDict |
|
329 | global _apiDict | |
330 | if not _apiDict.get(id): |
|
330 | if not _apiDict.get(id): | |
331 | _apiDict[id] = EngineAPI(id) |
|
331 | _apiDict[id] = EngineAPI(id) | |
332 | return _apiDict[id] |
|
332 | return _apiDict[id] | |
333 |
|
333 | |||
334 | def drop_engine(id): |
|
334 | def drop_engine(id): | |
335 | """remove an engine""" |
|
335 | """remove an engine""" | |
336 | global _apiDict |
|
336 | global _apiDict | |
337 | if _apiDict.has_key(id): |
|
337 | if _apiDict.has_key(id): | |
338 | del _apiDict[id] |
|
338 | del _apiDict[id] | |
339 |
|
339 | |||
340 | class EngineService(object, service.Service): |
|
340 | class EngineService(object, service.Service): | |
341 | """Adapt a IPython shell into a IEngine implementing Twisted Service.""" |
|
341 | """Adapt a IPython shell into a IEngine implementing Twisted Service.""" | |
342 |
|
342 | |||
343 | zi.implements(IEngineBase) |
|
343 | zi.implements(IEngineBase) | |
344 | name = 'EngineService' |
|
344 | name = 'EngineService' | |
345 |
|
345 | |||
346 | def __init__(self, shellClass=Interpreter, mpi=None): |
|
346 | def __init__(self, shellClass=Interpreter, mpi=None): | |
347 | """Create an EngineService. |
|
347 | """Create an EngineService. | |
348 |
|
348 | |||
349 | shellClass: something that implements IInterpreter or core1 |
|
349 | shellClass: something that implements IInterpreter or core1 | |
350 | mpi: an mpi module that has rank and size attributes |
|
350 | mpi: an mpi module that has rank and size attributes | |
351 | """ |
|
351 | """ | |
352 | self.shellClass = shellClass |
|
352 | self.shellClass = shellClass | |
353 | self.shell = self.shellClass() |
|
353 | self.shell = self.shellClass() | |
354 | self.mpi = mpi |
|
354 | self.mpi = mpi | |
355 | self.id = None |
|
355 | self.id = None | |
356 | self.properties = get_engine(self.id).properties |
|
356 | self.properties = get_engine(self.id).properties | |
357 | if self.mpi is not None: |
|
357 | if self.mpi is not None: | |
358 | log.msg("MPI started with rank = %i and size = %i" % |
|
358 | log.msg("MPI started with rank = %i and size = %i" % | |
359 | (self.mpi.rank, self.mpi.size)) |
|
359 | (self.mpi.rank, self.mpi.size)) | |
360 | self.id = self.mpi.rank |
|
360 | self.id = self.mpi.rank | |
361 | self._seedNamespace() |
|
361 | self._seedNamespace() | |
362 |
|
362 | |||
363 | # Make id a property so that the shell can get the updated id |
|
363 | # Make id a property so that the shell can get the updated id | |
364 |
|
364 | |||
365 | def _setID(self, id): |
|
365 | def _setID(self, id): | |
366 | self._id = id |
|
366 | self._id = id | |
367 | self.properties = get_engine(id).properties |
|
367 | self.properties = get_engine(id).properties | |
368 | self.shell.push({'id': id}) |
|
368 | self.shell.push({'id': id}) | |
369 |
|
369 | |||
370 | def _getID(self): |
|
370 | def _getID(self): | |
371 | return self._id |
|
371 | return self._id | |
372 |
|
372 | |||
373 | id = property(_getID, _setID) |
|
373 | id = property(_getID, _setID) | |
374 |
|
374 | |||
375 | def _seedNamespace(self): |
|
375 | def _seedNamespace(self): | |
376 | self.shell.push({'mpi': self.mpi, 'id' : self.id}) |
|
376 | self.shell.push({'mpi': self.mpi, 'id' : self.id}) | |
377 |
|
377 | |||
378 | def executeAndRaise(self, msg, callable, *args, **kwargs): |
|
378 | def executeAndRaise(self, msg, callable, *args, **kwargs): | |
379 | """Call a method of self.shell and wrap any exception.""" |
|
379 | """Call a method of self.shell and wrap any exception.""" | |
380 | d = defer.Deferred() |
|
380 | d = defer.Deferred() | |
381 | try: |
|
381 | try: | |
382 | result = callable(*args, **kwargs) |
|
382 | result = callable(*args, **kwargs) | |
383 | except: |
|
383 | except: | |
384 | # This gives the following: |
|
384 | # This gives the following: | |
385 | # et=exception class |
|
385 | # et=exception class | |
386 | # ev=exception class instance |
|
386 | # ev=exception class instance | |
387 | # tb=traceback object |
|
387 | # tb=traceback object | |
388 | et,ev,tb = sys.exc_info() |
|
388 | et,ev,tb = sys.exc_info() | |
389 | # This call adds attributes to the exception value |
|
389 | # This call adds attributes to the exception value | |
390 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) |
|
390 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) | |
391 | # Add another attribute |
|
391 | # Add another attribute | |
392 | ev._ipython_engine_info = msg |
|
392 | ev._ipython_engine_info = msg | |
393 | f = failure.Failure(ev,et,None) |
|
393 | f = failure.Failure(ev,et,None) | |
394 | d.errback(f) |
|
394 | d.errback(f) | |
395 | else: |
|
395 | else: | |
396 | d.callback(result) |
|
396 | d.callback(result) | |
397 |
|
397 | |||
398 | return d |
|
398 | return d | |
399 |
|
399 | |||
400 |
|
400 | |||
401 | # The IEngine methods. See the interface for documentation. |
|
401 | # The IEngine methods. See the interface for documentation. | |
402 |
|
402 | |||
|
403 | @profile | |||
403 | def execute(self, lines): |
|
404 | def execute(self, lines): | |
404 | msg = {'engineid':self.id, |
|
405 | msg = {'engineid':self.id, | |
405 | 'method':'execute', |
|
406 | 'method':'execute', | |
406 | 'args':[lines]} |
|
407 | 'args':[lines]} | |
407 | d = self.executeAndRaise(msg, self.shell.execute, lines) |
|
408 | d = self.executeAndRaise(msg, self.shell.execute, lines) | |
408 | d.addCallback(self.addIDToResult) |
|
409 | d.addCallback(self.addIDToResult) | |
409 | return d |
|
410 | return d | |
410 |
|
411 | |||
411 | def addIDToResult(self, result): |
|
412 | def addIDToResult(self, result): | |
412 | result['id'] = self.id |
|
413 | result['id'] = self.id | |
413 | return result |
|
414 | return result | |
414 |
|
415 | |||
415 | def push(self, namespace): |
|
416 | def push(self, namespace): | |
416 | msg = {'engineid':self.id, |
|
417 | msg = {'engineid':self.id, | |
417 | 'method':'push', |
|
418 | 'method':'push', | |
418 | 'args':[repr(namespace.keys())]} |
|
419 | 'args':[repr(namespace.keys())]} | |
419 | d = self.executeAndRaise(msg, self.shell.push, namespace) |
|
420 | d = self.executeAndRaise(msg, self.shell.push, namespace) | |
420 | return d |
|
421 | return d | |
421 |
|
422 | |||
422 | def pull(self, keys): |
|
423 | def pull(self, keys): | |
423 | msg = {'engineid':self.id, |
|
424 | msg = {'engineid':self.id, | |
424 | 'method':'pull', |
|
425 | 'method':'pull', | |
425 | 'args':[repr(keys)]} |
|
426 | 'args':[repr(keys)]} | |
426 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
427 | d = self.executeAndRaise(msg, self.shell.pull, keys) | |
427 | return d |
|
428 | return d | |
428 |
|
429 | |||
429 | def push_function(self, namespace): |
|
430 | def push_function(self, namespace): | |
430 | msg = {'engineid':self.id, |
|
431 | msg = {'engineid':self.id, | |
431 | 'method':'push_function', |
|
432 | 'method':'push_function', | |
432 | 'args':[repr(namespace.keys())]} |
|
433 | 'args':[repr(namespace.keys())]} | |
433 | d = self.executeAndRaise(msg, self.shell.push_function, namespace) |
|
434 | d = self.executeAndRaise(msg, self.shell.push_function, namespace) | |
434 | return d |
|
435 | return d | |
435 |
|
436 | |||
436 | def pull_function(self, keys): |
|
437 | def pull_function(self, keys): | |
437 | msg = {'engineid':self.id, |
|
438 | msg = {'engineid':self.id, | |
438 | 'method':'pull_function', |
|
439 | 'method':'pull_function', | |
439 | 'args':[repr(keys)]} |
|
440 | 'args':[repr(keys)]} | |
440 | d = self.executeAndRaise(msg, self.shell.pull_function, keys) |
|
441 | d = self.executeAndRaise(msg, self.shell.pull_function, keys) | |
441 | return d |
|
442 | return d | |
442 |
|
443 | |||
443 | def get_result(self, i=None): |
|
444 | def get_result(self, i=None): | |
444 | msg = {'engineid':self.id, |
|
445 | msg = {'engineid':self.id, | |
445 | 'method':'get_result', |
|
446 | 'method':'get_result', | |
446 | 'args':[repr(i)]} |
|
447 | 'args':[repr(i)]} | |
447 | d = self.executeAndRaise(msg, self.shell.getCommand, i) |
|
448 | d = self.executeAndRaise(msg, self.shell.getCommand, i) | |
448 | d.addCallback(self.addIDToResult) |
|
449 | d.addCallback(self.addIDToResult) | |
449 | return d |
|
450 | return d | |
450 |
|
451 | |||
451 | def reset(self): |
|
452 | def reset(self): | |
452 | msg = {'engineid':self.id, |
|
453 | msg = {'engineid':self.id, | |
453 | 'method':'reset', |
|
454 | 'method':'reset', | |
454 | 'args':[]} |
|
455 | 'args':[]} | |
455 | del self.shell |
|
456 | del self.shell | |
456 | self.shell = self.shellClass() |
|
457 | self.shell = self.shellClass() | |
457 | self.properties.clear() |
|
458 | self.properties.clear() | |
458 | d = self.executeAndRaise(msg, self._seedNamespace) |
|
459 | d = self.executeAndRaise(msg, self._seedNamespace) | |
459 | return d |
|
460 | return d | |
460 |
|
461 | |||
461 | def kill(self): |
|
462 | def kill(self): | |
462 | drop_engine(self.id) |
|
463 | drop_engine(self.id) | |
463 | try: |
|
464 | try: | |
464 | reactor.stop() |
|
465 | reactor.stop() | |
465 | except RuntimeError: |
|
466 | except RuntimeError: | |
466 | log.msg('The reactor was not running apparently.') |
|
467 | log.msg('The reactor was not running apparently.') | |
467 | return defer.fail() |
|
468 | return defer.fail() | |
468 | else: |
|
469 | else: | |
469 | return defer.succeed(None) |
|
470 | return defer.succeed(None) | |
470 |
|
471 | |||
471 | def keys(self): |
|
472 | def keys(self): | |
472 | """Return a list of variables names in the users top level namespace. |
|
473 | """Return a list of variables names in the users top level namespace. | |
473 |
|
474 | |||
474 | This used to return a dict of all the keys/repr(values) in the |
|
475 | This used to return a dict of all the keys/repr(values) in the | |
475 | user's namespace. This was too much info for the ControllerService |
|
476 | user's namespace. This was too much info for the ControllerService | |
476 | to handle so it is now just a list of keys. |
|
477 | to handle so it is now just a list of keys. | |
477 | """ |
|
478 | """ | |
478 |
|
479 | |||
479 | remotes = [] |
|
480 | remotes = [] | |
480 | for k in self.shell.user_ns.iterkeys(): |
|
481 | for k in self.shell.user_ns.iterkeys(): | |
481 | if k not in ['__name__', '_ih', '_oh', '__builtins__', |
|
482 | if k not in ['__name__', '_ih', '_oh', '__builtins__', | |
482 | 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']: |
|
483 | 'In', 'Out', '_', '__', '___', '__IP', 'input', 'raw_input']: | |
483 | remotes.append(k) |
|
484 | remotes.append(k) | |
484 | return defer.succeed(remotes) |
|
485 | return defer.succeed(remotes) | |
485 |
|
486 | |||
486 | def set_properties(self, properties): |
|
487 | def set_properties(self, properties): | |
487 | msg = {'engineid':self.id, |
|
488 | msg = {'engineid':self.id, | |
488 | 'method':'set_properties', |
|
489 | 'method':'set_properties', | |
489 | 'args':[repr(properties.keys())]} |
|
490 | 'args':[repr(properties.keys())]} | |
490 | return self.executeAndRaise(msg, self.properties.update, properties) |
|
491 | return self.executeAndRaise(msg, self.properties.update, properties) | |
491 |
|
492 | |||
492 | def get_properties(self, keys=None): |
|
493 | def get_properties(self, keys=None): | |
493 | msg = {'engineid':self.id, |
|
494 | msg = {'engineid':self.id, | |
494 | 'method':'get_properties', |
|
495 | 'method':'get_properties', | |
495 | 'args':[repr(keys)]} |
|
496 | 'args':[repr(keys)]} | |
496 | if keys is None: |
|
497 | if keys is None: | |
497 | keys = self.properties.keys() |
|
498 | keys = self.properties.keys() | |
498 | return self.executeAndRaise(msg, self.properties.subDict, *keys) |
|
499 | return self.executeAndRaise(msg, self.properties.subDict, *keys) | |
499 |
|
500 | |||
500 | def _doDel(self, keys): |
|
501 | def _doDel(self, keys): | |
501 | for key in keys: |
|
502 | for key in keys: | |
502 | del self.properties[key] |
|
503 | del self.properties[key] | |
503 |
|
504 | |||
504 | def del_properties(self, keys): |
|
505 | def del_properties(self, keys): | |
505 | msg = {'engineid':self.id, |
|
506 | msg = {'engineid':self.id, | |
506 | 'method':'del_properties', |
|
507 | 'method':'del_properties', | |
507 | 'args':[repr(keys)]} |
|
508 | 'args':[repr(keys)]} | |
508 | return self.executeAndRaise(msg, self._doDel, keys) |
|
509 | return self.executeAndRaise(msg, self._doDel, keys) | |
509 |
|
510 | |||
510 | def _doHas(self, keys): |
|
511 | def _doHas(self, keys): | |
511 | return [self.properties.has_key(key) for key in keys] |
|
512 | return [self.properties.has_key(key) for key in keys] | |
512 |
|
513 | |||
513 | def has_properties(self, keys): |
|
514 | def has_properties(self, keys): | |
514 | msg = {'engineid':self.id, |
|
515 | msg = {'engineid':self.id, | |
515 | 'method':'has_properties', |
|
516 | 'method':'has_properties', | |
516 | 'args':[repr(keys)]} |
|
517 | 'args':[repr(keys)]} | |
517 | return self.executeAndRaise(msg, self._doHas, keys) |
|
518 | return self.executeAndRaise(msg, self._doHas, keys) | |
518 |
|
519 | |||
519 | def clear_properties(self): |
|
520 | def clear_properties(self): | |
520 | msg = {'engineid':self.id, |
|
521 | msg = {'engineid':self.id, | |
521 | 'method':'clear_properties', |
|
522 | 'method':'clear_properties', | |
522 | 'args':[]} |
|
523 | 'args':[]} | |
523 | return self.executeAndRaise(msg, self.properties.clear) |
|
524 | return self.executeAndRaise(msg, self.properties.clear) | |
524 |
|
525 | |||
525 | def push_serialized(self, sNamespace): |
|
526 | def push_serialized(self, sNamespace): | |
526 | msg = {'engineid':self.id, |
|
527 | msg = {'engineid':self.id, | |
527 | 'method':'push_serialized', |
|
528 | 'method':'push_serialized', | |
528 | 'args':[repr(sNamespace.keys())]} |
|
529 | 'args':[repr(sNamespace.keys())]} | |
529 | ns = {} |
|
530 | ns = {} | |
530 | for k,v in sNamespace.iteritems(): |
|
531 | for k,v in sNamespace.iteritems(): | |
531 | try: |
|
532 | try: | |
532 | unserialized = newserialized.IUnSerialized(v) |
|
533 | unserialized = newserialized.IUnSerialized(v) | |
533 | ns[k] = unserialized.getObject() |
|
534 | ns[k] = unserialized.getObject() | |
534 | except: |
|
535 | except: | |
535 | return defer.fail() |
|
536 | return defer.fail() | |
536 | return self.executeAndRaise(msg, self.shell.push, ns) |
|
537 | return self.executeAndRaise(msg, self.shell.push, ns) | |
537 |
|
538 | |||
538 | def pull_serialized(self, keys): |
|
539 | def pull_serialized(self, keys): | |
539 | msg = {'engineid':self.id, |
|
540 | msg = {'engineid':self.id, | |
540 | 'method':'pull_serialized', |
|
541 | 'method':'pull_serialized', | |
541 | 'args':[repr(keys)]} |
|
542 | 'args':[repr(keys)]} | |
542 | if isinstance(keys, str): |
|
543 | if isinstance(keys, str): | |
543 | keys = [keys] |
|
544 | keys = [keys] | |
544 | if len(keys)==1: |
|
545 | if len(keys)==1: | |
545 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
546 | d = self.executeAndRaise(msg, self.shell.pull, keys) | |
546 | d.addCallback(newserialized.serialize) |
|
547 | d.addCallback(newserialized.serialize) | |
547 | return d |
|
548 | return d | |
548 | elif len(keys)>1: |
|
549 | elif len(keys)>1: | |
549 | d = self.executeAndRaise(msg, self.shell.pull, keys) |
|
550 | d = self.executeAndRaise(msg, self.shell.pull, keys) | |
550 | @d.addCallback |
|
551 | @d.addCallback | |
551 | def packThemUp(values): |
|
552 | def packThemUp(values): | |
552 | serials = [] |
|
553 | serials = [] | |
553 | for v in values: |
|
554 | for v in values: | |
554 | try: |
|
555 | try: | |
555 | serials.append(newserialized.serialize(v)) |
|
556 | serials.append(newserialized.serialize(v)) | |
556 | except: |
|
557 | except: | |
557 | return defer.fail(failure.Failure()) |
|
558 | return defer.fail(failure.Failure()) | |
558 | return serials |
|
559 | return serials | |
559 | return packThemUp |
|
560 | return packThemUp | |
560 |
|
561 | |||
561 |
|
562 | |||
562 | def queue(methodToQueue): |
|
563 | def queue(methodToQueue): | |
563 | def queuedMethod(this, *args, **kwargs): |
|
564 | def queuedMethod(this, *args, **kwargs): | |
564 | name = methodToQueue.__name__ |
|
565 | name = methodToQueue.__name__ | |
565 | return this.submitCommand(Command(name, *args, **kwargs)) |
|
566 | return this.submitCommand(Command(name, *args, **kwargs)) | |
566 | return queuedMethod |
|
567 | return queuedMethod | |
567 |
|
568 | |||
568 | class QueuedEngine(object): |
|
569 | class QueuedEngine(object): | |
569 | """Adapt an IEngineBase to an IEngineQueued by wrapping it. |
|
570 | """Adapt an IEngineBase to an IEngineQueued by wrapping it. | |
570 |
|
571 | |||
571 | The resulting object will implement IEngineQueued which extends |
|
572 | The resulting object will implement IEngineQueued which extends | |
572 | IEngineCore which extends (IEngineBase, IEngineSerialized). |
|
573 | IEngineCore which extends (IEngineBase, IEngineSerialized). | |
573 |
|
574 | |||
574 | This seems like the best way of handling it, but I am not sure. The |
|
575 | This seems like the best way of handling it, but I am not sure. The | |
575 | other option is to have the various base interfaces be used like |
|
576 | other option is to have the various base interfaces be used like | |
576 | mix-in intefaces. The problem I have with this is adpatation is |
|
577 | mix-in intefaces. The problem I have with this is adpatation is | |
577 | more difficult and complicated because there can be can multiple |
|
578 | more difficult and complicated because there can be can multiple | |
578 | original and final Interfaces. |
|
579 | original and final Interfaces. | |
579 | """ |
|
580 | """ | |
580 |
|
581 | |||
581 | zi.implements(IEngineQueued) |
|
582 | zi.implements(IEngineQueued) | |
582 |
|
583 | |||
583 | def __init__(self, engine): |
|
584 | def __init__(self, engine): | |
584 | """Create a QueuedEngine object from an engine |
|
585 | """Create a QueuedEngine object from an engine | |
585 |
|
586 | |||
586 | engine: An implementor of IEngineCore and IEngineSerialized |
|
587 | engine: An implementor of IEngineCore and IEngineSerialized | |
587 | keepUpToDate: whether to update the remote status when the |
|
588 | keepUpToDate: whether to update the remote status when the | |
588 | queue is empty. Defaults to False. |
|
589 | queue is empty. Defaults to False. | |
589 | """ |
|
590 | """ | |
590 |
|
591 | |||
591 | # This is the right way to do these tests rather than |
|
592 | # This is the right way to do these tests rather than | |
592 | # IEngineCore in list(zi.providedBy(engine)) which will only |
|
593 | # IEngineCore in list(zi.providedBy(engine)) which will only | |
593 | # picks of the interfaces that are directly declared by engine. |
|
594 | # picks of the interfaces that are directly declared by engine. | |
594 | assert IEngineBase.providedBy(engine), \ |
|
595 | assert IEngineBase.providedBy(engine), \ | |
595 | "engine passed to QueuedEngine doesn't provide IEngineBase" |
|
596 | "engine passed to QueuedEngine doesn't provide IEngineBase" | |
596 |
|
597 | |||
597 | self.engine = engine |
|
598 | self.engine = engine | |
598 | self.id = engine.id |
|
599 | self.id = engine.id | |
599 | self.queued = [] |
|
600 | self.queued = [] | |
600 | self.history = {} |
|
601 | self.history = {} | |
601 | self.engineStatus = {} |
|
602 | self.engineStatus = {} | |
602 | self.currentCommand = None |
|
603 | self.currentCommand = None | |
603 | self.failureObservers = [] |
|
604 | self.failureObservers = [] | |
604 |
|
605 | |||
605 | def _get_properties(self): |
|
606 | def _get_properties(self): | |
606 | return self.engine.properties |
|
607 | return self.engine.properties | |
607 |
|
608 | |||
608 | properties = property(_get_properties, lambda self, _: None) |
|
609 | properties = property(_get_properties, lambda self, _: None) | |
609 | # Queue management methods. You should not call these directly |
|
610 | # Queue management methods. You should not call these directly | |
610 |
|
611 | |||
611 | def submitCommand(self, cmd): |
|
612 | def submitCommand(self, cmd): | |
612 | """Submit command to queue.""" |
|
613 | """Submit command to queue.""" | |
613 |
|
614 | |||
614 | d = defer.Deferred() |
|
615 | d = defer.Deferred() | |
615 | cmd.setDeferred(d) |
|
616 | cmd.setDeferred(d) | |
616 | if self.currentCommand is not None: |
|
617 | if self.currentCommand is not None: | |
617 | if self.currentCommand.finished: |
|
618 | if self.currentCommand.finished: | |
618 | # log.msg("Running command immediately: %r" % cmd) |
|
619 | # log.msg("Running command immediately: %r" % cmd) | |
619 | self.currentCommand = cmd |
|
620 | self.currentCommand = cmd | |
620 | self.runCurrentCommand() |
|
621 | self.runCurrentCommand() | |
621 | else: # command is still running |
|
622 | else: # command is still running | |
622 | # log.msg("Command is running: %r" % self.currentCommand) |
|
623 | # log.msg("Command is running: %r" % self.currentCommand) | |
623 | # log.msg("Queueing: %r" % cmd) |
|
624 | # log.msg("Queueing: %r" % cmd) | |
624 | self.queued.append(cmd) |
|
625 | self.queued.append(cmd) | |
625 | else: |
|
626 | else: | |
626 | # log.msg("No current commands, running: %r" % cmd) |
|
627 | # log.msg("No current commands, running: %r" % cmd) | |
627 | self.currentCommand = cmd |
|
628 | self.currentCommand = cmd | |
628 | self.runCurrentCommand() |
|
629 | self.runCurrentCommand() | |
629 | return d |
|
630 | return d | |
630 |
|
631 | |||
631 | def runCurrentCommand(self): |
|
632 | def runCurrentCommand(self): | |
632 | """Run current command.""" |
|
633 | """Run current command.""" | |
633 |
|
634 | |||
634 | cmd = self.currentCommand |
|
635 | cmd = self.currentCommand | |
635 | f = getattr(self.engine, cmd.remoteMethod, None) |
|
636 | f = getattr(self.engine, cmd.remoteMethod, None) | |
636 | if f: |
|
637 | if f: | |
637 | d = f(*cmd.args, **cmd.kwargs) |
|
638 | d = f(*cmd.args, **cmd.kwargs) | |
638 | if cmd.remoteMethod is 'execute': |
|
639 | if cmd.remoteMethod is 'execute': | |
639 | d.addCallback(self.saveResult) |
|
640 | d.addCallback(self.saveResult) | |
640 | d.addCallback(self.finishCommand) |
|
641 | d.addCallback(self.finishCommand) | |
641 | d.addErrback(self.abortCommand) |
|
642 | d.addErrback(self.abortCommand) | |
642 | else: |
|
643 | else: | |
643 | return defer.fail(AttributeError(cmd.remoteMethod)) |
|
644 | return defer.fail(AttributeError(cmd.remoteMethod)) | |
644 |
|
645 | |||
645 | def _flushQueue(self): |
|
646 | def _flushQueue(self): | |
646 | """Pop next command in queue and run it.""" |
|
647 | """Pop next command in queue and run it.""" | |
647 |
|
648 | |||
648 | if len(self.queued) > 0: |
|
649 | if len(self.queued) > 0: | |
649 | self.currentCommand = self.queued.pop(0) |
|
650 | self.currentCommand = self.queued.pop(0) | |
650 | self.runCurrentCommand() |
|
651 | self.runCurrentCommand() | |
651 |
|
652 | |||
652 | def saveResult(self, result): |
|
653 | def saveResult(self, result): | |
653 | """Put the result in the history.""" |
|
654 | """Put the result in the history.""" | |
654 | self.history[result['number']] = result |
|
655 | self.history[result['number']] = result | |
655 | return result |
|
656 | return result | |
656 |
|
657 | |||
657 | def finishCommand(self, result): |
|
658 | def finishCommand(self, result): | |
658 | """Finish currrent command.""" |
|
659 | """Finish currrent command.""" | |
659 |
|
660 | |||
660 | # The order of these commands is absolutely critical. |
|
661 | # The order of these commands is absolutely critical. | |
661 | self.currentCommand.handleResult(result) |
|
662 | self.currentCommand.handleResult(result) | |
662 | self.currentCommand.finished = True |
|
663 | self.currentCommand.finished = True | |
663 | self._flushQueue() |
|
664 | self._flushQueue() | |
664 | return result |
|
665 | return result | |
665 |
|
666 | |||
666 | def abortCommand(self, reason): |
|
667 | def abortCommand(self, reason): | |
667 | """Abort current command. |
|
668 | """Abort current command. | |
668 |
|
669 | |||
669 | This eats the Failure but first passes it onto the Deferred that the |
|
670 | This eats the Failure but first passes it onto the Deferred that the | |
670 | user has. |
|
671 | user has. | |
671 |
|
672 | |||
672 | It also clear out the queue so subsequence commands don't run. |
|
673 | It also clear out the queue so subsequence commands don't run. | |
673 | """ |
|
674 | """ | |
674 |
|
675 | |||
675 | # The order of these 3 commands is absolutely critical. The currentCommand |
|
676 | # The order of these 3 commands is absolutely critical. The currentCommand | |
676 | # must first be marked as finished BEFORE the queue is cleared and before |
|
677 | # must first be marked as finished BEFORE the queue is cleared and before | |
677 | # the current command is sent the failure. |
|
678 | # the current command is sent the failure. | |
678 | # Also, the queue must be cleared BEFORE the current command is sent the Failure |
|
679 | # Also, the queue must be cleared BEFORE the current command is sent the Failure | |
679 | # otherwise the errback chain could trigger new commands to be added to the |
|
680 | # otherwise the errback chain could trigger new commands to be added to the | |
680 | # queue before we clear it. We should clear ONLY the commands that were in |
|
681 | # queue before we clear it. We should clear ONLY the commands that were in | |
681 | # the queue when the error occured. |
|
682 | # the queue when the error occured. | |
682 | self.currentCommand.finished = True |
|
683 | self.currentCommand.finished = True | |
683 | s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs) |
|
684 | s = "%r %r %r" % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs) | |
684 | self.clear_queue(msg=s) |
|
685 | self.clear_queue(msg=s) | |
685 | self.currentCommand.handleError(reason) |
|
686 | self.currentCommand.handleError(reason) | |
686 |
|
687 | |||
687 | return None |
|
688 | return None | |
688 |
|
689 | |||
689 | #--------------------------------------------------------------------------- |
|
690 | #--------------------------------------------------------------------------- | |
690 | # IEngineCore methods |
|
691 | # IEngineCore methods | |
691 | #--------------------------------------------------------------------------- |
|
692 | #--------------------------------------------------------------------------- | |
692 |
|
693 | |||
693 | @queue |
|
694 | @queue | |
694 | def execute(self, lines): |
|
695 | def execute(self, lines): | |
695 | pass |
|
696 | pass | |
696 |
|
697 | |||
697 | @queue |
|
698 | @queue | |
698 | def push(self, namespace): |
|
699 | def push(self, namespace): | |
699 | pass |
|
700 | pass | |
700 |
|
701 | |||
701 | @queue |
|
702 | @queue | |
702 | def pull(self, keys): |
|
703 | def pull(self, keys): | |
703 | pass |
|
704 | pass | |
704 |
|
705 | |||
705 | @queue |
|
706 | @queue | |
706 | def push_function(self, namespace): |
|
707 | def push_function(self, namespace): | |
707 | pass |
|
708 | pass | |
708 |
|
709 | |||
709 | @queue |
|
710 | @queue | |
710 | def pull_function(self, keys): |
|
711 | def pull_function(self, keys): | |
711 | pass |
|
712 | pass | |
712 |
|
713 | |||
713 | def get_result(self, i=None): |
|
714 | def get_result(self, i=None): | |
714 | if i is None: |
|
715 | if i is None: | |
715 | i = max(self.history.keys()+[None]) |
|
716 | i = max(self.history.keys()+[None]) | |
716 |
|
717 | |||
717 | cmd = self.history.get(i, None) |
|
718 | cmd = self.history.get(i, None) | |
718 | # Uncomment this line to disable chaching of results |
|
719 | # Uncomment this line to disable chaching of results | |
719 | #cmd = None |
|
720 | #cmd = None | |
720 | if cmd is None: |
|
721 | if cmd is None: | |
721 | return self.submitCommand(Command('get_result', i)) |
|
722 | return self.submitCommand(Command('get_result', i)) | |
722 | else: |
|
723 | else: | |
723 | return defer.succeed(cmd) |
|
724 | return defer.succeed(cmd) | |
724 |
|
725 | |||
725 | def reset(self): |
|
726 | def reset(self): | |
726 | self.clear_queue() |
|
727 | self.clear_queue() | |
727 | self.history = {} # reset the cache - I am not sure we should do this |
|
728 | self.history = {} # reset the cache - I am not sure we should do this | |
728 | return self.submitCommand(Command('reset')) |
|
729 | return self.submitCommand(Command('reset')) | |
729 |
|
730 | |||
730 | def kill(self): |
|
731 | def kill(self): | |
731 | self.clear_queue() |
|
732 | self.clear_queue() | |
732 | return self.submitCommand(Command('kill')) |
|
733 | return self.submitCommand(Command('kill')) | |
733 |
|
734 | |||
734 | @queue |
|
735 | @queue | |
735 | def keys(self): |
|
736 | def keys(self): | |
736 | pass |
|
737 | pass | |
737 |
|
738 | |||
738 | #--------------------------------------------------------------------------- |
|
739 | #--------------------------------------------------------------------------- | |
739 | # IEngineSerialized methods |
|
740 | # IEngineSerialized methods | |
740 | #--------------------------------------------------------------------------- |
|
741 | #--------------------------------------------------------------------------- | |
741 |
|
742 | |||
742 | @queue |
|
743 | @queue | |
743 | def push_serialized(self, namespace): |
|
744 | def push_serialized(self, namespace): | |
744 | pass |
|
745 | pass | |
745 |
|
746 | |||
746 | @queue |
|
747 | @queue | |
747 | def pull_serialized(self, keys): |
|
748 | def pull_serialized(self, keys): | |
748 | pass |
|
749 | pass | |
749 |
|
750 | |||
750 | #--------------------------------------------------------------------------- |
|
751 | #--------------------------------------------------------------------------- | |
751 | # IEngineProperties methods |
|
752 | # IEngineProperties methods | |
752 | #--------------------------------------------------------------------------- |
|
753 | #--------------------------------------------------------------------------- | |
753 |
|
754 | |||
754 | @queue |
|
755 | @queue | |
755 | def set_properties(self, namespace): |
|
756 | def set_properties(self, namespace): | |
756 | pass |
|
757 | pass | |
757 |
|
758 | |||
758 | @queue |
|
759 | @queue | |
759 | def get_properties(self, keys=None): |
|
760 | def get_properties(self, keys=None): | |
760 | pass |
|
761 | pass | |
761 |
|
762 | |||
762 | @queue |
|
763 | @queue | |
763 | def del_properties(self, keys): |
|
764 | def del_properties(self, keys): | |
764 | pass |
|
765 | pass | |
765 |
|
766 | |||
766 | @queue |
|
767 | @queue | |
767 | def has_properties(self, keys): |
|
768 | def has_properties(self, keys): | |
768 | pass |
|
769 | pass | |
769 |
|
770 | |||
770 | @queue |
|
771 | @queue | |
771 | def clear_properties(self): |
|
772 | def clear_properties(self): | |
772 | pass |
|
773 | pass | |
773 |
|
774 | |||
774 | #--------------------------------------------------------------------------- |
|
775 | #--------------------------------------------------------------------------- | |
775 | # IQueuedEngine methods |
|
776 | # IQueuedEngine methods | |
776 | #--------------------------------------------------------------------------- |
|
777 | #--------------------------------------------------------------------------- | |
777 |
|
778 | |||
778 | def clear_queue(self, msg=''): |
|
779 | def clear_queue(self, msg=''): | |
779 | """Clear the queue, but doesn't cancel the currently running commmand.""" |
|
780 | """Clear the queue, but doesn't cancel the currently running commmand.""" | |
780 |
|
781 | |||
781 | for cmd in self.queued: |
|
782 | for cmd in self.queued: | |
782 | cmd.deferred.errback(failure.Failure(error.QueueCleared(msg))) |
|
783 | cmd.deferred.errback(failure.Failure(error.QueueCleared(msg))) | |
783 | self.queued = [] |
|
784 | self.queued = [] | |
784 | return defer.succeed(None) |
|
785 | return defer.succeed(None) | |
785 |
|
786 | |||
786 | def queue_status(self): |
|
787 | def queue_status(self): | |
787 | if self.currentCommand is not None: |
|
788 | if self.currentCommand is not None: | |
788 | if self.currentCommand.finished: |
|
789 | if self.currentCommand.finished: | |
789 | pending = repr(None) |
|
790 | pending = repr(None) | |
790 | else: |
|
791 | else: | |
791 | pending = repr(self.currentCommand) |
|
792 | pending = repr(self.currentCommand) | |
792 | else: |
|
793 | else: | |
793 | pending = repr(None) |
|
794 | pending = repr(None) | |
794 | dikt = {'queue':map(repr,self.queued), 'pending':pending} |
|
795 | dikt = {'queue':map(repr,self.queued), 'pending':pending} | |
795 | return defer.succeed(dikt) |
|
796 | return defer.succeed(dikt) | |
796 |
|
797 | |||
797 | def register_failure_observer(self, obs): |
|
798 | def register_failure_observer(self, obs): | |
798 | self.failureObservers.append(obs) |
|
799 | self.failureObservers.append(obs) | |
799 |
|
800 | |||
800 | def unregister_failure_observer(self, obs): |
|
801 | def unregister_failure_observer(self, obs): | |
801 | self.failureObservers.remove(obs) |
|
802 | self.failureObservers.remove(obs) | |
802 |
|
803 | |||
803 |
|
804 | |||
804 | # Now register QueuedEngine as an adpater class that makes an IEngineBase into a |
|
805 | # Now register QueuedEngine as an adpater class that makes an IEngineBase into a | |
805 | # IEngineQueued. |
|
806 | # IEngineQueued. | |
806 | components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued) |
|
807 | components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued) | |
807 |
|
808 | |||
808 |
|
809 | |||
809 | class Command(object): |
|
810 | class Command(object): | |
810 | """A command object that encapslates queued commands. |
|
811 | """A command object that encapslates queued commands. | |
811 |
|
812 | |||
812 | This class basically keeps track of a command that has been queued |
|
813 | This class basically keeps track of a command that has been queued | |
813 | in a QueuedEngine. It manages the deferreds and hold the method to be called |
|
814 | in a QueuedEngine. It manages the deferreds and hold the method to be called | |
814 | and the arguments to that method. |
|
815 | and the arguments to that method. | |
815 | """ |
|
816 | """ | |
816 |
|
817 | |||
817 |
|
818 | |||
818 | def __init__(self, remoteMethod, *args, **kwargs): |
|
819 | def __init__(self, remoteMethod, *args, **kwargs): | |
819 | """Build a new Command object.""" |
|
820 | """Build a new Command object.""" | |
820 |
|
821 | |||
821 | self.remoteMethod = remoteMethod |
|
822 | self.remoteMethod = remoteMethod | |
822 | self.args = args |
|
823 | self.args = args | |
823 | self.kwargs = kwargs |
|
824 | self.kwargs = kwargs | |
824 | self.finished = False |
|
825 | self.finished = False | |
825 |
|
826 | |||
826 | def setDeferred(self, d): |
|
827 | def setDeferred(self, d): | |
827 | """Sets the deferred attribute of the Command.""" |
|
828 | """Sets the deferred attribute of the Command.""" | |
828 |
|
829 | |||
829 | self.deferred = d |
|
830 | self.deferred = d | |
830 |
|
831 | |||
831 | def __repr__(self): |
|
832 | def __repr__(self): | |
832 | if not self.args: |
|
833 | if not self.args: | |
833 | args = '' |
|
834 | args = '' | |
834 | else: |
|
835 | else: | |
835 | args = str(self.args)[1:-2] #cut off (...,) |
|
836 | args = str(self.args)[1:-2] #cut off (...,) | |
836 | for k,v in self.kwargs.iteritems(): |
|
837 | for k,v in self.kwargs.iteritems(): | |
837 | if args: |
|
838 | if args: | |
838 | args += ', ' |
|
839 | args += ', ' | |
839 | args += '%s=%r' %(k,v) |
|
840 | args += '%s=%r' %(k,v) | |
840 | return "%s(%s)" %(self.remoteMethod, args) |
|
841 | return "%s(%s)" %(self.remoteMethod, args) | |
841 |
|
842 | |||
842 | def handleResult(self, result): |
|
843 | def handleResult(self, result): | |
843 | """When the result is ready, relay it to self.deferred.""" |
|
844 | """When the result is ready, relay it to self.deferred.""" | |
844 |
|
845 | |||
845 | self.deferred.callback(result) |
|
846 | self.deferred.callback(result) | |
846 |
|
847 | |||
847 | def handleError(self, reason): |
|
848 | def handleError(self, reason): | |
848 | """When an error has occured, relay it to self.deferred.""" |
|
849 | """When an error has occured, relay it to self.deferred.""" | |
849 |
|
850 | |||
850 | self.deferred.errback(reason) |
|
851 | self.deferred.errback(reason) | |
851 |
|
852 | |||
852 | class ThreadedEngineService(EngineService): |
|
853 | class ThreadedEngineService(EngineService): | |
853 | """An EngineService subclass that defers execute commands to a separate |
|
854 | """An EngineService subclass that defers execute commands to a separate | |
854 | thread. |
|
855 | thread. | |
855 |
|
856 | |||
856 | ThreadedEngineService uses twisted.internet.threads.deferToThread to |
|
857 | ThreadedEngineService uses twisted.internet.threads.deferToThread to | |
857 | defer execute requests to a separate thread. GUI frontends may want to |
|
858 | defer execute requests to a separate thread. GUI frontends may want to | |
858 | use ThreadedEngineService as the engine in an |
|
859 | use ThreadedEngineService as the engine in an | |
859 | IPython.frontend.frontendbase.FrontEndBase subclass to prevent |
|
860 | IPython.frontend.frontendbase.FrontEndBase subclass to prevent | |
860 | block execution from blocking the GUI thread. |
|
861 | block execution from blocking the GUI thread. | |
861 | """ |
|
862 | """ | |
862 |
|
863 | |||
863 | zi.implements(IEngineBase) |
|
864 | zi.implements(IEngineBase) | |
864 |
|
865 | |||
865 | def __init__(self, shellClass=Interpreter, mpi=None): |
|
866 | def __init__(self, shellClass=Interpreter, mpi=None): | |
866 | EngineService.__init__(self, shellClass, mpi) |
|
867 | EngineService.__init__(self, shellClass, mpi) | |
867 |
|
868 | |||
868 | def wrapped_execute(self, msg, lines): |
|
869 | def wrapped_execute(self, msg, lines): | |
869 | """Wrap self.shell.execute to add extra information to tracebacks""" |
|
870 | """Wrap self.shell.execute to add extra information to tracebacks""" | |
870 |
|
871 | |||
871 | try: |
|
872 | try: | |
872 | result = self.shell.execute(lines) |
|
873 | result = self.shell.execute(lines) | |
873 | except Exception,e: |
|
874 | except Exception,e: | |
874 | # This gives the following: |
|
875 | # This gives the following: | |
875 | # et=exception class |
|
876 | # et=exception class | |
876 | # ev=exception class instance |
|
877 | # ev=exception class instance | |
877 | # tb=traceback object |
|
878 | # tb=traceback object | |
878 | et,ev,tb = sys.exc_info() |
|
879 | et,ev,tb = sys.exc_info() | |
879 | # This call adds attributes to the exception value |
|
880 | # This call adds attributes to the exception value | |
880 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) |
|
881 | et,ev,tb = self.shell.formatTraceback(et,ev,tb,msg) | |
881 | # Add another attribute |
|
882 | # Add another attribute | |
882 |
|
883 | |||
883 | # Create a new exception with the new attributes |
|
884 | # Create a new exception with the new attributes | |
884 | e = et(ev._ipython_traceback_text) |
|
885 | e = et(ev._ipython_traceback_text) | |
885 | e._ipython_engine_info = msg |
|
886 | e._ipython_engine_info = msg | |
886 |
|
887 | |||
887 | # Re-raise |
|
888 | # Re-raise | |
888 | raise e |
|
889 | raise e | |
889 |
|
890 | |||
890 | return result |
|
891 | return result | |
891 |
|
892 | |||
892 |
|
893 | |||
893 | def execute(self, lines): |
|
894 | def execute(self, lines): | |
894 | # Only import this if we are going to use this class |
|
895 | # Only import this if we are going to use this class | |
895 | from twisted.internet import threads |
|
896 | from twisted.internet import threads | |
896 |
|
897 | |||
897 | msg = {'engineid':self.id, |
|
898 | msg = {'engineid':self.id, | |
898 | 'method':'execute', |
|
899 | 'method':'execute', | |
899 | 'args':[lines]} |
|
900 | 'args':[lines]} | |
900 |
|
901 | |||
901 | d = threads.deferToThread(self.wrapped_execute, msg, lines) |
|
902 | d = threads.deferToThread(self.wrapped_execute, msg, lines) | |
902 | d.addCallback(self.addIDToResult) |
|
903 | d.addCallback(self.addIDToResult) | |
903 | return d |
|
904 | return d |
@@ -1,753 +1,754 b'' | |||||
1 | # encoding: utf-8 |
|
1 | # encoding: utf-8 | |
2 | # -*- test-case-name: IPython.kernel.test.test_multiengine -*- |
|
2 | # -*- test-case-name: IPython.kernel.test.test_multiengine -*- | |
3 |
|
3 | |||
4 | """Adapt the IPython ControllerServer to IMultiEngine. |
|
4 | """Adapt the IPython ControllerServer to IMultiEngine. | |
5 |
|
5 | |||
6 | This module provides classes that adapt a ControllerService to the |
|
6 | This module provides classes that adapt a ControllerService to the | |
7 | IMultiEngine interface. This interface is a basic interactive interface |
|
7 | IMultiEngine interface. This interface is a basic interactive interface | |
8 | for working with a set of engines where it is desired to have explicit |
|
8 | for working with a set of engines where it is desired to have explicit | |
9 | access to each registered engine. |
|
9 | access to each registered engine. | |
10 |
|
10 | |||
11 | The classes here are exposed to the network in files like: |
|
11 | The classes here are exposed to the network in files like: | |
12 |
|
12 | |||
13 | * multienginevanilla.py |
|
13 | * multienginevanilla.py | |
14 | * multienginepb.py |
|
14 | * multienginepb.py | |
15 | """ |
|
15 | """ | |
16 |
|
16 | |||
17 | __docformat__ = "restructuredtext en" |
|
17 | __docformat__ = "restructuredtext en" | |
18 |
|
18 | |||
19 | #------------------------------------------------------------------------------- |
|
19 | #------------------------------------------------------------------------------- | |
20 | # Copyright (C) 2008 The IPython Development Team |
|
20 | # Copyright (C) 2008 The IPython Development Team | |
21 | # |
|
21 | # | |
22 | # Distributed under the terms of the BSD License. The full license is in |
|
22 | # Distributed under the terms of the BSD License. The full license is in | |
23 | # the file COPYING, distributed as part of this software. |
|
23 | # the file COPYING, distributed as part of this software. | |
24 | #------------------------------------------------------------------------------- |
|
24 | #------------------------------------------------------------------------------- | |
25 |
|
25 | |||
26 | #------------------------------------------------------------------------------- |
|
26 | #------------------------------------------------------------------------------- | |
27 | # Imports |
|
27 | # Imports | |
28 | #------------------------------------------------------------------------------- |
|
28 | #------------------------------------------------------------------------------- | |
29 |
|
29 | |||
30 | from new import instancemethod |
|
30 | from new import instancemethod | |
31 | from types import FunctionType |
|
31 | from types import FunctionType | |
32 |
|
32 | |||
33 | from twisted.application import service |
|
33 | from twisted.application import service | |
34 | from twisted.internet import defer, reactor |
|
34 | from twisted.internet import defer, reactor | |
35 | from twisted.python import log, components, failure |
|
35 | from twisted.python import log, components, failure | |
36 | from zope.interface import Interface, implements, Attribute |
|
36 | from zope.interface import Interface, implements, Attribute | |
37 |
|
37 | |||
38 | from IPython.tools import growl |
|
38 | from IPython.tools import growl | |
39 | from IPython.kernel.util import printer |
|
39 | from IPython.kernel.util import printer | |
40 | from IPython.kernel.twistedutil import gatherBoth |
|
40 | from IPython.kernel.twistedutil import gatherBoth | |
41 | from IPython.kernel import map as Map |
|
41 | from IPython.kernel import map as Map | |
42 | from IPython.kernel import error |
|
42 | from IPython.kernel import error | |
43 | from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase |
|
43 | from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase | |
44 | from IPython.kernel.controllerservice import \ |
|
44 | from IPython.kernel.controllerservice import \ | |
45 | ControllerAdapterBase, \ |
|
45 | ControllerAdapterBase, \ | |
46 | ControllerService, \ |
|
46 | ControllerService, \ | |
47 | IControllerBase |
|
47 | IControllerBase | |
48 |
|
48 | |||
49 |
|
49 | |||
50 | #------------------------------------------------------------------------------- |
|
50 | #------------------------------------------------------------------------------- | |
51 | # Interfaces for the MultiEngine representation of a controller |
|
51 | # Interfaces for the MultiEngine representation of a controller | |
52 | #------------------------------------------------------------------------------- |
|
52 | #------------------------------------------------------------------------------- | |
53 |
|
53 | |||
54 | class IEngineMultiplexer(Interface): |
|
54 | class IEngineMultiplexer(Interface): | |
55 | """Interface to multiple engines implementing IEngineCore/Serialized/Queued. |
|
55 | """Interface to multiple engines implementing IEngineCore/Serialized/Queued. | |
56 |
|
56 | |||
57 | This class simply acts as a multiplexer of methods that are in the |
|
57 | This class simply acts as a multiplexer of methods that are in the | |
58 | various IEngines* interfaces. Thus the methods here are jut like those |
|
58 | various IEngines* interfaces. Thus the methods here are jut like those | |
59 | in the IEngine* interfaces, but with an extra first argument, targets. |
|
59 | in the IEngine* interfaces, but with an extra first argument, targets. | |
60 | The targets argument can have the following forms: |
|
60 | The targets argument can have the following forms: | |
61 |
|
61 | |||
62 | * targets = 10 # Engines are indexed by ints |
|
62 | * targets = 10 # Engines are indexed by ints | |
63 | * targets = [0,1,2,3] # A list of ints |
|
63 | * targets = [0,1,2,3] # A list of ints | |
64 | * targets = 'all' # A string to indicate all targets |
|
64 | * targets = 'all' # A string to indicate all targets | |
65 |
|
65 | |||
66 | If targets is bad in any way, an InvalidEngineID will be raised. This |
|
66 | If targets is bad in any way, an InvalidEngineID will be raised. This | |
67 | includes engines not being registered. |
|
67 | includes engines not being registered. | |
68 |
|
68 | |||
69 | All IEngineMultiplexer multiplexer methods must return a Deferred to a list |
|
69 | All IEngineMultiplexer multiplexer methods must return a Deferred to a list | |
70 | with length equal to the number of targets. The elements of the list will |
|
70 | with length equal to the number of targets. The elements of the list will | |
71 | correspond to the return of the corresponding IEngine method. |
|
71 | correspond to the return of the corresponding IEngine method. | |
72 |
|
72 | |||
73 | Failures are aggressive, meaning that if an action fails for any target, |
|
73 | Failures are aggressive, meaning that if an action fails for any target, | |
74 | the overall action will fail immediately with that Failure. |
|
74 | the overall action will fail immediately with that Failure. | |
75 |
|
75 | |||
76 | :Parameters: |
|
76 | :Parameters: | |
77 | targets : int, list of ints, or 'all' |
|
77 | targets : int, list of ints, or 'all' | |
78 | Engine ids the action will apply to. |
|
78 | Engine ids the action will apply to. | |
79 |
|
79 | |||
80 | :Returns: Deferred to a list of results for each engine. |
|
80 | :Returns: Deferred to a list of results for each engine. | |
81 |
|
81 | |||
82 | :Exception: |
|
82 | :Exception: | |
83 | InvalidEngineID |
|
83 | InvalidEngineID | |
84 | If the targets argument is bad or engines aren't registered. |
|
84 | If the targets argument is bad or engines aren't registered. | |
85 | NoEnginesRegistered |
|
85 | NoEnginesRegistered | |
86 | If there are no engines registered and targets='all' |
|
86 | If there are no engines registered and targets='all' | |
87 | """ |
|
87 | """ | |
88 |
|
88 | |||
89 | #--------------------------------------------------------------------------- |
|
89 | #--------------------------------------------------------------------------- | |
90 | # Mutiplexed methods |
|
90 | # Mutiplexed methods | |
91 | #--------------------------------------------------------------------------- |
|
91 | #--------------------------------------------------------------------------- | |
92 |
|
92 | |||
93 | def execute(lines, targets='all'): |
|
93 | def execute(lines, targets='all'): | |
94 | """Execute lines of Python code on targets. |
|
94 | """Execute lines of Python code on targets. | |
95 |
|
95 | |||
96 | See the class docstring for information about targets and possible |
|
96 | See the class docstring for information about targets and possible | |
97 | exceptions this method can raise. |
|
97 | exceptions this method can raise. | |
98 |
|
98 | |||
99 | :Parameters: |
|
99 | :Parameters: | |
100 | lines : str |
|
100 | lines : str | |
101 | String of python code to be executed on targets. |
|
101 | String of python code to be executed on targets. | |
102 | """ |
|
102 | """ | |
103 |
|
103 | |||
104 | def push(namespace, targets='all'): |
|
104 | def push(namespace, targets='all'): | |
105 | """Push dict namespace into the user's namespace on targets. |
|
105 | """Push dict namespace into the user's namespace on targets. | |
106 |
|
106 | |||
107 | See the class docstring for information about targets and possible |
|
107 | See the class docstring for information about targets and possible | |
108 | exceptions this method can raise. |
|
108 | exceptions this method can raise. | |
109 |
|
109 | |||
110 | :Parameters: |
|
110 | :Parameters: | |
111 | namspace : dict |
|
111 | namspace : dict | |
112 | Dict of key value pairs to be put into the users namspace. |
|
112 | Dict of key value pairs to be put into the users namspace. | |
113 | """ |
|
113 | """ | |
114 |
|
114 | |||
115 | def pull(keys, targets='all'): |
|
115 | def pull(keys, targets='all'): | |
116 | """Pull values out of the user's namespace on targets by keys. |
|
116 | """Pull values out of the user's namespace on targets by keys. | |
117 |
|
117 | |||
118 | See the class docstring for information about targets and possible |
|
118 | See the class docstring for information about targets and possible | |
119 | exceptions this method can raise. |
|
119 | exceptions this method can raise. | |
120 |
|
120 | |||
121 | :Parameters: |
|
121 | :Parameters: | |
122 | keys : tuple of strings |
|
122 | keys : tuple of strings | |
123 | Sequence of keys to be pulled from user's namespace. |
|
123 | Sequence of keys to be pulled from user's namespace. | |
124 | """ |
|
124 | """ | |
125 |
|
125 | |||
126 | def push_function(namespace, targets='all'): |
|
126 | def push_function(namespace, targets='all'): | |
127 | """""" |
|
127 | """""" | |
128 |
|
128 | |||
129 | def pull_function(keys, targets='all'): |
|
129 | def pull_function(keys, targets='all'): | |
130 | """""" |
|
130 | """""" | |
131 |
|
131 | |||
132 | def get_result(i=None, targets='all'): |
|
132 | def get_result(i=None, targets='all'): | |
133 | """Get the result for command i from targets. |
|
133 | """Get the result for command i from targets. | |
134 |
|
134 | |||
135 | See the class docstring for information about targets and possible |
|
135 | See the class docstring for information about targets and possible | |
136 | exceptions this method can raise. |
|
136 | exceptions this method can raise. | |
137 |
|
137 | |||
138 | :Parameters: |
|
138 | :Parameters: | |
139 | i : int or None |
|
139 | i : int or None | |
140 | Command index or None to indicate most recent command. |
|
140 | Command index or None to indicate most recent command. | |
141 | """ |
|
141 | """ | |
142 |
|
142 | |||
143 | def reset(targets='all'): |
|
143 | def reset(targets='all'): | |
144 | """Reset targets. |
|
144 | """Reset targets. | |
145 |
|
145 | |||
146 | This clears the users namespace of the Engines, but won't cause |
|
146 | This clears the users namespace of the Engines, but won't cause | |
147 | modules to be reloaded. |
|
147 | modules to be reloaded. | |
148 | """ |
|
148 | """ | |
149 |
|
149 | |||
150 | def keys(targets='all'): |
|
150 | def keys(targets='all'): | |
151 | """Get variable names defined in user's namespace on targets.""" |
|
151 | """Get variable names defined in user's namespace on targets.""" | |
152 |
|
152 | |||
153 | def kill(controller=False, targets='all'): |
|
153 | def kill(controller=False, targets='all'): | |
154 | """Kill the targets Engines and possibly the controller. |
|
154 | """Kill the targets Engines and possibly the controller. | |
155 |
|
155 | |||
156 | :Parameters: |
|
156 | :Parameters: | |
157 | controller : boolean |
|
157 | controller : boolean | |
158 | Should the controller be killed as well. If so all the |
|
158 | Should the controller be killed as well. If so all the | |
159 | engines will be killed first no matter what targets is. |
|
159 | engines will be killed first no matter what targets is. | |
160 | """ |
|
160 | """ | |
161 |
|
161 | |||
162 | def push_serialized(namespace, targets='all'): |
|
162 | def push_serialized(namespace, targets='all'): | |
163 | """Push a namespace of Serialized objects to targets. |
|
163 | """Push a namespace of Serialized objects to targets. | |
164 |
|
164 | |||
165 | :Parameters: |
|
165 | :Parameters: | |
166 | namespace : dict |
|
166 | namespace : dict | |
167 | A dict whose keys are the variable names and whose values |
|
167 | A dict whose keys are the variable names and whose values | |
168 | are serialized version of the objects. |
|
168 | are serialized version of the objects. | |
169 | """ |
|
169 | """ | |
170 |
|
170 | |||
171 | def pull_serialized(keys, targets='all'): |
|
171 | def pull_serialized(keys, targets='all'): | |
172 | """Pull Serialized objects by keys from targets. |
|
172 | """Pull Serialized objects by keys from targets. | |
173 |
|
173 | |||
174 | :Parameters: |
|
174 | :Parameters: | |
175 | keys : tuple of strings |
|
175 | keys : tuple of strings | |
176 | Sequence of variable names to pull as serialized objects. |
|
176 | Sequence of variable names to pull as serialized objects. | |
177 | """ |
|
177 | """ | |
178 |
|
178 | |||
179 | def clear_queue(targets='all'): |
|
179 | def clear_queue(targets='all'): | |
180 | """Clear the queue of pending command for targets.""" |
|
180 | """Clear the queue of pending command for targets.""" | |
181 |
|
181 | |||
182 | def queue_status(targets='all'): |
|
182 | def queue_status(targets='all'): | |
183 | """Get the status of the queue on the targets.""" |
|
183 | """Get the status of the queue on the targets.""" | |
184 |
|
184 | |||
185 | def set_properties(properties, targets='all'): |
|
185 | def set_properties(properties, targets='all'): | |
186 | """set properties by key and value""" |
|
186 | """set properties by key and value""" | |
187 |
|
187 | |||
188 | def get_properties(keys=None, targets='all'): |
|
188 | def get_properties(keys=None, targets='all'): | |
189 | """get a list of properties by `keys`, if no keys specified, get all""" |
|
189 | """get a list of properties by `keys`, if no keys specified, get all""" | |
190 |
|
190 | |||
191 | def del_properties(keys, targets='all'): |
|
191 | def del_properties(keys, targets='all'): | |
192 | """delete properties by `keys`""" |
|
192 | """delete properties by `keys`""" | |
193 |
|
193 | |||
194 | def has_properties(keys, targets='all'): |
|
194 | def has_properties(keys, targets='all'): | |
195 | """get a list of bool values for whether `properties` has `keys`""" |
|
195 | """get a list of bool values for whether `properties` has `keys`""" | |
196 |
|
196 | |||
197 | def clear_properties(targets='all'): |
|
197 | def clear_properties(targets='all'): | |
198 | """clear the properties dict""" |
|
198 | """clear the properties dict""" | |
199 |
|
199 | |||
200 |
|
200 | |||
201 | class IMultiEngine(IEngineMultiplexer): |
|
201 | class IMultiEngine(IEngineMultiplexer): | |
202 | """A controller that exposes an explicit interface to all of its engines. |
|
202 | """A controller that exposes an explicit interface to all of its engines. | |
203 |
|
203 | |||
204 | This is the primary inteface for interactive usage. |
|
204 | This is the primary inteface for interactive usage. | |
205 | """ |
|
205 | """ | |
206 |
|
206 | |||
207 | def get_ids(): |
|
207 | def get_ids(): | |
208 | """Return list of currently registered ids. |
|
208 | """Return list of currently registered ids. | |
209 |
|
209 | |||
210 | :Returns: A Deferred to a list of registered engine ids. |
|
210 | :Returns: A Deferred to a list of registered engine ids. | |
211 | """ |
|
211 | """ | |
212 |
|
212 | |||
213 |
|
213 | |||
214 |
|
214 | |||
215 | #------------------------------------------------------------------------------- |
|
215 | #------------------------------------------------------------------------------- | |
216 | # Implementation of the core MultiEngine classes |
|
216 | # Implementation of the core MultiEngine classes | |
217 | #------------------------------------------------------------------------------- |
|
217 | #------------------------------------------------------------------------------- | |
218 |
|
218 | |||
219 | class MultiEngine(ControllerAdapterBase): |
|
219 | class MultiEngine(ControllerAdapterBase): | |
220 | """The representation of a ControllerService as a IMultiEngine. |
|
220 | """The representation of a ControllerService as a IMultiEngine. | |
221 |
|
221 | |||
222 | Although it is not implemented currently, this class would be where a |
|
222 | Although it is not implemented currently, this class would be where a | |
223 | client/notification API is implemented. It could inherit from something |
|
223 | client/notification API is implemented. It could inherit from something | |
224 | like results.NotifierParent and then use the notify method to send |
|
224 | like results.NotifierParent and then use the notify method to send | |
225 | notifications. |
|
225 | notifications. | |
226 | """ |
|
226 | """ | |
227 |
|
227 | |||
228 | implements(IMultiEngine) |
|
228 | implements(IMultiEngine) | |
229 |
|
229 | |||
230 | def __init(self, controller): |
|
230 | def __init(self, controller): | |
231 | ControllerAdapterBase.__init__(self, controller) |
|
231 | ControllerAdapterBase.__init__(self, controller) | |
232 |
|
232 | |||
233 | #--------------------------------------------------------------------------- |
|
233 | #--------------------------------------------------------------------------- | |
234 | # Helper methods |
|
234 | # Helper methods | |
235 | #--------------------------------------------------------------------------- |
|
235 | #--------------------------------------------------------------------------- | |
236 |
|
236 | |||
237 | def engineList(self, targets): |
|
237 | def engineList(self, targets): | |
238 | """Parse the targets argument into a list of valid engine objects. |
|
238 | """Parse the targets argument into a list of valid engine objects. | |
239 |
|
239 | |||
240 | :Parameters: |
|
240 | :Parameters: | |
241 | targets : int, list of ints or 'all' |
|
241 | targets : int, list of ints or 'all' | |
242 | The targets argument to be parsed. |
|
242 | The targets argument to be parsed. | |
243 |
|
243 | |||
244 | :Returns: List of engine objects. |
|
244 | :Returns: List of engine objects. | |
245 |
|
245 | |||
246 | :Exception: |
|
246 | :Exception: | |
247 | InvalidEngineID |
|
247 | InvalidEngineID | |
248 | If targets is not valid or if an engine is not registered. |
|
248 | If targets is not valid or if an engine is not registered. | |
249 | """ |
|
249 | """ | |
250 | if isinstance(targets, int): |
|
250 | if isinstance(targets, int): | |
251 | if targets not in self.engines.keys(): |
|
251 | if targets not in self.engines.keys(): | |
252 | log.msg("Engine with id %i is not registered" % targets) |
|
252 | log.msg("Engine with id %i is not registered" % targets) | |
253 | raise error.InvalidEngineID("Engine with id %i is not registered" % targets) |
|
253 | raise error.InvalidEngineID("Engine with id %i is not registered" % targets) | |
254 | else: |
|
254 | else: | |
255 | return [self.engines[targets]] |
|
255 | return [self.engines[targets]] | |
256 | elif isinstance(targets, (list, tuple)): |
|
256 | elif isinstance(targets, (list, tuple)): | |
257 | for id in targets: |
|
257 | for id in targets: | |
258 | if id not in self.engines.keys(): |
|
258 | if id not in self.engines.keys(): | |
259 | log.msg("Engine with id %r is not registered" % id) |
|
259 | log.msg("Engine with id %r is not registered" % id) | |
260 | raise error.InvalidEngineID("Engine with id %r is not registered" % id) |
|
260 | raise error.InvalidEngineID("Engine with id %r is not registered" % id) | |
261 | return map(self.engines.get, targets) |
|
261 | return map(self.engines.get, targets) | |
262 | elif targets == 'all': |
|
262 | elif targets == 'all': | |
263 | eList = self.engines.values() |
|
263 | eList = self.engines.values() | |
264 | if len(eList) == 0: |
|
264 | if len(eList) == 0: | |
265 | msg = """There are no engines registered. |
|
265 | msg = """There are no engines registered. | |
266 | Check the logs in ~/.ipython/log if you think there should have been.""" |
|
266 | Check the logs in ~/.ipython/log if you think there should have been.""" | |
267 | raise error.NoEnginesRegistered(msg) |
|
267 | raise error.NoEnginesRegistered(msg) | |
268 | else: |
|
268 | else: | |
269 | return eList |
|
269 | return eList | |
270 | else: |
|
270 | else: | |
271 | raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets) |
|
271 | raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets) | |
272 |
|
272 | |||
273 | def _performOnEngines(self, methodName, *args, **kwargs): |
|
273 | def _performOnEngines(self, methodName, *args, **kwargs): | |
274 | """Calls a method on engines and returns deferred to list of results. |
|
274 | """Calls a method on engines and returns deferred to list of results. | |
275 |
|
275 | |||
276 | :Parameters: |
|
276 | :Parameters: | |
277 | methodName : str |
|
277 | methodName : str | |
278 | Name of the method to be called. |
|
278 | Name of the method to be called. | |
279 | targets : int, list of ints, 'all' |
|
279 | targets : int, list of ints, 'all' | |
280 | The targets argument to be parsed into a list of engine objects. |
|
280 | The targets argument to be parsed into a list of engine objects. | |
281 | args |
|
281 | args | |
282 | The positional keyword arguments to be passed to the engines. |
|
282 | The positional keyword arguments to be passed to the engines. | |
283 | kwargs |
|
283 | kwargs | |
284 | The keyword arguments passed to the method |
|
284 | The keyword arguments passed to the method | |
285 |
|
285 | |||
286 | :Returns: List of deferreds to the results on each engine |
|
286 | :Returns: List of deferreds to the results on each engine | |
287 |
|
287 | |||
288 | :Exception: |
|
288 | :Exception: | |
289 | InvalidEngineID |
|
289 | InvalidEngineID | |
290 | If the targets argument is bad in any way. |
|
290 | If the targets argument is bad in any way. | |
291 | AttributeError |
|
291 | AttributeError | |
292 | If the method doesn't exist on one of the engines. |
|
292 | If the method doesn't exist on one of the engines. | |
293 | """ |
|
293 | """ | |
294 | targets = kwargs.pop('targets') |
|
294 | targets = kwargs.pop('targets') | |
295 | log.msg("Performing %s on %r" % (methodName, targets)) |
|
295 | log.msg("Performing %s on %r" % (methodName, targets)) | |
296 | # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets)) |
|
296 | # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets)) | |
297 | # This will and should raise if targets is not valid! |
|
297 | # This will and should raise if targets is not valid! | |
298 | engines = self.engineList(targets) |
|
298 | engines = self.engineList(targets) | |
299 | dList = [] |
|
299 | dList = [] | |
300 | for e in engines: |
|
300 | for e in engines: | |
301 | meth = getattr(e, methodName, None) |
|
301 | meth = getattr(e, methodName, None) | |
302 | if meth is not None: |
|
302 | if meth is not None: | |
303 | dList.append(meth(*args, **kwargs)) |
|
303 | dList.append(meth(*args, **kwargs)) | |
304 | else: |
|
304 | else: | |
305 | raise AttributeError("Engine %i does not have method %s" % (e.id, methodName)) |
|
305 | raise AttributeError("Engine %i does not have method %s" % (e.id, methodName)) | |
306 | return dList |
|
306 | return dList | |
307 |
|
307 | |||
308 | def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs): |
|
308 | def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs): | |
309 | """Called _performOnEngines and wraps result/exception into deferred.""" |
|
309 | """Called _performOnEngines and wraps result/exception into deferred.""" | |
310 | try: |
|
310 | try: | |
311 | dList = self._performOnEngines(methodName, *args, **kwargs) |
|
311 | dList = self._performOnEngines(methodName, *args, **kwargs) | |
312 | except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered): |
|
312 | except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered): | |
313 | return defer.fail(failure.Failure()) |
|
313 | return defer.fail(failure.Failure()) | |
314 | else: |
|
314 | else: | |
315 | # Having fireOnOneErrback is causing problems with the determinacy |
|
315 | # Having fireOnOneErrback is causing problems with the determinacy | |
316 | # of the system. Basically, once a single engine has errbacked, this |
|
316 | # of the system. Basically, once a single engine has errbacked, this | |
317 | # method returns. In some cases, this will cause client to submit |
|
317 | # method returns. In some cases, this will cause client to submit | |
318 | # another command. Because the previous command is still running |
|
318 | # another command. Because the previous command is still running | |
319 | # on some engines, this command will be queued. When those commands |
|
319 | # on some engines, this command will be queued. When those commands | |
320 | # then errback, the second command will raise QueueCleared. Ahhh! |
|
320 | # then errback, the second command will raise QueueCleared. Ahhh! | |
321 | d = gatherBoth(dList, |
|
321 | d = gatherBoth(dList, | |
322 | fireOnOneErrback=0, |
|
322 | fireOnOneErrback=0, | |
323 | consumeErrors=1, |
|
323 | consumeErrors=1, | |
324 | logErrors=0) |
|
324 | logErrors=0) | |
325 | d.addCallback(error.collect_exceptions, methodName) |
|
325 | d.addCallback(error.collect_exceptions, methodName) | |
326 | return d |
|
326 | return d | |
327 |
|
327 | |||
328 | #--------------------------------------------------------------------------- |
|
328 | #--------------------------------------------------------------------------- | |
329 | # General IMultiEngine methods |
|
329 | # General IMultiEngine methods | |
330 | #--------------------------------------------------------------------------- |
|
330 | #--------------------------------------------------------------------------- | |
331 |
|
331 | |||
332 | def get_ids(self): |
|
332 | def get_ids(self): | |
333 | return defer.succeed(self.engines.keys()) |
|
333 | return defer.succeed(self.engines.keys()) | |
334 |
|
334 | |||
335 | #--------------------------------------------------------------------------- |
|
335 | #--------------------------------------------------------------------------- | |
336 | # IEngineMultiplexer methods |
|
336 | # IEngineMultiplexer methods | |
337 | #--------------------------------------------------------------------------- |
|
337 | #--------------------------------------------------------------------------- | |
338 |
|
338 | |||
339 | def execute(self, lines, targets='all'): |
|
339 | def execute(self, lines, targets='all'): | |
340 | return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets) |
|
340 | return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets) | |
341 |
|
341 | |||
342 | def push(self, ns, targets='all'): |
|
342 | def push(self, ns, targets='all'): | |
343 | return self._performOnEnginesAndGatherBoth('push', ns, targets=targets) |
|
343 | return self._performOnEnginesAndGatherBoth('push', ns, targets=targets) | |
344 |
|
344 | |||
345 | def pull(self, keys, targets='all'): |
|
345 | def pull(self, keys, targets='all'): | |
346 | return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets) |
|
346 | return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets) | |
347 |
|
347 | |||
348 | def push_function(self, ns, targets='all'): |
|
348 | def push_function(self, ns, targets='all'): | |
349 | return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets) |
|
349 | return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets) | |
350 |
|
350 | |||
351 | def pull_function(self, keys, targets='all'): |
|
351 | def pull_function(self, keys, targets='all'): | |
352 | return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets) |
|
352 | return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets) | |
353 |
|
353 | |||
354 | def get_result(self, i=None, targets='all'): |
|
354 | def get_result(self, i=None, targets='all'): | |
355 | return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets) |
|
355 | return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets) | |
356 |
|
356 | |||
357 | def reset(self, targets='all'): |
|
357 | def reset(self, targets='all'): | |
358 | return self._performOnEnginesAndGatherBoth('reset', targets=targets) |
|
358 | return self._performOnEnginesAndGatherBoth('reset', targets=targets) | |
359 |
|
359 | |||
360 | def keys(self, targets='all'): |
|
360 | def keys(self, targets='all'): | |
361 | return self._performOnEnginesAndGatherBoth('keys', targets=targets) |
|
361 | return self._performOnEnginesAndGatherBoth('keys', targets=targets) | |
362 |
|
362 | |||
363 | def kill(self, controller=False, targets='all'): |
|
363 | def kill(self, controller=False, targets='all'): | |
364 | if controller: |
|
364 | if controller: | |
365 | targets = 'all' |
|
365 | targets = 'all' | |
366 | d = self._performOnEnginesAndGatherBoth('kill', targets=targets) |
|
366 | d = self._performOnEnginesAndGatherBoth('kill', targets=targets) | |
367 | if controller: |
|
367 | if controller: | |
368 | log.msg("Killing controller") |
|
368 | log.msg("Killing controller") | |
369 | d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop)) |
|
369 | d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop)) | |
370 | # Consume any weird stuff coming back |
|
370 | # Consume any weird stuff coming back | |
371 | d.addBoth(lambda _: None) |
|
371 | d.addBoth(lambda _: None) | |
372 | return d |
|
372 | return d | |
373 |
|
373 | |||
374 | def push_serialized(self, namespace, targets='all'): |
|
374 | def push_serialized(self, namespace, targets='all'): | |
375 | for k, v in namespace.iteritems(): |
|
375 | for k, v in namespace.iteritems(): | |
376 | log.msg("Pushed object %s is %f MB" % (k, v.getDataSize())) |
|
376 | log.msg("Pushed object %s is %f MB" % (k, v.getDataSize())) | |
377 | d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets) |
|
377 | d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets) | |
378 | return d |
|
378 | return d | |
379 |
|
379 | |||
380 | def pull_serialized(self, keys, targets='all'): |
|
380 | def pull_serialized(self, keys, targets='all'): | |
381 | try: |
|
381 | try: | |
382 | dList = self._performOnEngines('pull_serialized', keys, targets=targets) |
|
382 | dList = self._performOnEngines('pull_serialized', keys, targets=targets) | |
383 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
383 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
384 | return defer.fail(failure.Failure()) |
|
384 | return defer.fail(failure.Failure()) | |
385 | else: |
|
385 | else: | |
386 | for d in dList: |
|
386 | for d in dList: | |
387 | d.addCallback(self._logSizes) |
|
387 | d.addCallback(self._logSizes) | |
388 | d = gatherBoth(dList, |
|
388 | d = gatherBoth(dList, | |
389 | fireOnOneErrback=0, |
|
389 | fireOnOneErrback=0, | |
390 | consumeErrors=1, |
|
390 | consumeErrors=1, | |
391 | logErrors=0) |
|
391 | logErrors=0) | |
392 | d.addCallback(error.collect_exceptions, 'pull_serialized') |
|
392 | d.addCallback(error.collect_exceptions, 'pull_serialized') | |
393 | return d |
|
393 | return d | |
394 |
|
394 | |||
395 | def _logSizes(self, listOfSerialized): |
|
395 | def _logSizes(self, listOfSerialized): | |
396 | if isinstance(listOfSerialized, (list, tuple)): |
|
396 | if isinstance(listOfSerialized, (list, tuple)): | |
397 | for s in listOfSerialized: |
|
397 | for s in listOfSerialized: | |
398 | log.msg("Pulled object is %f MB" % s.getDataSize()) |
|
398 | log.msg("Pulled object is %f MB" % s.getDataSize()) | |
399 | else: |
|
399 | else: | |
400 | log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize()) |
|
400 | log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize()) | |
401 | return listOfSerialized |
|
401 | return listOfSerialized | |
402 |
|
402 | |||
403 | def clear_queue(self, targets='all'): |
|
403 | def clear_queue(self, targets='all'): | |
404 | return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets) |
|
404 | return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets) | |
405 |
|
405 | |||
406 | def queue_status(self, targets='all'): |
|
406 | def queue_status(self, targets='all'): | |
407 | log.msg("Getting queue status on %r" % targets) |
|
407 | log.msg("Getting queue status on %r" % targets) | |
408 | try: |
|
408 | try: | |
409 | engines = self.engineList(targets) |
|
409 | engines = self.engineList(targets) | |
410 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
410 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
411 | return defer.fail(failure.Failure()) |
|
411 | return defer.fail(failure.Failure()) | |
412 | else: |
|
412 | else: | |
413 | dList = [] |
|
413 | dList = [] | |
414 | for e in engines: |
|
414 | for e in engines: | |
415 | dList.append(e.queue_status().addCallback(lambda s:(e.id, s))) |
|
415 | dList.append(e.queue_status().addCallback(lambda s:(e.id, s))) | |
416 | d = gatherBoth(dList, |
|
416 | d = gatherBoth(dList, | |
417 | fireOnOneErrback=0, |
|
417 | fireOnOneErrback=0, | |
418 | consumeErrors=1, |
|
418 | consumeErrors=1, | |
419 | logErrors=0) |
|
419 | logErrors=0) | |
420 | d.addCallback(error.collect_exceptions, 'queue_status') |
|
420 | d.addCallback(error.collect_exceptions, 'queue_status') | |
421 | return d |
|
421 | return d | |
422 |
|
422 | |||
423 | def get_properties(self, keys=None, targets='all'): |
|
423 | def get_properties(self, keys=None, targets='all'): | |
424 | log.msg("Getting properties on %r" % targets) |
|
424 | log.msg("Getting properties on %r" % targets) | |
425 | try: |
|
425 | try: | |
426 | engines = self.engineList(targets) |
|
426 | engines = self.engineList(targets) | |
427 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
427 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
428 | return defer.fail(failure.Failure()) |
|
428 | return defer.fail(failure.Failure()) | |
429 | else: |
|
429 | else: | |
430 | dList = [e.get_properties(keys) for e in engines] |
|
430 | dList = [e.get_properties(keys) for e in engines] | |
431 | d = gatherBoth(dList, |
|
431 | d = gatherBoth(dList, | |
432 | fireOnOneErrback=0, |
|
432 | fireOnOneErrback=0, | |
433 | consumeErrors=1, |
|
433 | consumeErrors=1, | |
434 | logErrors=0) |
|
434 | logErrors=0) | |
435 | d.addCallback(error.collect_exceptions, 'get_properties') |
|
435 | d.addCallback(error.collect_exceptions, 'get_properties') | |
436 | return d |
|
436 | return d | |
437 |
|
437 | |||
438 | def set_properties(self, properties, targets='all'): |
|
438 | def set_properties(self, properties, targets='all'): | |
439 | log.msg("Setting properties on %r" % targets) |
|
439 | log.msg("Setting properties on %r" % targets) | |
440 | try: |
|
440 | try: | |
441 | engines = self.engineList(targets) |
|
441 | engines = self.engineList(targets) | |
442 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
442 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
443 | return defer.fail(failure.Failure()) |
|
443 | return defer.fail(failure.Failure()) | |
444 | else: |
|
444 | else: | |
445 | dList = [e.set_properties(properties) for e in engines] |
|
445 | dList = [e.set_properties(properties) for e in engines] | |
446 | d = gatherBoth(dList, |
|
446 | d = gatherBoth(dList, | |
447 | fireOnOneErrback=0, |
|
447 | fireOnOneErrback=0, | |
448 | consumeErrors=1, |
|
448 | consumeErrors=1, | |
449 | logErrors=0) |
|
449 | logErrors=0) | |
450 | d.addCallback(error.collect_exceptions, 'set_properties') |
|
450 | d.addCallback(error.collect_exceptions, 'set_properties') | |
451 | return d |
|
451 | return d | |
452 |
|
452 | |||
453 | def has_properties(self, keys, targets='all'): |
|
453 | def has_properties(self, keys, targets='all'): | |
454 | log.msg("Checking properties on %r" % targets) |
|
454 | log.msg("Checking properties on %r" % targets) | |
455 | try: |
|
455 | try: | |
456 | engines = self.engineList(targets) |
|
456 | engines = self.engineList(targets) | |
457 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
457 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
458 | return defer.fail(failure.Failure()) |
|
458 | return defer.fail(failure.Failure()) | |
459 | else: |
|
459 | else: | |
460 | dList = [e.has_properties(keys) for e in engines] |
|
460 | dList = [e.has_properties(keys) for e in engines] | |
461 | d = gatherBoth(dList, |
|
461 | d = gatherBoth(dList, | |
462 | fireOnOneErrback=0, |
|
462 | fireOnOneErrback=0, | |
463 | consumeErrors=1, |
|
463 | consumeErrors=1, | |
464 | logErrors=0) |
|
464 | logErrors=0) | |
465 | d.addCallback(error.collect_exceptions, 'has_properties') |
|
465 | d.addCallback(error.collect_exceptions, 'has_properties') | |
466 | return d |
|
466 | return d | |
467 |
|
467 | |||
468 | def del_properties(self, keys, targets='all'): |
|
468 | def del_properties(self, keys, targets='all'): | |
469 | log.msg("Deleting properties on %r" % targets) |
|
469 | log.msg("Deleting properties on %r" % targets) | |
470 | try: |
|
470 | try: | |
471 | engines = self.engineList(targets) |
|
471 | engines = self.engineList(targets) | |
472 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
472 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
473 | return defer.fail(failure.Failure()) |
|
473 | return defer.fail(failure.Failure()) | |
474 | else: |
|
474 | else: | |
475 | dList = [e.del_properties(keys) for e in engines] |
|
475 | dList = [e.del_properties(keys) for e in engines] | |
476 | d = gatherBoth(dList, |
|
476 | d = gatherBoth(dList, | |
477 | fireOnOneErrback=0, |
|
477 | fireOnOneErrback=0, | |
478 | consumeErrors=1, |
|
478 | consumeErrors=1, | |
479 | logErrors=0) |
|
479 | logErrors=0) | |
480 | d.addCallback(error.collect_exceptions, 'del_properties') |
|
480 | d.addCallback(error.collect_exceptions, 'del_properties') | |
481 | return d |
|
481 | return d | |
482 |
|
482 | |||
483 | def clear_properties(self, targets='all'): |
|
483 | def clear_properties(self, targets='all'): | |
484 | log.msg("Clearing properties on %r" % targets) |
|
484 | log.msg("Clearing properties on %r" % targets) | |
485 | try: |
|
485 | try: | |
486 | engines = self.engineList(targets) |
|
486 | engines = self.engineList(targets) | |
487 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): |
|
487 | except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered): | |
488 | return defer.fail(failure.Failure()) |
|
488 | return defer.fail(failure.Failure()) | |
489 | else: |
|
489 | else: | |
490 | dList = [e.clear_properties() for e in engines] |
|
490 | dList = [e.clear_properties() for e in engines] | |
491 | d = gatherBoth(dList, |
|
491 | d = gatherBoth(dList, | |
492 | fireOnOneErrback=0, |
|
492 | fireOnOneErrback=0, | |
493 | consumeErrors=1, |
|
493 | consumeErrors=1, | |
494 | logErrors=0) |
|
494 | logErrors=0) | |
495 | d.addCallback(error.collect_exceptions, 'clear_properties') |
|
495 | d.addCallback(error.collect_exceptions, 'clear_properties') | |
496 | return d |
|
496 | return d | |
497 |
|
497 | |||
498 |
|
498 | |||
499 | components.registerAdapter(MultiEngine, |
|
499 | components.registerAdapter(MultiEngine, | |
500 | IControllerBase, |
|
500 | IControllerBase, | |
501 | IMultiEngine) |
|
501 | IMultiEngine) | |
502 |
|
502 | |||
503 |
|
503 | |||
504 | #------------------------------------------------------------------------------- |
|
504 | #------------------------------------------------------------------------------- | |
505 | # Interfaces for the Synchronous MultiEngine |
|
505 | # Interfaces for the Synchronous MultiEngine | |
506 | #------------------------------------------------------------------------------- |
|
506 | #------------------------------------------------------------------------------- | |
507 |
|
507 | |||
508 | class ISynchronousEngineMultiplexer(Interface): |
|
508 | class ISynchronousEngineMultiplexer(Interface): | |
509 | pass |
|
509 | pass | |
510 |
|
510 | |||
511 |
|
511 | |||
512 | class ISynchronousMultiEngine(ISynchronousEngineMultiplexer): |
|
512 | class ISynchronousMultiEngine(ISynchronousEngineMultiplexer): | |
513 | """Synchronous, two-phase version of IMultiEngine. |
|
513 | """Synchronous, two-phase version of IMultiEngine. | |
514 |
|
514 | |||
515 | Methods in this interface are identical to those of IMultiEngine, but they |
|
515 | Methods in this interface are identical to those of IMultiEngine, but they | |
516 | take one additional argument: |
|
516 | take one additional argument: | |
517 |
|
517 | |||
518 | execute(lines, targets='all') -> execute(lines, targets='all, block=True) |
|
518 | execute(lines, targets='all') -> execute(lines, targets='all, block=True) | |
519 |
|
519 | |||
520 | :Parameters: |
|
520 | :Parameters: | |
521 | block : boolean |
|
521 | block : boolean | |
522 | Should the method return a deferred to a deferredID or the |
|
522 | Should the method return a deferred to a deferredID or the | |
523 | actual result. If block=False a deferred to a deferredID is |
|
523 | actual result. If block=False a deferred to a deferredID is | |
524 | returned and the user must call `get_pending_deferred` at a later |
|
524 | returned and the user must call `get_pending_deferred` at a later | |
525 | point. If block=True, a deferred to the actual result comes back. |
|
525 | point. If block=True, a deferred to the actual result comes back. | |
526 | """ |
|
526 | """ | |
527 | def get_pending_deferred(deferredID, block=True): |
|
527 | def get_pending_deferred(deferredID, block=True): | |
528 | """""" |
|
528 | """""" | |
529 |
|
529 | |||
530 | def clear_pending_deferreds(): |
|
530 | def clear_pending_deferreds(): | |
531 | """""" |
|
531 | """""" | |
532 |
|
532 | |||
533 |
|
533 | |||
534 | #------------------------------------------------------------------------------- |
|
534 | #------------------------------------------------------------------------------- | |
535 | # Implementation of the Synchronous MultiEngine |
|
535 | # Implementation of the Synchronous MultiEngine | |
536 | #------------------------------------------------------------------------------- |
|
536 | #------------------------------------------------------------------------------- | |
537 |
|
537 | |||
538 | class SynchronousMultiEngine(PendingDeferredManager): |
|
538 | class SynchronousMultiEngine(PendingDeferredManager): | |
539 | """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine` |
|
539 | """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine` | |
540 |
|
540 | |||
541 | Warning, this class uses a decorator that currently uses **kwargs. |
|
541 | Warning, this class uses a decorator that currently uses **kwargs. | |
542 | Because of this block must be passed as a kwarg, not positionally. |
|
542 | Because of this block must be passed as a kwarg, not positionally. | |
543 | """ |
|
543 | """ | |
544 |
|
544 | |||
545 | implements(ISynchronousMultiEngine) |
|
545 | implements(ISynchronousMultiEngine) | |
546 |
|
546 | |||
547 | def __init__(self, multiengine): |
|
547 | def __init__(self, multiengine): | |
548 | self.multiengine = multiengine |
|
548 | self.multiengine = multiengine | |
549 | PendingDeferredManager.__init__(self) |
|
549 | PendingDeferredManager.__init__(self) | |
550 |
|
550 | |||
551 | #--------------------------------------------------------------------------- |
|
551 | #--------------------------------------------------------------------------- | |
552 | # Decorated pending deferred methods |
|
552 | # Decorated pending deferred methods | |
553 | #--------------------------------------------------------------------------- |
|
553 | #--------------------------------------------------------------------------- | |
554 |
|
554 | |||
|
555 | @profile | |||
555 | @two_phase |
|
556 | @two_phase | |
556 | def execute(self, lines, targets='all'): |
|
557 | def execute(self, lines, targets='all'): | |
557 | d = self.multiengine.execute(lines, targets) |
|
558 | d = self.multiengine.execute(lines, targets) | |
558 | return d |
|
559 | return d | |
559 |
|
560 | |||
560 | @two_phase |
|
561 | @two_phase | |
561 | def push(self, namespace, targets='all'): |
|
562 | def push(self, namespace, targets='all'): | |
562 | return self.multiengine.push(namespace, targets) |
|
563 | return self.multiengine.push(namespace, targets) | |
563 |
|
564 | |||
564 | @two_phase |
|
565 | @two_phase | |
565 | def pull(self, keys, targets='all'): |
|
566 | def pull(self, keys, targets='all'): | |
566 | d = self.multiengine.pull(keys, targets) |
|
567 | d = self.multiengine.pull(keys, targets) | |
567 | return d |
|
568 | return d | |
568 |
|
569 | |||
569 | @two_phase |
|
570 | @two_phase | |
570 | def push_function(self, namespace, targets='all'): |
|
571 | def push_function(self, namespace, targets='all'): | |
571 | return self.multiengine.push_function(namespace, targets) |
|
572 | return self.multiengine.push_function(namespace, targets) | |
572 |
|
573 | |||
573 | @two_phase |
|
574 | @two_phase | |
574 | def pull_function(self, keys, targets='all'): |
|
575 | def pull_function(self, keys, targets='all'): | |
575 | d = self.multiengine.pull_function(keys, targets) |
|
576 | d = self.multiengine.pull_function(keys, targets) | |
576 | return d |
|
577 | return d | |
577 |
|
578 | |||
578 | @two_phase |
|
579 | @two_phase | |
579 | def get_result(self, i=None, targets='all'): |
|
580 | def get_result(self, i=None, targets='all'): | |
580 | return self.multiengine.get_result(i, targets='all') |
|
581 | return self.multiengine.get_result(i, targets='all') | |
581 |
|
582 | |||
582 | @two_phase |
|
583 | @two_phase | |
583 | def reset(self, targets='all'): |
|
584 | def reset(self, targets='all'): | |
584 | return self.multiengine.reset(targets) |
|
585 | return self.multiengine.reset(targets) | |
585 |
|
586 | |||
586 | @two_phase |
|
587 | @two_phase | |
587 | def keys(self, targets='all'): |
|
588 | def keys(self, targets='all'): | |
588 | return self.multiengine.keys(targets) |
|
589 | return self.multiengine.keys(targets) | |
589 |
|
590 | |||
590 | @two_phase |
|
591 | @two_phase | |
591 | def kill(self, controller=False, targets='all'): |
|
592 | def kill(self, controller=False, targets='all'): | |
592 | return self.multiengine.kill(controller, targets) |
|
593 | return self.multiengine.kill(controller, targets) | |
593 |
|
594 | |||
594 | @two_phase |
|
595 | @two_phase | |
595 | def push_serialized(self, namespace, targets='all'): |
|
596 | def push_serialized(self, namespace, targets='all'): | |
596 | return self.multiengine.push_serialized(namespace, targets) |
|
597 | return self.multiengine.push_serialized(namespace, targets) | |
597 |
|
598 | |||
598 | @two_phase |
|
599 | @two_phase | |
599 | def pull_serialized(self, keys, targets='all'): |
|
600 | def pull_serialized(self, keys, targets='all'): | |
600 | return self.multiengine.pull_serialized(keys, targets) |
|
601 | return self.multiengine.pull_serialized(keys, targets) | |
601 |
|
602 | |||
602 | @two_phase |
|
603 | @two_phase | |
603 | def clear_queue(self, targets='all'): |
|
604 | def clear_queue(self, targets='all'): | |
604 | return self.multiengine.clear_queue(targets) |
|
605 | return self.multiengine.clear_queue(targets) | |
605 |
|
606 | |||
606 | @two_phase |
|
607 | @two_phase | |
607 | def queue_status(self, targets='all'): |
|
608 | def queue_status(self, targets='all'): | |
608 | return self.multiengine.queue_status(targets) |
|
609 | return self.multiengine.queue_status(targets) | |
609 |
|
610 | |||
610 | @two_phase |
|
611 | @two_phase | |
611 | def set_properties(self, properties, targets='all'): |
|
612 | def set_properties(self, properties, targets='all'): | |
612 | return self.multiengine.set_properties(properties, targets) |
|
613 | return self.multiengine.set_properties(properties, targets) | |
613 |
|
614 | |||
614 | @two_phase |
|
615 | @two_phase | |
615 | def get_properties(self, keys=None, targets='all'): |
|
616 | def get_properties(self, keys=None, targets='all'): | |
616 | return self.multiengine.get_properties(keys, targets) |
|
617 | return self.multiengine.get_properties(keys, targets) | |
617 |
|
618 | |||
618 | @two_phase |
|
619 | @two_phase | |
619 | def has_properties(self, keys, targets='all'): |
|
620 | def has_properties(self, keys, targets='all'): | |
620 | return self.multiengine.has_properties(keys, targets) |
|
621 | return self.multiengine.has_properties(keys, targets) | |
621 |
|
622 | |||
622 | @two_phase |
|
623 | @two_phase | |
623 | def del_properties(self, keys, targets='all'): |
|
624 | def del_properties(self, keys, targets='all'): | |
624 | return self.multiengine.del_properties(keys, targets) |
|
625 | return self.multiengine.del_properties(keys, targets) | |
625 |
|
626 | |||
626 | @two_phase |
|
627 | @two_phase | |
627 | def clear_properties(self, targets='all'): |
|
628 | def clear_properties(self, targets='all'): | |
628 | return self.multiengine.clear_properties(targets) |
|
629 | return self.multiengine.clear_properties(targets) | |
629 |
|
630 | |||
630 | #--------------------------------------------------------------------------- |
|
631 | #--------------------------------------------------------------------------- | |
631 | # IMultiEngine methods |
|
632 | # IMultiEngine methods | |
632 | #--------------------------------------------------------------------------- |
|
633 | #--------------------------------------------------------------------------- | |
633 |
|
634 | |||
634 | def get_ids(self): |
|
635 | def get_ids(self): | |
635 | """Return a list of registered engine ids. |
|
636 | """Return a list of registered engine ids. | |
636 |
|
637 | |||
637 | Never use the two phase block/non-block stuff for this. |
|
638 | Never use the two phase block/non-block stuff for this. | |
638 | """ |
|
639 | """ | |
639 | return self.multiengine.get_ids() |
|
640 | return self.multiengine.get_ids() | |
640 |
|
641 | |||
641 |
|
642 | |||
642 | components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine) |
|
643 | components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine) | |
643 |
|
644 | |||
644 |
|
645 | |||
645 | #------------------------------------------------------------------------------- |
|
646 | #------------------------------------------------------------------------------- | |
646 | # Various high-level interfaces that can be used as MultiEngine mix-ins |
|
647 | # Various high-level interfaces that can be used as MultiEngine mix-ins | |
647 | #------------------------------------------------------------------------------- |
|
648 | #------------------------------------------------------------------------------- | |
648 |
|
649 | |||
649 | #------------------------------------------------------------------------------- |
|
650 | #------------------------------------------------------------------------------- | |
650 | # IMultiEngineCoordinator |
|
651 | # IMultiEngineCoordinator | |
651 | #------------------------------------------------------------------------------- |
|
652 | #------------------------------------------------------------------------------- | |
652 |
|
653 | |||
653 | class IMultiEngineCoordinator(Interface): |
|
654 | class IMultiEngineCoordinator(Interface): | |
654 | """Methods that work on multiple engines explicitly.""" |
|
655 | """Methods that work on multiple engines explicitly.""" | |
655 |
|
656 | |||
656 | def scatter(key, seq, dist='b', flatten=False, targets='all'): |
|
657 | def scatter(key, seq, dist='b', flatten=False, targets='all'): | |
657 | """Partition and distribute a sequence to targets.""" |
|
658 | """Partition and distribute a sequence to targets.""" | |
658 |
|
659 | |||
659 | def gather(key, dist='b', targets='all'): |
|
660 | def gather(key, dist='b', targets='all'): | |
660 | """Gather object key from targets.""" |
|
661 | """Gather object key from targets.""" | |
661 |
|
662 | |||
662 | def raw_map(func, seqs, dist='b', targets='all'): |
|
663 | def raw_map(func, seqs, dist='b', targets='all'): | |
663 | """ |
|
664 | """ | |
664 | A parallelized version of Python's builtin `map` function. |
|
665 | A parallelized version of Python's builtin `map` function. | |
665 |
|
666 | |||
666 | This has a slightly different syntax than the builtin `map`. |
|
667 | This has a slightly different syntax than the builtin `map`. | |
667 | This is needed because we need to have keyword arguments and thus |
|
668 | This is needed because we need to have keyword arguments and thus | |
668 | can't use *args to capture all the sequences. Instead, they must |
|
669 | can't use *args to capture all the sequences. Instead, they must | |
669 | be passed in a list or tuple. |
|
670 | be passed in a list or tuple. | |
670 |
|
671 | |||
671 | The equivalence is: |
|
672 | The equivalence is: | |
672 |
|
673 | |||
673 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) |
|
674 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) | |
674 |
|
675 | |||
675 | Most users will want to use parallel functions or the `mapper` |
|
676 | Most users will want to use parallel functions or the `mapper` | |
676 | and `map` methods for an API that follows that of the builtin |
|
677 | and `map` methods for an API that follows that of the builtin | |
677 | `map`. |
|
678 | `map`. | |
678 | """ |
|
679 | """ | |
679 |
|
680 | |||
680 |
|
681 | |||
681 | class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): |
|
682 | class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): | |
682 | """Methods that work on multiple engines explicitly.""" |
|
683 | """Methods that work on multiple engines explicitly.""" | |
683 |
|
684 | |||
684 | def scatter(key, seq, dist='b', flatten=False, targets='all', block=True): |
|
685 | def scatter(key, seq, dist='b', flatten=False, targets='all', block=True): | |
685 | """Partition and distribute a sequence to targets.""" |
|
686 | """Partition and distribute a sequence to targets.""" | |
686 |
|
687 | |||
687 | def gather(key, dist='b', targets='all', block=True): |
|
688 | def gather(key, dist='b', targets='all', block=True): | |
688 | """Gather object key from targets""" |
|
689 | """Gather object key from targets""" | |
689 |
|
690 | |||
690 | def raw_map(func, seqs, dist='b', targets='all', block=True): |
|
691 | def raw_map(func, seqs, dist='b', targets='all', block=True): | |
691 | """ |
|
692 | """ | |
692 | A parallelized version of Python's builtin map. |
|
693 | A parallelized version of Python's builtin map. | |
693 |
|
694 | |||
694 | This has a slightly different syntax than the builtin `map`. |
|
695 | This has a slightly different syntax than the builtin `map`. | |
695 | This is needed because we need to have keyword arguments and thus |
|
696 | This is needed because we need to have keyword arguments and thus | |
696 | can't use *args to capture all the sequences. Instead, they must |
|
697 | can't use *args to capture all the sequences. Instead, they must | |
697 | be passed in a list or tuple. |
|
698 | be passed in a list or tuple. | |
698 |
|
699 | |||
699 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) |
|
700 | raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) | |
700 |
|
701 | |||
701 | Most users will want to use parallel functions or the `mapper` |
|
702 | Most users will want to use parallel functions or the `mapper` | |
702 | and `map` methods for an API that follows that of the builtin |
|
703 | and `map` methods for an API that follows that of the builtin | |
703 | `map`. |
|
704 | `map`. | |
704 | """ |
|
705 | """ | |
705 |
|
706 | |||
706 |
|
707 | |||
707 | #------------------------------------------------------------------------------- |
|
708 | #------------------------------------------------------------------------------- | |
708 | # IMultiEngineExtras |
|
709 | # IMultiEngineExtras | |
709 | #------------------------------------------------------------------------------- |
|
710 | #------------------------------------------------------------------------------- | |
710 |
|
711 | |||
711 | class IMultiEngineExtras(Interface): |
|
712 | class IMultiEngineExtras(Interface): | |
712 |
|
713 | |||
713 | def zip_pull(targets, keys): |
|
714 | def zip_pull(targets, keys): | |
714 | """ |
|
715 | """ | |
715 | Pull, but return results in a different format from `pull`. |
|
716 | Pull, but return results in a different format from `pull`. | |
716 |
|
717 | |||
717 | This method basically returns zip(pull(targets, *keys)), with a few |
|
718 | This method basically returns zip(pull(targets, *keys)), with a few | |
718 | edge cases handled differently. Users of chainsaw will find this format |
|
719 | edge cases handled differently. Users of chainsaw will find this format | |
719 | familiar. |
|
720 | familiar. | |
720 | """ |
|
721 | """ | |
721 |
|
722 | |||
722 | def run(targets, fname): |
|
723 | def run(targets, fname): | |
723 | """Run a .py file on targets.""" |
|
724 | """Run a .py file on targets.""" | |
724 |
|
725 | |||
725 |
|
726 | |||
726 | class ISynchronousMultiEngineExtras(IMultiEngineExtras): |
|
727 | class ISynchronousMultiEngineExtras(IMultiEngineExtras): | |
727 | def zip_pull(targets, keys, block=True): |
|
728 | def zip_pull(targets, keys, block=True): | |
728 | """ |
|
729 | """ | |
729 | Pull, but return results in a different format from `pull`. |
|
730 | Pull, but return results in a different format from `pull`. | |
730 |
|
731 | |||
731 | This method basically returns zip(pull(targets, *keys)), with a few |
|
732 | This method basically returns zip(pull(targets, *keys)), with a few | |
732 | edge cases handled differently. Users of chainsaw will find this format |
|
733 | edge cases handled differently. Users of chainsaw will find this format | |
733 | familiar. |
|
734 | familiar. | |
734 | """ |
|
735 | """ | |
735 |
|
736 | |||
736 | def run(targets, fname, block=True): |
|
737 | def run(targets, fname, block=True): | |
737 | """Run a .py file on targets.""" |
|
738 | """Run a .py file on targets.""" | |
738 |
|
739 | |||
739 | #------------------------------------------------------------------------------- |
|
740 | #------------------------------------------------------------------------------- | |
740 | # The full MultiEngine interface |
|
741 | # The full MultiEngine interface | |
741 | #------------------------------------------------------------------------------- |
|
742 | #------------------------------------------------------------------------------- | |
742 |
|
743 | |||
743 | class IFullMultiEngine(IMultiEngine, |
|
744 | class IFullMultiEngine(IMultiEngine, | |
744 | IMultiEngineCoordinator, |
|
745 | IMultiEngineCoordinator, | |
745 | IMultiEngineExtras): |
|
746 | IMultiEngineExtras): | |
746 | pass |
|
747 | pass | |
747 |
|
748 | |||
748 |
|
749 | |||
749 | class IFullSynchronousMultiEngine(ISynchronousMultiEngine, |
|
750 | class IFullSynchronousMultiEngine(ISynchronousMultiEngine, | |
750 | ISynchronousMultiEngineCoordinator, |
|
751 | ISynchronousMultiEngineCoordinator, | |
751 | ISynchronousMultiEngineExtras): |
|
752 | ISynchronousMultiEngineExtras): | |
752 | pass |
|
753 | pass | |
753 |
|
754 |
1 | NO CONTENT: modified file |
|
NO CONTENT: modified file |
General Comments 0
You need to be logged in to leave comments.
Login now