##// END OF EJS Templates
Merging Brian's trunk-dev....
Brian Granger -
r2530:9d940dc9 merge
parent child Browse files
Show More
@@ -1,541 +1,544 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_enginepb -*-
2 # -*- test-case-name: IPython.kernel.test.test_enginepb -*-
3
3
4 """
4 """
5 Expose the IPython EngineService using the Foolscap network protocol.
5 Expose the IPython EngineService using the Foolscap network protocol.
6
6
7 Foolscap is a high-performance and secure network protocol.
7 Foolscap is a high-performance and secure network protocol.
8 """
8 """
9 __docformat__ = "restructuredtext en"
9 __docformat__ = "restructuredtext en"
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
12 # Copyright (C) 2008 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21
21
22 import cPickle as pickle
22 import cPickle as pickle
23
23
24 from twisted.python import components, log, failure
24 from twisted.python import components, log, failure
25 from twisted.internet import defer, threads
25 from twisted.internet import defer, threads
26 from zope.interface import Interface, implements
26 from zope.interface import Interface, implements
27
27
28 from twisted.internet.base import DelayedCall
28 from twisted.internet.base import DelayedCall
29 DelayedCall.debug = True
29 DelayedCall.debug = True
30
30
31 try:
32 from foolscap.api import Referenceable, DeadReferenceError
33 except ImportError:
31 from foolscap import Referenceable, DeadReferenceError
34 from foolscap import Referenceable, DeadReferenceError
32 from foolscap.referenceable import RemoteReference
35 from foolscap.referenceable import RemoteReference
33
36
34 from IPython.kernel.pbutil import packageFailure, unpackageFailure
37 from IPython.kernel.pbutil import packageFailure, unpackageFailure
35 from IPython.kernel.controllerservice import IControllerBase
38 from IPython.kernel.controllerservice import IControllerBase
36 from IPython.kernel.engineservice import (
39 from IPython.kernel.engineservice import (
37 IEngineBase,
40 IEngineBase,
38 IEngineQueued,
41 IEngineQueued,
39 StrictDict
42 StrictDict
40 )
43 )
41 from IPython.kernel.pickleutil import (
44 from IPython.kernel.pickleutil import (
42 can,
45 can,
43 canDict,
46 canDict,
44 canSequence,
47 canSequence,
45 uncan,
48 uncan,
46 uncanDict,
49 uncanDict,
47 uncanSequence
50 uncanSequence
48 )
51 )
49
52
50
53
51 #-------------------------------------------------------------------------------
54 #-------------------------------------------------------------------------------
52 # The client (Engine) side of things
55 # The client (Engine) side of things
53 #-------------------------------------------------------------------------------
56 #-------------------------------------------------------------------------------
54
57
55 # Expose a FC interface to the EngineService
58 # Expose a FC interface to the EngineService
56
59
class IFCEngine(Interface):
    """Foolscap-facing interface to an EngineService.

    The methods in this interface mirror those of `IEngine`, but their
    arguments and return values differ slightly because Foolscap cannot
    transport arbitrary objects: values are pickled on one side and
    unpickled on the other.

    If a remote or local exception is raised, the appropriate Failure
    will be returned instead.
    """
    pass
69
72
70
73
class FCEngineReferenceFromService(Referenceable, object):
    """Adapt an `IEngineBase` to an `IFCEngine` implementer.

    This exposes an `IEngineBase` to foolscap by adapting it to a
    `foolscap.Referenceable`.

    See the documentation of the `IEngineBase` methods for more details.
    """

    implements(IFCEngine)

    def __init__(self, service):
        # Refuse to wrap anything that does not provide IEngineBase.
        assert IEngineBase.providedBy(service), \
            "IEngineBase is not provided by" + repr(service)
        self.service = service
        self.collectors = {}

    def remote_get_id(self):
        # Report the wrapped engine's id to the remote caller.
        return self.service.id

    def remote_set_id(self, id):
        self.service.id = id

    def _checkProperties(self, result):
        # Snapshot the modified flag, then clear it so the next call only
        # re-syncs if the properties have changed again in the meantime.
        modified = self.service.properties.modified
        self.service.properties.modified = False
        payload = modified and pickle.dumps(self.service.properties, 2)
        return payload, result

    def remote_execute(self, lines):
        # Run the code, then piggyback a properties sync on the result.
        deferred = self.service.execute(lines)
        deferred.addErrback(packageFailure)
        deferred.addCallback(self._checkProperties)
        deferred.addErrback(packageFailure)
        #d.addCallback(lambda r: log.msg("Got result: " + str(r)))
        return deferred

    #---------------------------------------------------------------------------
    # Old version of push
    #---------------------------------------------------------------------------

    def remote_push(self, pNamespace):
        try:
            ns = pickle.loads(pNamespace)
        except:
            # Unpickling failed: hand the failure back, packaged.
            return defer.fail(failure.Failure()).addErrback(packageFailure)
        return self.service.push(ns).addErrback(packageFailure)

    #---------------------------------------------------------------------------
    # pull
    #---------------------------------------------------------------------------

    def remote_pull(self, keys):
        deferred = self.service.pull(keys)
        deferred.addCallback(pickle.dumps, 2)
        return deferred.addErrback(packageFailure)

    #---------------------------------------------------------------------------
    # push/pullFuction
    #---------------------------------------------------------------------------

    def remote_push_function(self, pNamespace):
        try:
            ns = pickle.loads(pNamespace)
        except:
            return defer.fail(failure.Failure()).addErrback(packageFailure)
        # The usage of globals() here is an attempt to bind any pickled functions
        # to the globals of this module. What we really want is to have it bound
        # to the globals of the callers module. This will require walking the
        # stack. BG 10/3/07.
        ns = uncanDict(ns, globals())
        return self.service.push_function(ns).addErrback(packageFailure)

    def remote_pull_function(self, keys):
        deferred = self.service.pull_function(keys)
        # A single key yields one function; several yield a sequence.
        if len(keys) == 1:
            deferred.addCallback(can)
        elif len(keys) > 1:
            deferred.addCallback(canSequence)
        deferred.addCallback(pickle.dumps, 2)
        return deferred.addErrback(packageFailure)

    #---------------------------------------------------------------------------
    # Other methods
    #---------------------------------------------------------------------------

    def remote_get_result(self, i=None):
        return self.service.get_result(i).addErrback(packageFailure)

    def remote_reset(self):
        return self.service.reset().addErrback(packageFailure)

    def remote_kill(self):
        return self.service.kill().addErrback(packageFailure)

    def remote_keys(self):
        return self.service.keys().addErrback(packageFailure)

    #---------------------------------------------------------------------------
    # push/pull_serialized
    #---------------------------------------------------------------------------

    def remote_push_serialized(self, pNamespace):
        try:
            ns = pickle.loads(pNamespace)
        except:
            return defer.fail(failure.Failure()).addErrback(packageFailure)
        return self.service.push_serialized(ns).addErrback(packageFailure)

    def remote_pull_serialized(self, keys):
        deferred = self.service.pull_serialized(keys)
        deferred.addCallback(pickle.dumps, 2)
        return deferred.addErrback(packageFailure)

    #---------------------------------------------------------------------------
    # Properties interface
    #---------------------------------------------------------------------------

    def remote_set_properties(self, pNamespace):
        try:
            ns = pickle.loads(pNamespace)
        except:
            return defer.fail(failure.Failure()).addErrback(packageFailure)
        return self.service.set_properties(ns).addErrback(packageFailure)

    def remote_get_properties(self, keys=None):
        deferred = self.service.get_properties(keys)
        deferred.addCallback(pickle.dumps, 2)
        return deferred.addErrback(packageFailure)

    def remote_has_properties(self, keys):
        deferred = self.service.has_properties(keys)
        deferred.addCallback(pickle.dumps, 2)
        return deferred.addErrback(packageFailure)

    def remote_del_properties(self, keys):
        return self.service.del_properties(keys).addErrback(packageFailure)

    def remote_clear_properties(self):
        return self.service.clear_properties().addErrback(packageFailure)
224
227
225
228
# Allow IEngineBase providers to be adapted to IFCEngine automatically.
components.registerAdapter(
    FCEngineReferenceFromService, IEngineBase, IFCEngine)
229
232
230
233
231 #-------------------------------------------------------------------------------
234 #-------------------------------------------------------------------------------
232 # Now the server (Controller) side of things
235 # Now the server (Controller) side of things
233 #-------------------------------------------------------------------------------
236 #-------------------------------------------------------------------------------
234
237
class EngineFromReference(object):
    """Adapt a `RemoteReference` to an `IEngineBase` implementing object.

    When an engine connects to a controller, it calls the `register_engine`
    method of the controller and passes the controller a `RemoteReference` to
    itself. This class is used to adapt this `RemoteReference` to an object
    that implements the full `IEngineBase` interface.

    See the documentation of `IEngineBase` for details on the methods.
    """

    implements(IEngineBase)

    def __init__(self, reference):
        self.reference = reference
        self._id = None
        self._properties = StrictDict()
        self.currentCommand = None

    def callRemote(self, *args, **kwargs):
        """Forward a remote call, turning a dead reference into a failure.

        If the engine's connection is gone, fire the registered notifier,
        unregister it, and return a failed Deferred rather than letting
        DeadReferenceError propagate synchronously.

        NOTE(review): assumes ``self.notifier`` and ``self.stopNotifying``
        are attached elsewhere (not visible in this class) -- confirm.
        """
        try:
            return self.reference.callRemote(*args, **kwargs)
        except DeadReferenceError:
            self.notifier()
            self.stopNotifying(self.notifier)
            return defer.fail()

    def get_id(self):
        """Return the Engines id."""
        return self._id

    def set_id(self, id):
        """Set the Engines id."""
        self._id = id
        return self.callRemote('set_id', id)

    id = property(get_id, set_id)

    def syncProperties(self, r):
        """Unpack a ``(pickled_properties, result)`` pair from the engine.

        Results that are not a 2-sequence pass through unchanged.  When a
        properties payload is present it is unpickled and stored locally so
        the controller's view of the engine's properties stays in sync.  A
        packaged Failure is stored as-is and returned; `_get_properties`
        re-raises it on the next access.
        """
        try:
            psync, result = r
        except (ValueError, TypeError):
            # Not a (properties, result) pair; hand it through untouched.
            return r
        else:
            if psync:
                log.msg("sync properties")
                pick = self.checkReturnForFailure(psync)
                if isinstance(pick, failure.Failure):
                    self.properties = pick
                    return pick
                else:
                    self.properties = pickle.loads(pick)
            return result

    def _set_properties(self, dikt):
        # Replace the local cache wholesale with the engine's dict.
        self._properties.clear()
        self._properties.update(dikt)

    def _get_properties(self):
        # A Failure stored by syncProperties is re-raised on access.
        if isinstance(self._properties, failure.Failure):
            self._properties.raiseException()
        return self._properties

    properties = property(_get_properties, _set_properties)

    #---------------------------------------------------------------------------
    # Methods from IEngine
    #---------------------------------------------------------------------------

    #---------------------------------------------------------------------------
    # execute
    #---------------------------------------------------------------------------

    def execute(self, lines):
        """Execute `lines` on the engine, syncing properties afterwards."""
        # self._needProperties = True
        d = self.callRemote('execute', lines)
        d.addCallback(self.syncProperties)
        return d.addCallback(self.checkReturnForFailure)

    #---------------------------------------------------------------------------
    # push
    #---------------------------------------------------------------------------

    def push(self, namespace):
        """Pickle `namespace` and push it to the engine."""
        try:
            package = pickle.dumps(namespace, 2)
        except:
            # Deliberately broad: any pickling error becomes a Failure.
            return defer.fail(failure.Failure())
        else:
            if isinstance(package, failure.Failure):
                return defer.fail(package)
            else:
                d = self.callRemote('push', package)
                return d.addCallback(self.checkReturnForFailure)

    #---------------------------------------------------------------------------
    # pull
    #---------------------------------------------------------------------------

    def pull(self, keys):
        """Pull `keys` from the engine, unpickling the reply."""
        d = self.callRemote('pull', keys)
        d.addCallback(self.checkReturnForFailure)
        d.addCallback(pickle.loads)
        return d

    #---------------------------------------------------------------------------
    # push/pull_function
    #---------------------------------------------------------------------------

    def push_function(self, namespace):
        """Can + pickle a dict of functions and push it to the engine."""
        try:
            package = pickle.dumps(canDict(namespace), 2)
        except:
            return defer.fail(failure.Failure())
        else:
            if isinstance(package, failure.Failure):
                return defer.fail(package)
            else:
                d = self.callRemote('push_function', package)
                return d.addCallback(self.checkReturnForFailure)

    def pull_function(self, keys):
        """Pull canned functions from the engine and uncan them locally."""
        d = self.callRemote('pull_function', keys)
        d.addCallback(self.checkReturnForFailure)
        d.addCallback(pickle.loads)
        # The usage of globals() here is an attempt to bind any pickled functions
        # to the globals of this module. What we really want is to have it bound
        # to the globals of the callers module. This will require walking the
        # stack. BG 10/3/07.
        if len(keys) == 1:
            d.addCallback(uncan, globals())
        elif len(keys) > 1:
            d.addCallback(uncanSequence, globals())
        return d

    #---------------------------------------------------------------------------
    # Other methods
    #---------------------------------------------------------------------------

    def get_result(self, i=None):
        return self.callRemote('get_result', i).addCallback(self.checkReturnForFailure)

    def reset(self):
        self._refreshProperties = True
        d = self.callRemote('reset')
        d.addCallback(self.syncProperties)
        return d.addCallback(self.checkReturnForFailure)

    def kill(self):
        #this will raise pb.PBConnectionLost on success
        d = self.callRemote('kill')
        d.addCallback(self.syncProperties)
        d.addCallback(self.checkReturnForFailure)
        d.addErrback(self.killBack)
        return d

    def killBack(self, f):
        # A lost connection is the expected outcome of 'kill', so the
        # errback is swallowed after logging.
        # NOTE(review): 'filling' looks like a typo for 'killing'; kept
        # byte-for-byte to preserve existing log output.
        log.msg('filling engine: %s' % f)
        return None

    def keys(self):
        return self.callRemote('keys').addCallback(self.checkReturnForFailure)

    #---------------------------------------------------------------------------
    # Properties methods
    #---------------------------------------------------------------------------

    def set_properties(self, properties):
        """Pickle `properties` and send them to the engine.

        FIX: removed an unreachable trailing ``return d`` -- both branches
        of the if/else below already return.
        """
        try:
            package = pickle.dumps(properties, 2)
        except:
            return defer.fail(failure.Failure())
        else:
            if isinstance(package, failure.Failure):
                return defer.fail(package)
            else:
                d = self.callRemote('set_properties', package)
                return d.addCallback(self.checkReturnForFailure)

    def get_properties(self, keys=None):
        d = self.callRemote('get_properties', keys)
        d.addCallback(self.checkReturnForFailure)
        d.addCallback(pickle.loads)
        return d

    def has_properties(self, keys):
        d = self.callRemote('has_properties', keys)
        d.addCallback(self.checkReturnForFailure)
        d.addCallback(pickle.loads)
        return d

    def del_properties(self, keys):
        d = self.callRemote('del_properties', keys)
        d.addCallback(self.checkReturnForFailure)
        # d.addCallback(pickle.loads)
        return d

    def clear_properties(self):
        d = self.callRemote('clear_properties')
        d.addCallback(self.checkReturnForFailure)
        return d

    #---------------------------------------------------------------------------
    # push/pull_serialized
    #---------------------------------------------------------------------------

    def push_serialized(self, namespace):
        """Older version of pushSerialize."""
        try:
            package = pickle.dumps(namespace, 2)
        except:
            return defer.fail(failure.Failure())
        else:
            if isinstance(package, failure.Failure):
                return defer.fail(package)
            else:
                d = self.callRemote('push_serialized', package)
                return d.addCallback(self.checkReturnForFailure)

    def pull_serialized(self, keys):
        d = self.callRemote('pull_serialized', keys)
        d.addCallback(self.checkReturnForFailure)
        d.addCallback(pickle.loads)
        return d

    #---------------------------------------------------------------------------
    # Misc
    #---------------------------------------------------------------------------

    def checkReturnForFailure(self, r):
        """See if a returned value is a pickled Failure object.

        To distinguish between general pickled objects and pickled Failures, the
        other side should prepend the string FAILURE: to any pickled Failure.
        """
        return unpackageFailure(r)
472
475
473
476
# Allow a foolscap RemoteReference to be adapted to IEngineBase automatically.
components.registerAdapter(
    EngineFromReference, RemoteReference, IEngineBase)
477
480
478
481
479 #-------------------------------------------------------------------------------
482 #-------------------------------------------------------------------------------
480 # Now adapt an IControllerBase to incoming FC connections
483 # Now adapt an IControllerBase to incoming FC connections
481 #-------------------------------------------------------------------------------
484 #-------------------------------------------------------------------------------
482
485
483
486
484 class IFCControllerBase(Interface):
487 class IFCControllerBase(Interface):
485 """
488 """
486 Interface that tells how an Engine sees a Controller.
489 Interface that tells how an Engine sees a Controller.
487
490
488 In our architecture, the Controller listens for Engines to connect
491 In our architecture, the Controller listens for Engines to connect
489 and register. This interface defines that registration method as it is
492 and register. This interface defines that registration method as it is
490 exposed over the Foolscap network protocol
493 exposed over the Foolscap network protocol
491 """
494 """
492
495
493 def remote_register_engine(self, engineReference, id=None, pid=None, pproperties=None):
496 def remote_register_engine(self, engineReference, id=None, pid=None, pproperties=None):
494 """
497 """
495 Register new engine on the controller.
498 Register new engine on the controller.
496
499
497 Engines must call this upon connecting to the controller if they
500 Engines must call this upon connecting to the controller if they
498 want to do work for the controller.
501 want to do work for the controller.
499
502
500 See the documentation of `IControllerCore` for more details.
503 See the documentation of `IControllerCore` for more details.
501 """
504 """
502
505
503
506
504 class FCRemoteEngineRefFromService(Referenceable):
507 class FCRemoteEngineRefFromService(Referenceable):
505 """
508 """
506 Adapt an `IControllerBase` to an `IFCControllerBase`.
509 Adapt an `IControllerBase` to an `IFCControllerBase`.
507 """
510 """
508
511
509 implements(IFCControllerBase)
512 implements(IFCControllerBase)
510
513
511 def __init__(self, service):
514 def __init__(self, service):
512 assert IControllerBase.providedBy(service), \
515 assert IControllerBase.providedBy(service), \
513 "IControllerBase is not provided by " + repr(service)
516 "IControllerBase is not provided by " + repr(service)
514 self.service = service
517 self.service = service
515
518
516 def remote_register_engine(self, engine_reference, id=None, pid=None, pproperties=None):
519 def remote_register_engine(self, engine_reference, id=None, pid=None, pproperties=None):
517 # First adapt the engine_reference to a basic non-queued engine
520 # First adapt the engine_reference to a basic non-queued engine
518 engine = IEngineBase(engine_reference)
521 engine = IEngineBase(engine_reference)
519 if pproperties:
522 if pproperties:
520 engine.properties = pickle.loads(pproperties)
523 engine.properties = pickle.loads(pproperties)
521 # Make it an IQueuedEngine before registration
524 # Make it an IQueuedEngine before registration
522 remote_engine = IEngineQueued(engine)
525 remote_engine = IEngineQueued(engine)
523 # Get the ip/port of the remote side
526 # Get the ip/port of the remote side
524 peer_address = engine_reference.tracker.broker.transport.getPeer()
527 peer_address = engine_reference.tracker.broker.transport.getPeer()
525 ip = peer_address.host
528 ip = peer_address.host
526 port = peer_address.port
529 port = peer_address.port
527 reg_dict = self.service.register_engine(remote_engine, id, ip, port, pid)
530 reg_dict = self.service.register_engine(remote_engine, id, ip, port, pid)
528 # Now setup callback for disconnect and unregistering the engine
531 # Now setup callback for disconnect and unregistering the engine
529 def notify(*args):
532 def notify(*args):
530 return self.service.unregister_engine(reg_dict['id'])
533 return self.service.unregister_engine(reg_dict['id'])
531 engine_reference.tracker.broker.notifyOnDisconnect(notify)
534 engine_reference.tracker.broker.notifyOnDisconnect(notify)
532
535
533 engine.notifier = notify
536 engine.notifier = notify
534 engine.stopNotifying = engine_reference.tracker.broker.dontNotifyOnDisconnect
537 engine.stopNotifying = engine_reference.tracker.broker.dontNotifyOnDisconnect
535
538
536 return reg_dict
539 return reg_dict
537
540
538
541
539 components.registerAdapter(FCRemoteEngineRefFromService,
542 components.registerAdapter(FCRemoteEngineRefFromService,
540 IControllerBase,
543 IControllerBase,
541 IFCControllerBase)
544 IFCControllerBase)
@@ -1,293 +1,296 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Foolscap related utilities.
4 Foolscap related utilities.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import tempfile
21 import tempfile
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.python import log
24 from twisted.python import log
25
25
26 import foolscap
26 import foolscap
27 try:
28 from foolscap.api import Tub, UnauthenticatedTub
29 except ImportError:
27 from foolscap import Tub, UnauthenticatedTub
30 from foolscap import Tub, UnauthenticatedTub
28
31
29 from IPython.config.loader import Config
32 from IPython.config.loader import Config
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
33 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31 from IPython.kernel.error import SecurityError
34 from IPython.kernel.error import SecurityError
32
35
33 from IPython.utils.importstring import import_item
36 from IPython.utils.importstring import import_item
34 from IPython.utils.path import expand_path
37 from IPython.utils.path import expand_path
35 from IPython.utils.traitlets import Int, Str, Bool, Instance
38 from IPython.utils.traitlets import Int, Str, Bool, Instance
36
39
37 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
38 # Code
41 # Code
39 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
40
43
41
44
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
45 # We do this so if a user doesn't have OpenSSL installed, it will try to use
43 # an UnauthenticatedTub. But, they will still run into problems if they
46 # an UnauthenticatedTub. But, they will still run into problems if they
44 # try to use encrypted furls.
47 # try to use encrypted furls.
45 try:
48 try:
46 import OpenSSL
49 import OpenSSL
47 except:
50 except:
48 Tub = UnauthenticatedTub
51 Tub = UnauthenticatedTub
49 have_crypto = False
52 have_crypto = False
50 else:
53 else:
51 have_crypto = True
54 have_crypto = True
52
55
53
56
54 class FURLError(Exception):
57 class FURLError(Exception):
55 pass
58 pass
56
59
57
60
58 def check_furl_file_security(furl_file, secure):
61 def check_furl_file_security(furl_file, secure):
59 """Remove the old furl_file if changing security modes."""
62 """Remove the old furl_file if changing security modes."""
60 furl_file = expand_path(furl_file)
63 furl_file = expand_path(furl_file)
61 if os.path.isfile(furl_file):
64 if os.path.isfile(furl_file):
62 with open(furl_file, 'r') as f:
65 with open(furl_file, 'r') as f:
63 oldfurl = f.read().strip()
66 oldfurl = f.read().strip()
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
67 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
65 os.remove(furl_file)
68 os.remove(furl_file)
66
69
67
70
68 def is_secure(furl):
71 def is_secure(furl):
69 """Is the given FURL secure or not."""
72 """Is the given FURL secure or not."""
70 if is_valid_furl(furl):
73 if is_valid_furl(furl):
71 if furl.startswith("pb://"):
74 if furl.startswith("pb://"):
72 return True
75 return True
73 elif furl.startswith("pbu://"):
76 elif furl.startswith("pbu://"):
74 return False
77 return False
75 else:
78 else:
76 raise FURLError("invalid FURL: %s" % furl)
79 raise FURLError("invalid FURL: %s" % furl)
77
80
78
81
79 def is_valid_furl(furl):
82 def is_valid_furl(furl):
80 """Is the str a valid FURL or not."""
83 """Is the str a valid FURL or not."""
81 if isinstance(furl, str):
84 if isinstance(furl, str):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
85 if furl.startswith("pb://") or furl.startswith("pbu://"):
83 return True
86 return True
84 else:
87 else:
85 return False
88 return False
86 else:
89 else:
87 return False
90 return False
88
91
89
92
90 def is_valid_furl_file(furl_or_file):
93 def is_valid_furl_file(furl_or_file):
91 """See if furl_or_file exists and contains a valid FURL.
94 """See if furl_or_file exists and contains a valid FURL.
92
95
93 This doesn't try to read the contents because often we have to validate
96 This doesn't try to read the contents because often we have to validate
94 FURL files that are created, but don't yet have a FURL written to them.
97 FURL files that are created, but don't yet have a FURL written to them.
95 """
98 """
96 if isinstance(furl_or_file, (str, unicode)):
99 if isinstance(furl_or_file, (str, unicode)):
97 path, furl_filename = os.path.split(furl_or_file)
100 path, furl_filename = os.path.split(furl_or_file)
98 if os.path.isdir(path) and furl_filename.endswith('.furl'):
101 if os.path.isdir(path) and furl_filename.endswith('.furl'):
99 return True
102 return True
100 return False
103 return False
101
104
102
105
103 def find_furl(furl_or_file):
106 def find_furl(furl_or_file):
104 """Find, validate and return a FURL in a string or file.
107 """Find, validate and return a FURL in a string or file.
105
108
106 This calls :func:`IPython.utils.path.expand_path` on the argument to
109 This calls :func:`IPython.utils.path.expand_path` on the argument to
107 properly handle ``~`` and ``$`` variables in the path.
110 properly handle ``~`` and ``$`` variables in the path.
108 """
111 """
109 if is_valid_furl(furl_or_file):
112 if is_valid_furl(furl_or_file):
110 return furl_or_file
113 return furl_or_file
111 furl_or_file = expand_path(furl_or_file)
114 furl_or_file = expand_path(furl_or_file)
112 if is_valid_furl_file(furl_or_file):
115 if is_valid_furl_file(furl_or_file):
113 with open(furl_or_file, 'r') as f:
116 with open(furl_or_file, 'r') as f:
114 furl = f.read().strip()
117 furl = f.read().strip()
115 if is_valid_furl(furl):
118 if is_valid_furl(furl):
116 return furl
119 return furl
117 raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file)
120 raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file)
118
121
119
122
120 def is_valid_furl_or_file(furl_or_file):
123 def is_valid_furl_or_file(furl_or_file):
121 """Validate a FURL or a FURL file.
124 """Validate a FURL or a FURL file.
122
125
123 If ``furl_or_file`` looks like a file, we simply make sure its directory
126 If ``furl_or_file`` looks like a file, we simply make sure its directory
124 exists and that it has a ``.furl`` file extension. We don't try to see
127 exists and that it has a ``.furl`` file extension. We don't try to see
125 if the FURL file exists or to read its contents. This is useful for
128 if the FURL file exists or to read its contents. This is useful for
126 cases where auto re-connection is being used.
129 cases where auto re-connection is being used.
127 """
130 """
128 if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file):
131 if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file):
129 return True
132 return True
130 else:
133 else:
131 return False
134 return False
132
135
133
136
134 def validate_furl_or_file(furl_or_file):
137 def validate_furl_or_file(furl_or_file):
135 """Like :func:`is_valid_furl_or_file`, but raises an error."""
138 """Like :func:`is_valid_furl_or_file`, but raises an error."""
136 if not is_valid_furl_or_file(furl_or_file):
139 if not is_valid_furl_or_file(furl_or_file):
137 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
140 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
138
141
139
142
140 def get_temp_furlfile(filename):
143 def get_temp_furlfile(filename):
141 """Return a temporary FURL file."""
144 """Return a temporary FURL file."""
142 return tempfile.mktemp(dir=os.path.dirname(filename),
145 return tempfile.mktemp(dir=os.path.dirname(filename),
143 prefix=os.path.basename(filename))
146 prefix=os.path.basename(filename))
144
147
145
148
146 def make_tub(ip, port, secure, cert_file):
149 def make_tub(ip, port, secure, cert_file):
147 """Create a listening tub given an ip, port, and cert_file location.
150 """Create a listening tub given an ip, port, and cert_file location.
148
151
149 Parameters
152 Parameters
150 ----------
153 ----------
151 ip : str
154 ip : str
152 The ip address or hostname that the tub should listen on.
155 The ip address or hostname that the tub should listen on.
153 Empty means all interfaces.
156 Empty means all interfaces.
154 port : int
157 port : int
155 The port that the tub should listen on. A value of 0 means
158 The port that the tub should listen on. A value of 0 means
156 pick a random port
159 pick a random port
157 secure: bool
160 secure: bool
158 Will the connection be secure (in the Foolscap sense).
161 Will the connection be secure (in the Foolscap sense).
159 cert_file: str
162 cert_file: str
160 A filename of a file to be used for theSSL certificate.
163 A filename of a file to be used for theSSL certificate.
161
164
162 Returns
165 Returns
163 -------
166 -------
164 A tub, listener tuple.
167 A tub, listener tuple.
165 """
168 """
166 if secure:
169 if secure:
167 if have_crypto:
170 if have_crypto:
168 tub = Tub(certFile=cert_file)
171 tub = Tub(certFile=cert_file)
169 else:
172 else:
170 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
173 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
171 "can't run in secure mode. Try running without "
174 "can't run in secure mode. Try running without "
172 "security using 'ipcontroller -xy'.")
175 "security using 'ipcontroller -xy'.")
173 else:
176 else:
174 tub = UnauthenticatedTub()
177 tub = UnauthenticatedTub()
175
178
176 # Set the strport based on the ip and port and start listening
179 # Set the strport based on the ip and port and start listening
177 if ip == '':
180 if ip == '':
178 strport = "tcp:%i" % port
181 strport = "tcp:%i" % port
179 else:
182 else:
180 strport = "tcp:%i:interface=%s" % (port, ip)
183 strport = "tcp:%i:interface=%s" % (port, ip)
181 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
184 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
182 listener = tub.listenOn(strport)
185 listener = tub.listenOn(strport)
183
186
184 return tub, listener
187 return tub, listener
185
188
186
189
187 class FCServiceFactory(AdaptedConfiguredObjectFactory):
190 class FCServiceFactory(AdaptedConfiguredObjectFactory):
188 """This class creates a tub with various services running in it.
191 """This class creates a tub with various services running in it.
189
192
190 The basic idea is that :meth:`create` returns a running :class:`Tub`
193 The basic idea is that :meth:`create` returns a running :class:`Tub`
191 instance that has a number of Foolscap references registered in it.
194 instance that has a number of Foolscap references registered in it.
192 This class is a subclass of :class:`IPython.core.component.Component`
195 This class is a subclass of :class:`IPython.core.component.Component`
193 so the IPython configuration and component system are used.
196 so the IPython configuration and component system are used.
194
197
195 Attributes
198 Attributes
196 ----------
199 ----------
197 interfaces : Config
200 interfaces : Config
198 A Config instance whose values are sub-Config objects having two
201 A Config instance whose values are sub-Config objects having two
199 keys: furl_file and interface_chain.
202 keys: furl_file and interface_chain.
200
203
201 The other attributes are the standard ones for Foolscap.
204 The other attributes are the standard ones for Foolscap.
202 """
205 """
203
206
204 ip = Str('', config=True)
207 ip = Str('', config=True)
205 port = Int(0, config=True)
208 port = Int(0, config=True)
206 secure = Bool(True, config=True)
209 secure = Bool(True, config=True)
207 cert_file = Str('', config=True)
210 cert_file = Str('', config=True)
208 location = Str('', config=True)
211 location = Str('', config=True)
209 reuse_furls = Bool(False, config=True)
212 reuse_furls = Bool(False, config=True)
210 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
213 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
211
214
212 def __init__(self, config, adaptee):
215 def __init__(self, config, adaptee):
213 super(FCServiceFactory, self).__init__(config, adaptee)
216 super(FCServiceFactory, self).__init__(config, adaptee)
214 self._check_reuse_furls()
217 self._check_reuse_furls()
215
218
216 def _ip_changed(self, name, old, new):
219 def _ip_changed(self, name, old, new):
217 if new == 'localhost' or new == '127.0.0.1':
220 if new == 'localhost' or new == '127.0.0.1':
218 self.location = '127.0.0.1'
221 self.location = '127.0.0.1'
219
222
220 def _check_reuse_furls(self):
223 def _check_reuse_furls(self):
221 furl_files = [i.furl_file for i in self.interfaces.values()]
224 furl_files = [i.furl_file for i in self.interfaces.values()]
222 for ff in furl_files:
225 for ff in furl_files:
223 fullfile = self._get_security_file(ff)
226 fullfile = self._get_security_file(ff)
224 if self.reuse_furls:
227 if self.reuse_furls:
225 if self.port==0:
228 if self.port==0:
226 raise FURLError("You are trying to reuse the FURL file "
229 raise FURLError("You are trying to reuse the FURL file "
227 "for this connection, but the port for this connection "
230 "for this connection, but the port for this connection "
228 "is set to 0 (autoselect). To reuse the FURL file "
231 "is set to 0 (autoselect). To reuse the FURL file "
229 "you need to specify specific port to listen on."
232 "you need to specify specific port to listen on."
230 )
233 )
231 else:
234 else:
232 log.msg("Reusing FURL file: %s" % fullfile)
235 log.msg("Reusing FURL file: %s" % fullfile)
233 else:
236 else:
234 if os.path.isfile(fullfile):
237 if os.path.isfile(fullfile):
235 log.msg("Removing old FURL file: %s" % fullfile)
238 log.msg("Removing old FURL file: %s" % fullfile)
236 os.remove(fullfile)
239 os.remove(fullfile)
237
240
238 def _get_security_file(self, filename):
241 def _get_security_file(self, filename):
239 return os.path.join(self.config.Global.security_dir, filename)
242 return os.path.join(self.config.Global.security_dir, filename)
240
243
241 def create(self):
244 def create(self):
242 """Create and return the Foolscap tub with everything running."""
245 """Create and return the Foolscap tub with everything running."""
243
246
244 self.tub, self.listener = make_tub(
247 self.tub, self.listener = make_tub(
245 self.ip, self.port, self.secure,
248 self.ip, self.port, self.secure,
246 self._get_security_file(self.cert_file)
249 self._get_security_file(self.cert_file)
247 )
250 )
248 # log.msg("Interfaces to register [%r]: %r" % \
251 # log.msg("Interfaces to register [%r]: %r" % \
249 # (self.__class__, self.interfaces))
252 # (self.__class__, self.interfaces))
250 if not self.secure:
253 if not self.secure:
251 log.msg("WARNING: running with no security: %s" % \
254 log.msg("WARNING: running with no security: %s" % \
252 self.__class__.__name__)
255 self.__class__.__name__)
253 reactor.callWhenRunning(self.set_location_and_register)
256 reactor.callWhenRunning(self.set_location_and_register)
254 return self.tub
257 return self.tub
255
258
256 def set_location_and_register(self):
259 def set_location_and_register(self):
257 """Set the location for the tub and return a deferred."""
260 """Set the location for the tub and return a deferred."""
258
261
259 if self.location == '':
262 if self.location == '':
260 d = self.tub.setLocationAutomatically()
263 d = self.tub.setLocationAutomatically()
261 else:
264 else:
262 d = defer.maybeDeferred(self.tub.setLocation,
265 d = defer.maybeDeferred(self.tub.setLocation,
263 "%s:%i" % (self.location, self.listener.getPortnum()))
266 "%s:%i" % (self.location, self.listener.getPortnum()))
264 self.adapt_to_interfaces(d)
267 self.adapt_to_interfaces(d)
265
268
266 def adapt_to_interfaces(self, d):
269 def adapt_to_interfaces(self, d):
267 """Run through the interfaces, adapt and register."""
270 """Run through the interfaces, adapt and register."""
268
271
269 for ifname, ifconfig in self.interfaces.iteritems():
272 for ifname, ifconfig in self.interfaces.iteritems():
270 ff = self._get_security_file(ifconfig.furl_file)
273 ff = self._get_security_file(ifconfig.furl_file)
271 log.msg("Adapting [%s] to interface: %s" % \
274 log.msg("Adapting [%s] to interface: %s" % \
272 (self.adaptee.__class__.__name__, ifname))
275 (self.adaptee.__class__.__name__, ifname))
273 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
276 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
274 check_furl_file_security(ff, self.secure)
277 check_furl_file_security(ff, self.secure)
275 adaptee = self.adaptee
278 adaptee = self.adaptee
276 for i in ifconfig.interface_chain:
279 for i in ifconfig.interface_chain:
277 adaptee = import_item(i)(adaptee)
280 adaptee = import_item(i)(adaptee)
278 d.addCallback(self.register, adaptee, furl_file=ff)
281 d.addCallback(self.register, adaptee, furl_file=ff)
279
282
280 def register(self, empty, ref, furl_file):
283 def register(self, empty, ref, furl_file):
281 """Register the reference with the FURL file.
284 """Register the reference with the FURL file.
282
285
283 The FURL file is created and then moved to make sure that when the
286 The FURL file is created and then moved to make sure that when the
284 file appears, the buffer has been flushed and the file closed. This
287 file appears, the buffer has been flushed and the file closed. This
285 is not done if we are re-using FURLS however.
288 is not done if we are re-using FURLS however.
286 """
289 """
287 if self.reuse_furls:
290 if self.reuse_furls:
288 self.tub.registerReference(ref, furlFile=furl_file)
291 self.tub.registerReference(ref, furlFile=furl_file)
289 else:
292 else:
290 temp_furl_file = get_temp_furlfile(furl_file)
293 temp_furl_file = get_temp_furlfile(furl_file)
291 self.tub.registerReference(ref, furlFile=temp_furl_file)
294 self.tub.registerReference(ref, furlFile=temp_furl_file)
292 os.rename(temp_furl_file, furl_file)
295 os.rename(temp_furl_file, furl_file)
293
296
@@ -1,901 +1,905 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import warnings
20 import warnings
21
21
22 from twisted.python import components
22 from twisted.python import components
23 from twisted.python.failure import Failure
23 from twisted.python.failure import Failure
24 from zope.interface import Interface, implements, Attribute
24 from zope.interface import Interface, implements, Attribute
25
26 try:
27 from foolscap.api import DeadReferenceError
28 except ImportError:
25 from foolscap import DeadReferenceError
29 from foolscap import DeadReferenceError
26
30
27 from IPython.utils.coloransi import TermColors
31 from IPython.utils.coloransi import TermColors
28
32
29 from IPython.kernel.twistedutil import blockingCallFromThread
33 from IPython.kernel.twistedutil import blockingCallFromThread
30 from IPython.kernel import error
34 from IPython.kernel import error
31 from IPython.kernel.parallelfunction import ParallelFunction
35 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.mapper import (
36 from IPython.kernel.mapper import (
33 MultiEngineMapper,
37 MultiEngineMapper,
34 IMultiEngineMapperFactory,
38 IMultiEngineMapperFactory,
35 IMapper
39 IMapper
36 )
40 )
37
41
38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
42 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
39
43
40
44
41 #-------------------------------------------------------------------------------
45 #-------------------------------------------------------------------------------
42 # Pending Result things
46 # Pending Result things
43 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
44
48
45 class IPendingResult(Interface):
49 class IPendingResult(Interface):
46 """A representation of a result that is pending.
50 """A representation of a result that is pending.
47
51
48 This class is similar to Twisted's `Deferred` object, but is designed to be
52 This class is similar to Twisted's `Deferred` object, but is designed to be
49 used in a synchronous context.
53 used in a synchronous context.
50 """
54 """
51
55
52 result_id=Attribute("ID of the deferred on the other side")
56 result_id=Attribute("ID of the deferred on the other side")
53 client=Attribute("A client that I came from")
57 client=Attribute("A client that I came from")
54 r=Attribute("An attribute that is a property that calls and returns get_result")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
55
59
56 def get_result(default=None, block=True):
60 def get_result(default=None, block=True):
57 """
61 """
58 Get a result that is pending.
62 Get a result that is pending.
59
63
60 :Parameters:
64 :Parameters:
61 default
65 default
62 The value to return if the result is not ready.
66 The value to return if the result is not ready.
63 block : boolean
67 block : boolean
64 Should I block for the result.
68 Should I block for the result.
65
69
66 :Returns: The actual result or the default value.
70 :Returns: The actual result or the default value.
67 """
71 """
68
72
69 def add_callback(f, *args, **kwargs):
73 def add_callback(f, *args, **kwargs):
70 """
74 """
71 Add a callback that is called with the result.
75 Add a callback that is called with the result.
72
76
73 If the original result is foo, adding a callback will cause
77 If the original result is foo, adding a callback will cause
74 f(foo, *args, **kwargs) to be returned instead. If multiple
78 f(foo, *args, **kwargs) to be returned instead. If multiple
75 callbacks are registered, they are chained together: the result of
79 callbacks are registered, they are chained together: the result of
76 one is passed to the next and so on.
80 one is passed to the next and so on.
77
81
78 Unlike Twisted's Deferred object, there is no errback chain. Thus
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
79 any exception raised will not be caught and handled. User must
83 any exception raised will not be caught and handled. User must
80 catch these by hand when calling `get_result`.
84 catch these by hand when calling `get_result`.
81 """
85 """
82
86
83
87
84 class PendingResult(object):
88 class PendingResult(object):
85 """A representation of a result that is not yet ready.
89 """A representation of a result that is not yet ready.
86
90
87 A user should not create a `PendingResult` instance by hand.
91 A user should not create a `PendingResult` instance by hand.
88
92
89 Methods:
93 Methods:
90
94
91 * `get_result`
95 * `get_result`
92 * `add_callback`
96 * `add_callback`
93
97
94 Properties:
98 Properties:
95
99
96 * `r`
100 * `r`
97 """
101 """
98
102
99 def __init__(self, client, result_id):
103 def __init__(self, client, result_id):
100 """Create a PendingResult with a result_id and a client instance.
104 """Create a PendingResult with a result_id and a client instance.
101
105
102 The client should implement `_getPendingResult(result_id, block)`.
106 The client should implement `_getPendingResult(result_id, block)`.
103 """
107 """
104 self.client = client
108 self.client = client
105 self.result_id = result_id
109 self.result_id = result_id
106 self.called = False
110 self.called = False
107 self.raised = False
111 self.raised = False
108 self.callbacks = []
112 self.callbacks = []
109
113
110 def get_result(self, default=None, block=True):
114 def get_result(self, default=None, block=True):
111 """Get a result that is pending.
115 """Get a result that is pending.
112
116
113 This method will connect to an IMultiEngine adapted controller
117 This method will connect to an IMultiEngine adapted controller
114 and see if the result is ready. If the action triggers an exception
118 and see if the result is ready. If the action triggers an exception
115 raise it and record it. This method records the result/exception once it is
119 raise it and record it. This method records the result/exception once it is
116 retrieved. Calling `get_result` again will get this cached result or will
120 retrieved. Calling `get_result` again will get this cached result or will
117 re-raise the exception. The .r attribute is a property that calls
121 re-raise the exception. The .r attribute is a property that calls
118 `get_result` with block=True.
122 `get_result` with block=True.
119
123
120 :Parameters:
124 :Parameters:
121 default
125 default
122 The value to return if the result is not ready.
126 The value to return if the result is not ready.
123 block : boolean
127 block : boolean
124 Should I block for the result.
128 Should I block for the result.
125
129
126 :Returns: The actual result or the default value.
130 :Returns: The actual result or the default value.
127 """
131 """
128
132
129 if self.called:
133 if self.called:
130 if self.raised:
134 if self.raised:
131 raise self.result[0], self.result[1], self.result[2]
135 raise self.result[0], self.result[1], self.result[2]
132 else:
136 else:
133 return self.result
137 return self.result
134 try:
138 try:
135 result = self.client.get_pending_deferred(self.result_id, block)
139 result = self.client.get_pending_deferred(self.result_id, block)
136 except error.ResultNotCompleted:
140 except error.ResultNotCompleted:
137 return default
141 return default
138 except:
142 except:
139 # Reraise other error, but first record them so they can be reraised
143 # Reraise other error, but first record them so they can be reraised
140 # later if .r or get_result is called again.
144 # later if .r or get_result is called again.
141 self.result = sys.exc_info()
145 self.result = sys.exc_info()
142 self.called = True
146 self.called = True
143 self.raised = True
147 self.raised = True
144 raise
148 raise
145 else:
149 else:
146 for cb in self.callbacks:
150 for cb in self.callbacks:
147 result = cb[0](result, *cb[1], **cb[2])
151 result = cb[0](result, *cb[1], **cb[2])
148 self.result = result
152 self.result = result
149 self.called = True
153 self.called = True
150 return result
154 return result
151
155
def add_callback(self, f, *args, **kwargs):
    """Register `f` to post-process the result.

    Once the raw result arrives, f(result, *args, **kwargs) replaces it.
    When several callbacks are registered they chain: each one receives
    the previous callback's return value.

    Unlike Twisted's Deferred there is no errback chain, so exceptions
    raised by a callback are not caught here; callers of `get_result`
    must handle them by hand.
    """
    assert callable(f)
    self.callbacks.append((f, args, kwargs))
166
170
def __cmp__(self, other):
    """Order `PendingResult`s by `result_id` (used by `barrier`'s sort).

    NOTE(fix): the previous version never returned 0, so two results
    with equal ids compared as both "greater than" each other, giving an
    inconsistent ordering and broken equality.  Return the full
    -1/0/1 contract instead.
    """
    if self.result_id < other.result_id:
        return -1
    elif self.result_id > other.result_id:
        return 1
    else:
        return 0
172
176
def _get_r(self):
    # Property getter: always block until the result is available.
    return self.get_result(block=True)

# Shortcut: ``pr.r`` is equivalent to ``pr.get_result(block=True)``.
r = property(_get_r)
178
182
179
183
#-------------------------------------------------------------------------------
# Pretty printing wrappers for certain lists
#-------------------------------------------------------------------------------
183
187
class ResultList(list):
    """A list subclass that pretty-prints `execute`/`get_result` output."""

    def __repr__(self):
        # ANSI-colored prompts were not working on Windows, so disable
        # color entirely there.
        if sys.platform == 'win32':
            blue = normal = red = green = ''
        else:
            blue = TermColors.Blue
            normal = TermColors.Normal
            red = TermColors.Red
            green = TermColors.Green
        output = ["<Results List>\n"]
        for cmd in self:
            # Failures are appended as-is; dicts are formatted as
            # In/Out/Err prompt lines per engine.
            if isinstance(cmd, Failure):
                output.append(cmd)
                continue
            target = cmd.get('id', None)
            cmd_num = cmd.get('number', None)
            cmd_stdin = cmd.get('input', {}).get('translated', 'No Input')
            cmd_stdout = cmd.get('stdout', None)
            cmd_stderr = cmd.get('stderr', None)
            output.append("%s[%i]%s In [%i]:%s %s\n" % \
                          (green, target, blue, cmd_num, normal, cmd_stdin))
            if cmd_stdout:
                output.append("%s[%i]%s Out[%i]:%s %s\n" % \
                              (green, target, red, cmd_num, normal, cmd_stdout))
            if cmd_stderr:
                output.append("%s[%i]%s Err[%i]:\n%s %s" % \
                              (green, target, red, cmd_num, normal, cmd_stderr))
        return ''.join(output)
219
223
220
224
def wrapResultList(result):
    """Wrap the output of `execute`/`get_result` in a `ResultList`."""
    # An empty result is nested in a list so the ResultList still gets
    # a single entry.
    wrapped = [result] if len(result) == 0 else result
    return ResultList(wrapped)
226
230
227
231
class QueueStatusList(list):
    """A list subclass that pretty-prints `queue_status` output."""

    def __repr__(self):
        # Each entry looks like (engine_id, {'pending': ..., 'queue': [...]}).
        parts = ["<Queue Status List>\n"]
        for entry in self:
            parts.append("Engine: %s\n" % repr(entry[0]))
            info = entry[1]
            parts.append(" Pending: %s\n" % repr(info['pending']))
            for command in info['queue']:
                parts.append(" Command: %s\n" % repr(command))
        return ''.join(parts)
240
244
241
245
#-------------------------------------------------------------------------------
# InteractiveMultiEngineClient
#-------------------------------------------------------------------------------
245
249
246 class InteractiveMultiEngineClient(object):
250 class InteractiveMultiEngineClient(object):
247 """A mixin class that add a few methods to a multiengine client.
251 """A mixin class that add a few methods to a multiengine client.
248
252
249 The methods in this mixin class are designed for interactive usage.
253 The methods in this mixin class are designed for interactive usage.
250 """
254 """
251
255
252 def activate(self):
256 def activate(self):
253 """Make this `MultiEngineClient` active for parallel magic commands.
257 """Make this `MultiEngineClient` active for parallel magic commands.
254
258
255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
259 IPython has a magic command syntax to work with `MultiEngineClient` objects.
256 In a given IPython session there is a single active one. While
260 In a given IPython session there is a single active one. While
257 there can be many `MultiEngineClient` created and used by the user,
261 there can be many `MultiEngineClient` created and used by the user,
258 there is only one active one. The active `MultiEngineClient` is used whenever
262 there is only one active one. The active `MultiEngineClient` is used whenever
259 the magic commands %px and %autopx are used.
263 the magic commands %px and %autopx are used.
260
264
261 The activate() method is called on a given `MultiEngineClient` to make it
265 The activate() method is called on a given `MultiEngineClient` to make it
262 active. Once this has been done, the magic commands can be used.
266 active. Once this has been done, the magic commands can be used.
263 """
267 """
264
268
265 try:
269 try:
266 # This is injected into __builtins__.
270 # This is injected into __builtins__.
267 ip = get_ipython()
271 ip = get_ipython()
268 except NameError:
272 except NameError:
269 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
273 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
270 else:
274 else:
271 pmagic = ip.get_component('parallel_magic')
275 pmagic = ip.get_component('parallel_magic')
272 if pmagic is not None:
276 if pmagic is not None:
273 pmagic.active_multiengine_client = self
277 pmagic.active_multiengine_client = self
274 else:
278 else:
275 print "You must first load the parallelmagic extension " \
279 print "You must first load the parallelmagic extension " \
276 "by doing '%load_ext parallelmagic'"
280 "by doing '%load_ext parallelmagic'"
277
281
278 def __setitem__(self, key, value):
282 def __setitem__(self, key, value):
279 """Add a dictionary interface for pushing/pulling.
283 """Add a dictionary interface for pushing/pulling.
280
284
281 This functions as a shorthand for `push`.
285 This functions as a shorthand for `push`.
282
286
283 :Parameters:
287 :Parameters:
284 key : str
288 key : str
285 What to call the remote object.
289 What to call the remote object.
286 value : object
290 value : object
287 The local Python object to push.
291 The local Python object to push.
288 """
292 """
289 targets, block = self._findTargetsAndBlock()
293 targets, block = self._findTargetsAndBlock()
290 return self.push({key:value}, targets=targets, block=block)
294 return self.push({key:value}, targets=targets, block=block)
291
295
292 def __getitem__(self, key):
296 def __getitem__(self, key):
293 """Add a dictionary interface for pushing/pulling.
297 """Add a dictionary interface for pushing/pulling.
294
298
295 This functions as a shorthand to `pull`.
299 This functions as a shorthand to `pull`.
296
300
297 :Parameters:
301 :Parameters:
298 - `key`: A string representing the key.
302 - `key`: A string representing the key.
299 """
303 """
300 if isinstance(key, str):
304 if isinstance(key, str):
301 targets, block = self._findTargetsAndBlock()
305 targets, block = self._findTargetsAndBlock()
302 return self.pull(key, targets=targets, block=block)
306 return self.pull(key, targets=targets, block=block)
303 else:
307 else:
304 raise TypeError("__getitem__ only takes strs")
308 raise TypeError("__getitem__ only takes strs")
305
309
306 def __len__(self):
310 def __len__(self):
307 """Return the number of available engines."""
311 """Return the number of available engines."""
308 return len(self.get_ids())
312 return len(self.get_ids())
309
313
310
314
#-------------------------------------------------------------------------------
# The top-level MultiEngine client adaptor
#-------------------------------------------------------------------------------
314
318
315
319
# Warning text for engine-property related methods that may be removed
# during the task-dependency refactor.
_prop_warn = """\

We are currently refactoring the task dependency system. This might
involve the removal of this method and other methods related to engine
properties. Please see the docstrings for IPython.kernel.TaskRejectError
for more information."""
322
326
323
327
class IFullBlockingMultiEngineClient(Interface):
    """Marker interface for the blocking multiengine client."""
    pass
326
330
327
331
328 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
332 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
329 """
333 """
330 A blocking client to the `IMultiEngine` controller interface.
334 A blocking client to the `IMultiEngine` controller interface.
331
335
332 This class allows users to use a set of engines for a parallel
336 This class allows users to use a set of engines for a parallel
333 computation through the `IMultiEngine` interface. In this interface,
337 computation through the `IMultiEngine` interface. In this interface,
334 each engine has a specific id (an int) that is used to refer to the
338 each engine has a specific id (an int) that is used to refer to the
335 engine, run code on it, etc.
339 engine, run code on it, etc.
336 """
340 """
337
341
338 implements(
342 implements(
339 IFullBlockingMultiEngineClient,
343 IFullBlockingMultiEngineClient,
340 IMultiEngineMapperFactory,
344 IMultiEngineMapperFactory,
341 IMapper
345 IMapper
342 )
346 )
343
347
def __init__(self, smultiengine):
    """Wrap the synchronous multiengine `smultiengine`.

    Defaults: calls block (`self.block = True`) and address every
    engine (`self.targets = 'all'`).
    """
    self.smultiengine = smultiengine
    self.block = True
    self.targets = 'all'
348
352
def _findBlock(self, block=None):
    """Resolve `block`, falling back to the instance default when None.

    Raises ValueError for anything other than True/False/None.
    """
    if block is None:
        return self.block
    if block in (True, False):
        return block
    raise ValueError("block must be True or False")
357
361
def _findTargets(self, targets=None):
    """Resolve `targets`, falling back to the instance default when None.

    Raises ValueError unless targets is a str, list, tuple or int.
    """
    if targets is None:
        return self.targets
    if not isinstance(targets, (str, list, tuple, int)):
        raise ValueError("targets must be a str, list, tuple or int")
    return targets
365
369
def _findTargetsAndBlock(self, targets=None, block=None):
    """Resolve both `targets` and `block` against the instance defaults."""
    return self._findTargets(targets), self._findBlock(block)
368
372
def _bcft(self, *args, **kwargs):
    """Run `blockingCallFromThread`, translating a dead Foolscap
    reference into `error.ConnectionError`."""
    try:
        result = blockingCallFromThread(*args, **kwargs)
    except DeadReferenceError:
        raise error.ConnectionError(
            """A connection error has occurred in trying to connect to the
            controller. This is usually caused by the controller dying or
            being restarted. To resolve this issue try recreating the
            multiengine client."""
        )
    return result
381
385
def _blockFromThread(self, function, *args, **kwargs):
    """Invoke `function` via `_bcft`; when not blocking, wrap the
    returned deferred id in a `PendingResult`.

    Raises `error.MissingBlockArgument` if no `block` keyword was given.
    """
    block = kwargs.get('block', None)
    if block is None:
        raise error.MissingBlockArgument("'block' keyword argument is missing")
    result = self._bcft(function, *args, **kwargs)
    if block:
        return result
    return PendingResult(self, result)
390
394
def get_pending_deferred(self, deferredID, block):
    """Fetch the value of a pending deferred from the controller by id."""
    return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
393
397
def barrier(self, pendingResults):
    """Synchronize on a set of `PendingResult` objects.

    A synchronization primitive that waits for a set of `PendingResult`
    objects to complete:

    * The `PendingResult`s are sorted by result_id.
    * `get_result` is called on each sequentially with block=True.
    * An exception raised while waiting is trapped here; it can be
      re-raised later by calling `get_result` on that `PendingResult`.

    Afterwards each result can be retrieved by calling `get_result`
    again or by reading the `r` attribute of the instance.
    """
    # Copy into a list for sorting, validating the element types first.
    pending = list(pendingResults)
    for item in pending:
        if not isinstance(item, PendingResult):
            raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
    # Wait on each result in result_id order; any error is cached by
    # get_result itself and deliberately ignored here.
    pending.sort()
    for item in pending:
        try:
            item.get_result(block=True)
        except Exception:
            pass
427
431
def flush(self):
    """Clear all pending deferreds/results from the controller.

    The controller holds the result of every `PendingResult` this client
    creates until it is retrieved; results that are never fetched would
    otherwise accumulate there forever.  This discards all un-retrieved
    results on the controller.
    """
    return self._bcft(self.smultiengine.clear_pending_deferreds)

# Backwards-compatible alias.
clear_pending_results = flush
446
450
447 #---------------------------------------------------------------------------
451 #---------------------------------------------------------------------------
448 # IEngineMultiplexer related methods
452 # IEngineMultiplexer related methods
449 #---------------------------------------------------------------------------
453 #---------------------------------------------------------------------------
450
454
def execute(self, lines, targets=None, block=None):
    """Execute code on a set of engines.

    :Parameters:
        lines : str
            The Python code to execute as a string
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    raw = self._bcft(self.smultiengine.execute, lines,
                     targets=targets, block=block)
    if block:
        return ResultList(raw)
    # Non-blocking: wrap the deferred id and pretty-print on retrieval.
    pending = PendingResult(self, raw)
    pending.add_callback(wrapResultList)
    return pending
474
478
def push(self, namespace, targets=None, block=None):
    """Push a dict of keys and values into the engines' namespaces.

    Each engine has a persistent namespace; this injects Python objects
    into it.  The objects must be pickleable.

    :Parameters:
        namespace : dict
            Objects to inject, keyed by the name they get remotely.
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.push, namespace,
                                 targets=targets, block=block)
498
502
def pull(self, keys, targets=None, block=None):
    """Pull Python objects by key out of the engines' namespaces.

    :Parameters:
        keys : str or list of str
            The names of the variables to be pulled
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.pull, keys,
                                 targets=targets, block=block)
515
519
def push_function(self, namespace, targets=None, block=None):
    """Push Python functions into the engines' namespaces.

    The pushed functions can then be used in code on the engines.
    Closures are not supported.

    :Parameters:
        namespace : dict
            Functions to push, keyed by the name they get remotely.
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.push_function,
                                 namespace, targets=targets, block=block)
537
541
def pull_function(self, keys, targets=None, block=None):
    """Pull Python functions by name from the engines' namespaces.

    Closures are not supported.

    :Parameters:
        keys : str or list of str
            The names of the functions to be pulled
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.pull_function, keys,
                                 targets=targets, block=block)
557
561
def push_serialized(self, namespace, targets=None, block=None):
    """Like `push`, but the namespace values are already serialized."""
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.push_serialized,
                                 namespace, targets=targets, block=block)
561
565
def pull_serialized(self, keys, targets=None, block=None):
    """Like `pull`, but the values come back still serialized."""
    targets, block = self._findTargetsAndBlock(targets, block)
    return self._blockFromThread(self.smultiengine.pull_serialized,
                                 keys, targets=targets, block=block)
565
569
def get_result(self, i=None, targets=None, block=None):
    """Get the result dict of a previously executed command.

    When code is executed in an engine, a dict is created and returned;
    this retrieves that dict for previous commands.

    :Parameters:
        i : int
            The number of the result to get
        targets : id or list of ids
            The engine to use for the execution
        block : boolean
            If True, return the actual result; if False, return a
            `PendingResult` which can be used to get the result later.
    """
    targets, block = self._findTargetsAndBlock(targets, block)
    raw = self._bcft(self.smultiengine.get_result, i,
                     targets=targets, block=block)
    if block:
        return ResultList(raw)
    # Non-blocking: wrap the deferred id and pretty-print on retrieval.
    pending = PendingResult(self, raw)
    pending.add_callback(wrapResultList)
    return pending
591
595
592 def reset(self, targets=None, block=None):
596 def reset(self, targets=None, block=None):
593 """
597 """
594 Reset an engine.
598 Reset an engine.
595
599
596 This method clears out the namespace of an engine.
600 This method clears out the namespace of an engine.
597
601
598 :Parameters:
602 :Parameters:
599 targets : id or list of ids
603 targets : id or list of ids
600 The engine to use for the execution
604 The engine to use for the execution
601 block : boolean
605 block : boolean
602 If False, this method will return the actual result. If False,
606 If False, this method will return the actual result. If False,
603 a `PendingResult` is returned which can be used to get the result
607 a `PendingResult` is returned which can be used to get the result
604 at a later time.
608 at a later time.
605 """
609 """
606 targets, block = self._findTargetsAndBlock(targets, block)
610 targets, block = self._findTargetsAndBlock(targets, block)
607 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
611 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
608
612
609 def keys(self, targets=None, block=None):
613 def keys(self, targets=None, block=None):
610 """
614 """
611 Get a list of all the variables in an engine's namespace.
615 Get a list of all the variables in an engine's namespace.
612
616
613 :Parameters:
617 :Parameters:
614 targets : id or list of ids
618 targets : id or list of ids
615 The engine to use for the execution
619 The engine to use for the execution
616 block : boolean
620 block : boolean
617 If False, this method will return the actual result. If False,
621 If False, this method will return the actual result. If False,
618 a `PendingResult` is returned which can be used to get the result
622 a `PendingResult` is returned which can be used to get the result
619 at a later time.
623 at a later time.
620 """
624 """
621 targets, block = self._findTargetsAndBlock(targets, block)
625 targets, block = self._findTargetsAndBlock(targets, block)
622 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
626 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
623
627
624 def kill(self, controller=False, targets=None, block=None):
628 def kill(self, controller=False, targets=None, block=None):
625 """
629 """
626 Kill the engines and controller.
630 Kill the engines and controller.
627
631
628 This method is used to stop the engine and controller by calling
632 This method is used to stop the engine and controller by calling
629 `reactor.stop`.
633 `reactor.stop`.
630
634
631 :Parameters:
635 :Parameters:
632 controller : boolean
636 controller : boolean
633 If True, kill the engines and controller. If False, just the
637 If True, kill the engines and controller. If False, just the
634 engines
638 engines
635 targets : id or list of ids
639 targets : id or list of ids
636 The engine to use for the execution
640 The engine to use for the execution
637 block : boolean
641 block : boolean
638 If False, this method will return the actual result. If False,
642 If False, this method will return the actual result. If False,
639 a `PendingResult` is returned which can be used to get the result
643 a `PendingResult` is returned which can be used to get the result
640 at a later time.
644 at a later time.
641 """
645 """
642 targets, block = self._findTargetsAndBlock(targets, block)
646 targets, block = self._findTargetsAndBlock(targets, block)
643 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
647 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
644
648
645 def clear_queue(self, targets=None, block=None):
649 def clear_queue(self, targets=None, block=None):
646 """
650 """
647 Clear out the controller's queue for an engine.
651 Clear out the controller's queue for an engine.
648
652
649 The controller maintains a queue for each engine. This clear it out.
653 The controller maintains a queue for each engine. This clear it out.
650
654
651 :Parameters:
655 :Parameters:
652 targets : id or list of ids
656 targets : id or list of ids
653 The engine to use for the execution
657 The engine to use for the execution
654 block : boolean
658 block : boolean
655 If False, this method will return the actual result. If False,
659 If False, this method will return the actual result. If False,
656 a `PendingResult` is returned which can be used to get the result
660 a `PendingResult` is returned which can be used to get the result
657 at a later time.
661 at a later time.
658 """
662 """
659 targets, block = self._findTargetsAndBlock(targets, block)
663 targets, block = self._findTargetsAndBlock(targets, block)
660 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
664 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
661
665
662 def queue_status(self, targets=None, block=None):
666 def queue_status(self, targets=None, block=None):
663 """
667 """
664 Get the status of an engines queue.
668 Get the status of an engines queue.
665
669
666 :Parameters:
670 :Parameters:
667 targets : id or list of ids
671 targets : id or list of ids
668 The engine to use for the execution
672 The engine to use for the execution
669 block : boolean
673 block : boolean
670 If False, this method will return the actual result. If False,
674 If False, this method will return the actual result. If False,
671 a `PendingResult` is returned which can be used to get the result
675 a `PendingResult` is returned which can be used to get the result
672 at a later time.
676 at a later time.
673 """
677 """
674 targets, block = self._findTargetsAndBlock(targets, block)
678 targets, block = self._findTargetsAndBlock(targets, block)
675 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
679 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
676
680
677 def set_properties(self, properties, targets=None, block=None):
681 def set_properties(self, properties, targets=None, block=None):
678 warnings.warn(_prop_warn)
682 warnings.warn(_prop_warn)
679 targets, block = self._findTargetsAndBlock(targets, block)
683 targets, block = self._findTargetsAndBlock(targets, block)
680 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
684 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
681
685
682 def get_properties(self, keys=None, targets=None, block=None):
686 def get_properties(self, keys=None, targets=None, block=None):
683 warnings.warn(_prop_warn)
687 warnings.warn(_prop_warn)
684 targets, block = self._findTargetsAndBlock(targets, block)
688 targets, block = self._findTargetsAndBlock(targets, block)
685 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
689 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
686
690
687 def has_properties(self, keys, targets=None, block=None):
691 def has_properties(self, keys, targets=None, block=None):
688 warnings.warn(_prop_warn)
692 warnings.warn(_prop_warn)
689 targets, block = self._findTargetsAndBlock(targets, block)
693 targets, block = self._findTargetsAndBlock(targets, block)
690 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
694 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
691
695
692 def del_properties(self, keys, targets=None, block=None):
696 def del_properties(self, keys, targets=None, block=None):
693 warnings.warn(_prop_warn)
697 warnings.warn(_prop_warn)
694 targets, block = self._findTargetsAndBlock(targets, block)
698 targets, block = self._findTargetsAndBlock(targets, block)
695 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
699 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
696
700
697 def clear_properties(self, targets=None, block=None):
701 def clear_properties(self, targets=None, block=None):
698 warnings.warn(_prop_warn)
702 warnings.warn(_prop_warn)
699 targets, block = self._findTargetsAndBlock(targets, block)
703 targets, block = self._findTargetsAndBlock(targets, block)
700 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
704 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
701
705
702 #---------------------------------------------------------------------------
706 #---------------------------------------------------------------------------
703 # IMultiEngine related methods
707 # IMultiEngine related methods
704 #---------------------------------------------------------------------------
708 #---------------------------------------------------------------------------
705
709
706 def get_ids(self):
710 def get_ids(self):
707 """
711 """
708 Returns the ids of currently registered engines.
712 Returns the ids of currently registered engines.
709 """
713 """
710 result = self._bcft(self.smultiengine.get_ids)
714 result = self._bcft(self.smultiengine.get_ids)
711 return result
715 return result
712
716
713 #---------------------------------------------------------------------------
717 #---------------------------------------------------------------------------
714 # IMultiEngineCoordinator
718 # IMultiEngineCoordinator
715 #---------------------------------------------------------------------------
719 #---------------------------------------------------------------------------
716
720
717 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
721 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
718 """
722 """
719 Partition a Python sequence and send the partitions to a set of engines.
723 Partition a Python sequence and send the partitions to a set of engines.
720 """
724 """
721 targets, block = self._findTargetsAndBlock(targets, block)
725 targets, block = self._findTargetsAndBlock(targets, block)
722 return self._blockFromThread(self.smultiengine.scatter, key, seq,
726 return self._blockFromThread(self.smultiengine.scatter, key, seq,
723 dist, flatten, targets=targets, block=block)
727 dist, flatten, targets=targets, block=block)
724
728
725 def gather(self, key, dist='b', targets=None, block=None):
729 def gather(self, key, dist='b', targets=None, block=None):
726 """
730 """
727 Gather a partitioned sequence on a set of engines as a single local seq.
731 Gather a partitioned sequence on a set of engines as a single local seq.
728 """
732 """
729 targets, block = self._findTargetsAndBlock(targets, block)
733 targets, block = self._findTargetsAndBlock(targets, block)
730 return self._blockFromThread(self.smultiengine.gather, key, dist,
734 return self._blockFromThread(self.smultiengine.gather, key, dist,
731 targets=targets, block=block)
735 targets=targets, block=block)
732
736
733 def raw_map(self, func, seq, dist='b', targets=None, block=None):
737 def raw_map(self, func, seq, dist='b', targets=None, block=None):
734 """
738 """
735 A parallelized version of Python's builtin map.
739 A parallelized version of Python's builtin map.
736
740
737 This has a slightly different syntax than the builtin `map`.
741 This has a slightly different syntax than the builtin `map`.
738 This is needed because we need to have keyword arguments and thus
742 This is needed because we need to have keyword arguments and thus
739 can't use *args to capture all the sequences. Instead, they must
743 can't use *args to capture all the sequences. Instead, they must
740 be passed in a list or tuple.
744 be passed in a list or tuple.
741
745
742 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
746 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
743
747
744 Most users will want to use parallel functions or the `mapper`
748 Most users will want to use parallel functions or the `mapper`
745 and `map` methods for an API that follows that of the builtin
749 and `map` methods for an API that follows that of the builtin
746 `map`.
750 `map`.
747 """
751 """
748 targets, block = self._findTargetsAndBlock(targets, block)
752 targets, block = self._findTargetsAndBlock(targets, block)
749 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
753 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
750 dist, targets=targets, block=block)
754 dist, targets=targets, block=block)
751
755
752 def map(self, func, *sequences):
756 def map(self, func, *sequences):
753 """
757 """
754 A parallel version of Python's builtin `map` function.
758 A parallel version of Python's builtin `map` function.
755
759
756 This method applies a function to sequences of arguments. It
760 This method applies a function to sequences of arguments. It
757 follows the same syntax as the builtin `map`.
761 follows the same syntax as the builtin `map`.
758
762
759 This method creates a mapper objects by calling `self.mapper` with
763 This method creates a mapper objects by calling `self.mapper` with
760 no arguments and then uses that mapper to do the mapping. See
764 no arguments and then uses that mapper to do the mapping. See
761 the documentation of `mapper` for more details.
765 the documentation of `mapper` for more details.
762 """
766 """
763 return self.mapper().map(func, *sequences)
767 return self.mapper().map(func, *sequences)
764
768
765 def mapper(self, dist='b', targets='all', block=None):
769 def mapper(self, dist='b', targets='all', block=None):
766 """
770 """
767 Create a mapper object that has a `map` method.
771 Create a mapper object that has a `map` method.
768
772
769 This method returns an object that implements the `IMapper`
773 This method returns an object that implements the `IMapper`
770 interface. This method is a factory that is used to control how
774 interface. This method is a factory that is used to control how
771 the map happens.
775 the map happens.
772
776
773 :Parameters:
777 :Parameters:
774 dist : str
778 dist : str
775 What decomposition to use, 'b' is the only one supported
779 What decomposition to use, 'b' is the only one supported
776 currently
780 currently
777 targets : str, int, sequence of ints
781 targets : str, int, sequence of ints
778 Which engines to use for the map
782 Which engines to use for the map
779 block : boolean
783 block : boolean
780 Should calls to `map` block or not
784 Should calls to `map` block or not
781 """
785 """
782 return MultiEngineMapper(self, dist, targets, block)
786 return MultiEngineMapper(self, dist, targets, block)
783
787
784 def parallel(self, dist='b', targets=None, block=None):
788 def parallel(self, dist='b', targets=None, block=None):
785 """
789 """
786 A decorator that turns a function into a parallel function.
790 A decorator that turns a function into a parallel function.
787
791
788 This can be used as:
792 This can be used as:
789
793
790 @parallel()
794 @parallel()
791 def f(x, y)
795 def f(x, y)
792 ...
796 ...
793
797
794 f(range(10), range(10))
798 f(range(10), range(10))
795
799
796 This causes f(0,0), f(1,1), ... to be called in parallel.
800 This causes f(0,0), f(1,1), ... to be called in parallel.
797
801
798 :Parameters:
802 :Parameters:
799 dist : str
803 dist : str
800 What decomposition to use, 'b' is the only one supported
804 What decomposition to use, 'b' is the only one supported
801 currently
805 currently
802 targets : str, int, sequence of ints
806 targets : str, int, sequence of ints
803 Which engines to use for the map
807 Which engines to use for the map
804 block : boolean
808 block : boolean
805 Should calls to `map` block or not
809 Should calls to `map` block or not
806 """
810 """
807 targets, block = self._findTargetsAndBlock(targets, block)
811 targets, block = self._findTargetsAndBlock(targets, block)
808 mapper = self.mapper(dist, targets, block)
812 mapper = self.mapper(dist, targets, block)
809 pf = ParallelFunction(mapper)
813 pf = ParallelFunction(mapper)
810 return pf
814 return pf
811
815
812 #---------------------------------------------------------------------------
816 #---------------------------------------------------------------------------
813 # IMultiEngineExtras
817 # IMultiEngineExtras
814 #---------------------------------------------------------------------------
818 #---------------------------------------------------------------------------
815
819
816 def zip_pull(self, keys, targets=None, block=None):
820 def zip_pull(self, keys, targets=None, block=None):
817 targets, block = self._findTargetsAndBlock(targets, block)
821 targets, block = self._findTargetsAndBlock(targets, block)
818 return self._blockFromThread(self.smultiengine.zip_pull, keys,
822 return self._blockFromThread(self.smultiengine.zip_pull, keys,
819 targets=targets, block=block)
823 targets=targets, block=block)
820
824
821 def run(self, filename, targets=None, block=None):
825 def run(self, filename, targets=None, block=None):
822 """
826 """
823 Run a Python code in a file on the engines.
827 Run a Python code in a file on the engines.
824
828
825 :Parameters:
829 :Parameters:
826 filename : str
830 filename : str
827 The name of the local file to run
831 The name of the local file to run
828 targets : id or list of ids
832 targets : id or list of ids
829 The engine to use for the execution
833 The engine to use for the execution
830 block : boolean
834 block : boolean
831 If False, this method will return the actual result. If False,
835 If False, this method will return the actual result. If False,
832 a `PendingResult` is returned which can be used to get the result
836 a `PendingResult` is returned which can be used to get the result
833 at a later time.
837 at a later time.
834 """
838 """
835 targets, block = self._findTargetsAndBlock(targets, block)
839 targets, block = self._findTargetsAndBlock(targets, block)
836 return self._blockFromThread(self.smultiengine.run, filename,
840 return self._blockFromThread(self.smultiengine.run, filename,
837 targets=targets, block=block)
841 targets=targets, block=block)
838
842
839 def benchmark(self, push_size=10000):
843 def benchmark(self, push_size=10000):
840 """
844 """
841 Run performance benchmarks for the current IPython cluster.
845 Run performance benchmarks for the current IPython cluster.
842
846
843 This method tests both the latency of sending command and data to the
847 This method tests both the latency of sending command and data to the
844 engines as well as the throughput of sending large objects to the
848 engines as well as the throughput of sending large objects to the
845 engines using push. The latency is measured by having one or more
849 engines using push. The latency is measured by having one or more
846 engines execute the command 'pass'. The throughput is measure by
850 engines execute the command 'pass'. The throughput is measure by
847 sending an NumPy array of size `push_size` to one or more engines.
851 sending an NumPy array of size `push_size` to one or more engines.
848
852
849 These benchmarks will vary widely on different hardware and networks
853 These benchmarks will vary widely on different hardware and networks
850 and thus can be used to get an idea of the performance characteristics
854 and thus can be used to get an idea of the performance characteristics
851 of a particular configuration of an IPython controller and engines.
855 of a particular configuration of an IPython controller and engines.
852
856
853 This function is not testable within our current testing framework.
857 This function is not testable within our current testing framework.
854 """
858 """
855 import timeit, __builtin__
859 import timeit, __builtin__
856 __builtin__._mec_self = self
860 __builtin__._mec_self = self
857 benchmarks = {}
861 benchmarks = {}
858 repeat = 3
862 repeat = 3
859 count = 10
863 count = 10
860
864
861 timer = timeit.Timer('_mec_self.execute("pass",0)')
865 timer = timeit.Timer('_mec_self.execute("pass",0)')
862 result = 1000*min(timer.repeat(repeat,count))/count
866 result = 1000*min(timer.repeat(repeat,count))/count
863 benchmarks['single_engine_latency'] = (result,'msec')
867 benchmarks['single_engine_latency'] = (result,'msec')
864
868
865 timer = timeit.Timer('_mec_self.execute("pass")')
869 timer = timeit.Timer('_mec_self.execute("pass")')
866 result = 1000*min(timer.repeat(repeat,count))/count
870 result = 1000*min(timer.repeat(repeat,count))/count
867 benchmarks['all_engine_latency'] = (result,'msec')
871 benchmarks['all_engine_latency'] = (result,'msec')
868
872
869 try:
873 try:
870 import numpy as np
874 import numpy as np
871 except:
875 except:
872 pass
876 pass
873 else:
877 else:
874 timer = timeit.Timer(
878 timer = timeit.Timer(
875 "_mec_self.push(d)",
879 "_mec_self.push(d)",
876 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
880 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
877 )
881 )
878 result = min(timer.repeat(repeat,count))/count
882 result = min(timer.repeat(repeat,count))/count
879 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
883 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
880
884
881 try:
885 try:
882 import numpy as np
886 import numpy as np
883 except:
887 except:
884 pass
888 pass
885 else:
889 else:
886 timer = timeit.Timer(
890 timer = timeit.Timer(
887 "_mec_self.push(d,0)",
891 "_mec_self.push(d,0)",
888 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
892 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
889 )
893 )
890 result = min(timer.repeat(repeat,count))/count
894 result = min(timer.repeat(repeat,count))/count
891 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
895 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
892
896
893 return benchmarks
897 return benchmarks
894
898
895
899
896 components.registerAdapter(FullBlockingMultiEngineClient,
900 components.registerAdapter(FullBlockingMultiEngineClient,
897 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
901 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
898
902
899
903
900
904
901
905
@@ -1,757 +1,760 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """
3 """
4 Expose the multiengine controller over the Foolscap network protocol.
4 Expose the multiengine controller over the Foolscap network protocol.
5 """
5 """
6
6
7 __docformat__ = "restructuredtext en"
7 __docformat__ = "restructuredtext en"
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Copyright (C) 2008 The IPython Development Team
10 # Copyright (C) 2008 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15
15
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19
19
20 import cPickle as pickle
20 import cPickle as pickle
21 from types import FunctionType
21 from types import FunctionType
22
22
23 from zope.interface import Interface, implements
23 from zope.interface import Interface, implements
24 from twisted.internet import defer
24 from twisted.internet import defer
25 from twisted.python import components, failure
25 from twisted.python import components, failure
26
26
27 try:
28 from foolscap.api import Referenceable
29 except ImportError:
27 from foolscap import Referenceable
30 from foolscap import Referenceable
28
31
29 from IPython.kernel import error
32 from IPython.kernel import error
30 from IPython.kernel import map as Map
33 from IPython.kernel import map as Map
31 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.mapper import (
35 from IPython.kernel.mapper import (
33 MultiEngineMapper,
36 MultiEngineMapper,
34 IMultiEngineMapperFactory,
37 IMultiEngineMapperFactory,
35 IMapper
38 IMapper
36 )
39 )
37 from IPython.kernel.twistedutil import gatherBoth
40 from IPython.kernel.twistedutil import gatherBoth
38 from IPython.kernel.multiengine import (
41 from IPython.kernel.multiengine import (
39 IMultiEngine,
42 IMultiEngine,
40 IFullSynchronousMultiEngine,
43 IFullSynchronousMultiEngine,
41 ISynchronousMultiEngine)
44 ISynchronousMultiEngine)
42 from IPython.kernel.pendingdeferred import PendingDeferredManager
45 from IPython.kernel.pendingdeferred import PendingDeferredManager
43 from IPython.kernel.pickleutil import (
46 from IPython.kernel.pickleutil import (
44 canDict,
47 canDict,
45 canSequence, uncanDict, uncanSequence
48 canSequence, uncanDict, uncanSequence
46 )
49 )
47
50
48 from IPython.kernel.clientinterfaces import (
51 from IPython.kernel.clientinterfaces import (
49 IFCClientInterfaceProvider,
52 IFCClientInterfaceProvider,
50 IBlockingClientAdaptor
53 IBlockingClientAdaptor
51 )
54 )
52
55
53 # Needed to access the true globals from __main__.__dict__
56 # Needed to access the true globals from __main__.__dict__
54 import __main__
57 import __main__
55
58
56 #-------------------------------------------------------------------------------
59 #-------------------------------------------------------------------------------
57 # The Controller side of things
60 # The Controller side of things
58 #-------------------------------------------------------------------------------
61 #-------------------------------------------------------------------------------
59
62
60 def packageResult(wrappedMethod):
63 def packageResult(wrappedMethod):
61
64
62 def wrappedPackageResult(self, *args, **kwargs):
65 def wrappedPackageResult(self, *args, **kwargs):
63 d = wrappedMethod(self, *args, **kwargs)
66 d = wrappedMethod(self, *args, **kwargs)
64 d.addCallback(self.packageSuccess)
67 d.addCallback(self.packageSuccess)
65 d.addErrback(self.packageFailure)
68 d.addErrback(self.packageFailure)
66 return d
69 return d
67 return wrappedPackageResult
70 return wrappedPackageResult
68
71
69
72
70 class IFCSynchronousMultiEngine(Interface):
73 class IFCSynchronousMultiEngine(Interface):
71 """Foolscap interface to `ISynchronousMultiEngine`.
74 """Foolscap interface to `ISynchronousMultiEngine`.
72
75
73 The methods in this interface are similar to those of
76 The methods in this interface are similar to those of
74 `ISynchronousMultiEngine`, but their arguments and return values are pickled
77 `ISynchronousMultiEngine`, but their arguments and return values are pickled
75 if they are not already simple Python types that can be send over XML-RPC.
78 if they are not already simple Python types that can be send over XML-RPC.
76
79
77 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
80 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
78 documentation about the methods.
81 documentation about the methods.
79
82
80 Most methods in this interface act like the `ISynchronousMultiEngine`
83 Most methods in this interface act like the `ISynchronousMultiEngine`
81 versions and can be called in blocking or non-blocking mode.
84 versions and can be called in blocking or non-blocking mode.
82 """
85 """
83 pass
86 pass
84
87
85
88
86 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
89 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
87 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
90 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
88 """
91 """
89
92
90 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
93 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
91
94
92 addSlash = True
95 addSlash = True
93
96
94 def __init__(self, multiengine):
97 def __init__(self, multiengine):
95 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
98 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
96 # it. This allow this class to do two adaptation steps.
99 # it. This allow this class to do two adaptation steps.
97 self.smultiengine = ISynchronousMultiEngine(multiengine)
100 self.smultiengine = ISynchronousMultiEngine(multiengine)
98 self._deferredIDCallbacks = {}
101 self._deferredIDCallbacks = {}
99
102
100 #---------------------------------------------------------------------------
103 #---------------------------------------------------------------------------
101 # Non interface methods
104 # Non interface methods
102 #---------------------------------------------------------------------------
105 #---------------------------------------------------------------------------
103
106
104 def packageFailure(self, f):
107 def packageFailure(self, f):
105 f.cleanFailure()
108 f.cleanFailure()
106 return self.packageSuccess(f)
109 return self.packageSuccess(f)
107
110
108 def packageSuccess(self, obj):
111 def packageSuccess(self, obj):
109 serial = pickle.dumps(obj, 2)
112 serial = pickle.dumps(obj, 2)
110 return serial
113 return serial
111
114
112 #---------------------------------------------------------------------------
115 #---------------------------------------------------------------------------
113 # Things related to PendingDeferredManager
116 # Things related to PendingDeferredManager
114 #---------------------------------------------------------------------------
117 #---------------------------------------------------------------------------
115
118
116 @packageResult
119 @packageResult
117 def remote_get_pending_deferred(self, deferredID, block):
120 def remote_get_pending_deferred(self, deferredID, block):
118 d = self.smultiengine.get_pending_deferred(deferredID, block)
121 d = self.smultiengine.get_pending_deferred(deferredID, block)
119 try:
122 try:
120 callback = self._deferredIDCallbacks.pop(deferredID)
123 callback = self._deferredIDCallbacks.pop(deferredID)
121 except KeyError:
124 except KeyError:
122 callback = None
125 callback = None
123 if callback is not None:
126 if callback is not None:
124 d.addCallback(callback[0], *callback[1], **callback[2])
127 d.addCallback(callback[0], *callback[1], **callback[2])
125 return d
128 return d
126
129
127 @packageResult
130 @packageResult
128 def remote_clear_pending_deferreds(self):
131 def remote_clear_pending_deferreds(self):
129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
132 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
130
133
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
134 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
135 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 return did
136 return did
134
137
135 #---------------------------------------------------------------------------
138 #---------------------------------------------------------------------------
136 # IEngineMultiplexer related methods
139 # IEngineMultiplexer related methods
137 #---------------------------------------------------------------------------
140 #---------------------------------------------------------------------------
138
141
139 @packageResult
142 @packageResult
140 def remote_execute(self, lines, targets, block):
143 def remote_execute(self, lines, targets, block):
141 return self.smultiengine.execute(lines, targets=targets, block=block)
144 return self.smultiengine.execute(lines, targets=targets, block=block)
142
145
143 @packageResult
146 @packageResult
144 def remote_push(self, binaryNS, targets, block):
147 def remote_push(self, binaryNS, targets, block):
145 try:
148 try:
146 namespace = pickle.loads(binaryNS)
149 namespace = pickle.loads(binaryNS)
147 except:
150 except:
148 d = defer.fail(failure.Failure())
151 d = defer.fail(failure.Failure())
149 else:
152 else:
150 d = self.smultiengine.push(namespace, targets=targets, block=block)
153 d = self.smultiengine.push(namespace, targets=targets, block=block)
151 return d
154 return d
152
155
153 @packageResult
156 @packageResult
154 def remote_pull(self, keys, targets, block):
157 def remote_pull(self, keys, targets, block):
155 d = self.smultiengine.pull(keys, targets=targets, block=block)
158 d = self.smultiengine.pull(keys, targets=targets, block=block)
156 return d
159 return d
157
160
158 @packageResult
161 @packageResult
159 def remote_push_function(self, binaryNS, targets, block):
162 def remote_push_function(self, binaryNS, targets, block):
160 try:
163 try:
161 namespace = pickle.loads(binaryNS)
164 namespace = pickle.loads(binaryNS)
162 except:
165 except:
163 d = defer.fail(failure.Failure())
166 d = defer.fail(failure.Failure())
164 else:
167 else:
165 namespace = uncanDict(namespace)
168 namespace = uncanDict(namespace)
166 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
169 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
167 return d
170 return d
168
171
169 def _canMultipleKeys(self, result):
172 def _canMultipleKeys(self, result):
170 return [canSequence(r) for r in result]
173 return [canSequence(r) for r in result]
171
174
172 @packageResult
175 @packageResult
173 def remote_pull_function(self, keys, targets, block):
176 def remote_pull_function(self, keys, targets, block):
174 def can_functions(r, keys):
177 def can_functions(r, keys):
175 if len(keys)==1 or isinstance(keys, str):
178 if len(keys)==1 or isinstance(keys, str):
176 result = canSequence(r)
179 result = canSequence(r)
177 elif len(keys)>1:
180 elif len(keys)>1:
178 result = [canSequence(s) for s in r]
181 result = [canSequence(s) for s in r]
179 return result
182 return result
180 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
183 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
181 if block:
184 if block:
182 d.addCallback(can_functions, keys)
185 d.addCallback(can_functions, keys)
183 else:
186 else:
184 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
187 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
185 return d
188 return d
186
189
187 @packageResult
190 @packageResult
188 def remote_push_serialized(self, binaryNS, targets, block):
191 def remote_push_serialized(self, binaryNS, targets, block):
189 try:
192 try:
190 namespace = pickle.loads(binaryNS)
193 namespace = pickle.loads(binaryNS)
191 except:
194 except:
192 d = defer.fail(failure.Failure())
195 d = defer.fail(failure.Failure())
193 else:
196 else:
194 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
197 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
195 return d
198 return d
196
199
197 @packageResult
200 @packageResult
198 def remote_pull_serialized(self, keys, targets, block):
201 def remote_pull_serialized(self, keys, targets, block):
199 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
202 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
200 return d
203 return d
201
204
202 @packageResult
205 @packageResult
203 def remote_get_result(self, i, targets, block):
206 def remote_get_result(self, i, targets, block):
204 if i == 'None':
207 if i == 'None':
205 i = None
208 i = None
206 return self.smultiengine.get_result(i, targets=targets, block=block)
209 return self.smultiengine.get_result(i, targets=targets, block=block)
207
210
208 @packageResult
211 @packageResult
209 def remote_reset(self, targets, block):
212 def remote_reset(self, targets, block):
210 return self.smultiengine.reset(targets=targets, block=block)
213 return self.smultiengine.reset(targets=targets, block=block)
211
214
212 @packageResult
215 @packageResult
213 def remote_keys(self, targets, block):
216 def remote_keys(self, targets, block):
214 return self.smultiengine.keys(targets=targets, block=block)
217 return self.smultiengine.keys(targets=targets, block=block)
215
218
216 @packageResult
219 @packageResult
217 def remote_kill(self, controller, targets, block):
220 def remote_kill(self, controller, targets, block):
218 return self.smultiengine.kill(controller, targets=targets, block=block)
221 return self.smultiengine.kill(controller, targets=targets, block=block)
219
222
220 @packageResult
223 @packageResult
221 def remote_clear_queue(self, targets, block):
224 def remote_clear_queue(self, targets, block):
222 return self.smultiengine.clear_queue(targets=targets, block=block)
225 return self.smultiengine.clear_queue(targets=targets, block=block)
223
226
224 @packageResult
227 @packageResult
225 def remote_queue_status(self, targets, block):
228 def remote_queue_status(self, targets, block):
226 return self.smultiengine.queue_status(targets=targets, block=block)
229 return self.smultiengine.queue_status(targets=targets, block=block)
227
230
228 @packageResult
231 @packageResult
229 def remote_set_properties(self, binaryNS, targets, block):
232 def remote_set_properties(self, binaryNS, targets, block):
230 try:
233 try:
231 ns = pickle.loads(binaryNS)
234 ns = pickle.loads(binaryNS)
232 except:
235 except:
233 d = defer.fail(failure.Failure())
236 d = defer.fail(failure.Failure())
234 else:
237 else:
235 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
238 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
236 return d
239 return d
237
240
238 @packageResult
241 @packageResult
239 def remote_get_properties(self, keys, targets, block):
242 def remote_get_properties(self, keys, targets, block):
240 if keys=='None':
243 if keys=='None':
241 keys=None
244 keys=None
242 return self.smultiengine.get_properties(keys, targets=targets, block=block)
245 return self.smultiengine.get_properties(keys, targets=targets, block=block)
243
246
244 @packageResult
247 @packageResult
245 def remote_has_properties(self, keys, targets, block):
248 def remote_has_properties(self, keys, targets, block):
246 return self.smultiengine.has_properties(keys, targets=targets, block=block)
249 return self.smultiengine.has_properties(keys, targets=targets, block=block)
247
250
248 @packageResult
251 @packageResult
249 def remote_del_properties(self, keys, targets, block):
252 def remote_del_properties(self, keys, targets, block):
250 return self.smultiengine.del_properties(keys, targets=targets, block=block)
253 return self.smultiengine.del_properties(keys, targets=targets, block=block)
251
254
252 @packageResult
255 @packageResult
253 def remote_clear_properties(self, targets, block):
256 def remote_clear_properties(self, targets, block):
254 return self.smultiengine.clear_properties(targets=targets, block=block)
257 return self.smultiengine.clear_properties(targets=targets, block=block)
255
258
256 #---------------------------------------------------------------------------
259 #---------------------------------------------------------------------------
257 # IMultiEngine related methods
260 # IMultiEngine related methods
258 #---------------------------------------------------------------------------
261 #---------------------------------------------------------------------------
259
262
260 def remote_get_ids(self):
263 def remote_get_ids(self):
261 """Get the ids of the registered engines.
264 """Get the ids of the registered engines.
262
265
263 This method always blocks.
266 This method always blocks.
264 """
267 """
265 return self.smultiengine.get_ids()
268 return self.smultiengine.get_ids()
266
269
267 #---------------------------------------------------------------------------
270 #---------------------------------------------------------------------------
268 # IFCClientInterfaceProvider related methods
271 # IFCClientInterfaceProvider related methods
269 #---------------------------------------------------------------------------
272 #---------------------------------------------------------------------------
270
273
271 def remote_get_client_name(self):
274 def remote_get_client_name(self):
272 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
275 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
273
276
274
277
275 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
278 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
276 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
279 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
277 # two phase adaptation.
280 # two phase adaptation.
# Two-phase adaptation: the adapter's __init__ first adapts IMultiEngine to
# ISynchronousMultiEngine, and this registration then exposes the result as
# IFCSynchronousMultiEngine.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
                           IMultiEngine, IFCSynchronousMultiEngine)
280
283
281
284
282 #-------------------------------------------------------------------------------
285 #-------------------------------------------------------------------------------
283 # The Client side of things
286 # The Client side of things
284 #-------------------------------------------------------------------------------
287 #-------------------------------------------------------------------------------
285
288
286
289
287 class FCFullSynchronousMultiEngineClient(object):
290 class FCFullSynchronousMultiEngineClient(object):
288
291
289 implements(
292 implements(
290 IFullSynchronousMultiEngine,
293 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
294 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
295 IMultiEngineMapperFactory,
293 IMapper
296 IMapper
294 )
297 )
295
298
296 def __init__(self, remote_reference):
299 def __init__(self, remote_reference):
297 self.remote_reference = remote_reference
300 self.remote_reference = remote_reference
298 self._deferredIDCallbacks = {}
301 self._deferredIDCallbacks = {}
299 # This class manages some pending deferreds through this instance. This
302 # This class manages some pending deferreds through this instance. This
300 # is required for methods like gather/scatter as it enables us to
303 # is required for methods like gather/scatter as it enables us to
301 # create our own pending deferreds for composite operations.
304 # create our own pending deferreds for composite operations.
302 self.pdm = PendingDeferredManager()
305 self.pdm = PendingDeferredManager()
303
306
304 #---------------------------------------------------------------------------
307 #---------------------------------------------------------------------------
305 # Non interface methods
308 # Non interface methods
306 #---------------------------------------------------------------------------
309 #---------------------------------------------------------------------------
307
310
308 def unpackage(self, r):
311 def unpackage(self, r):
309 return pickle.loads(r)
312 return pickle.loads(r)
310
313
311 #---------------------------------------------------------------------------
314 #---------------------------------------------------------------------------
312 # Things related to PendingDeferredManager
315 # Things related to PendingDeferredManager
313 #---------------------------------------------------------------------------
316 #---------------------------------------------------------------------------
314
317
315 def get_pending_deferred(self, deferredID, block=True):
318 def get_pending_deferred(self, deferredID, block=True):
316
319
317 # Because we are managing some pending deferreds locally (through
320 # Because we are managing some pending deferreds locally (through
318 # self.pdm) and some remotely (on the controller), we first try the
321 # self.pdm) and some remotely (on the controller), we first try the
319 # local one and then the remote one.
322 # local one and then the remote one.
320 if self.pdm.quick_has_id(deferredID):
323 if self.pdm.quick_has_id(deferredID):
321 d = self.pdm.get_pending_deferred(deferredID, block)
324 d = self.pdm.get_pending_deferred(deferredID, block)
322 return d
325 return d
323 else:
326 else:
324 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
327 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
325 d.addCallback(self.unpackage)
328 d.addCallback(self.unpackage)
326 try:
329 try:
327 callback = self._deferredIDCallbacks.pop(deferredID)
330 callback = self._deferredIDCallbacks.pop(deferredID)
328 except KeyError:
331 except KeyError:
329 callback = None
332 callback = None
330 if callback is not None:
333 if callback is not None:
331 d.addCallback(callback[0], *callback[1], **callback[2])
334 d.addCallback(callback[0], *callback[1], **callback[2])
332 return d
335 return d
333
336
334 def clear_pending_deferreds(self):
337 def clear_pending_deferreds(self):
335
338
336 # This clear both the local (self.pdm) and remote pending deferreds
339 # This clear both the local (self.pdm) and remote pending deferreds
337 self.pdm.clear_pending_deferreds()
340 self.pdm.clear_pending_deferreds()
338 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
341 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
339 d2.addCallback(self.unpackage)
342 d2.addCallback(self.unpackage)
340 return d2
343 return d2
341
344
342 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
345 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
343 self._deferredIDCallbacks[did] = (callback, args, kwargs)
346 self._deferredIDCallbacks[did] = (callback, args, kwargs)
344 return did
347 return did
345
348
346 #---------------------------------------------------------------------------
349 #---------------------------------------------------------------------------
347 # IEngineMultiplexer related methods
350 # IEngineMultiplexer related methods
348 #---------------------------------------------------------------------------
351 #---------------------------------------------------------------------------
349
352
350 def execute(self, lines, targets='all', block=True):
353 def execute(self, lines, targets='all', block=True):
351 d = self.remote_reference.callRemote('execute', lines, targets, block)
354 d = self.remote_reference.callRemote('execute', lines, targets, block)
352 d.addCallback(self.unpackage)
355 d.addCallback(self.unpackage)
353 return d
356 return d
354
357
355 def push(self, namespace, targets='all', block=True):
358 def push(self, namespace, targets='all', block=True):
356 serial = pickle.dumps(namespace, 2)
359 serial = pickle.dumps(namespace, 2)
357 d = self.remote_reference.callRemote('push', serial, targets, block)
360 d = self.remote_reference.callRemote('push', serial, targets, block)
358 d.addCallback(self.unpackage)
361 d.addCallback(self.unpackage)
359 return d
362 return d
360
363
361 def pull(self, keys, targets='all', block=True):
364 def pull(self, keys, targets='all', block=True):
362 d = self.remote_reference.callRemote('pull', keys, targets, block)
365 d = self.remote_reference.callRemote('pull', keys, targets, block)
363 d.addCallback(self.unpackage)
366 d.addCallback(self.unpackage)
364 return d
367 return d
365
368
366 def push_function(self, namespace, targets='all', block=True):
369 def push_function(self, namespace, targets='all', block=True):
367 cannedNamespace = canDict(namespace)
370 cannedNamespace = canDict(namespace)
368 serial = pickle.dumps(cannedNamespace, 2)
371 serial = pickle.dumps(cannedNamespace, 2)
369 d = self.remote_reference.callRemote('push_function', serial, targets, block)
372 d = self.remote_reference.callRemote('push_function', serial, targets, block)
370 d.addCallback(self.unpackage)
373 d.addCallback(self.unpackage)
371 return d
374 return d
372
375
373 def pull_function(self, keys, targets='all', block=True):
376 def pull_function(self, keys, targets='all', block=True):
374 def uncan_functions(r, keys):
377 def uncan_functions(r, keys):
375 if len(keys)==1 or isinstance(keys, str):
378 if len(keys)==1 or isinstance(keys, str):
376 return uncanSequence(r)
379 return uncanSequence(r)
377 elif len(keys)>1:
380 elif len(keys)>1:
378 return [uncanSequence(s) for s in r]
381 return [uncanSequence(s) for s in r]
379 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
382 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
380 if block:
383 if block:
381 d.addCallback(self.unpackage)
384 d.addCallback(self.unpackage)
382 d.addCallback(uncan_functions, keys)
385 d.addCallback(uncan_functions, keys)
383 else:
386 else:
384 d.addCallback(self.unpackage)
387 d.addCallback(self.unpackage)
385 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
388 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
386 return d
389 return d
387
390
388 def push_serialized(self, namespace, targets='all', block=True):
391 def push_serialized(self, namespace, targets='all', block=True):
389 cannedNamespace = canDict(namespace)
392 cannedNamespace = canDict(namespace)
390 serial = pickle.dumps(cannedNamespace, 2)
393 serial = pickle.dumps(cannedNamespace, 2)
391 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
394 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
392 d.addCallback(self.unpackage)
395 d.addCallback(self.unpackage)
393 return d
396 return d
394
397
395 def pull_serialized(self, keys, targets='all', block=True):
398 def pull_serialized(self, keys, targets='all', block=True):
396 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
399 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
397 d.addCallback(self.unpackage)
400 d.addCallback(self.unpackage)
398 return d
401 return d
399
402
400 def get_result(self, i=None, targets='all', block=True):
403 def get_result(self, i=None, targets='all', block=True):
401 if i is None: # This is because None cannot be marshalled by xml-rpc
404 if i is None: # This is because None cannot be marshalled by xml-rpc
402 i = 'None'
405 i = 'None'
403 d = self.remote_reference.callRemote('get_result', i, targets, block)
406 d = self.remote_reference.callRemote('get_result', i, targets, block)
404 d.addCallback(self.unpackage)
407 d.addCallback(self.unpackage)
405 return d
408 return d
406
409
407 def reset(self, targets='all', block=True):
410 def reset(self, targets='all', block=True):
408 d = self.remote_reference.callRemote('reset', targets, block)
411 d = self.remote_reference.callRemote('reset', targets, block)
409 d.addCallback(self.unpackage)
412 d.addCallback(self.unpackage)
410 return d
413 return d
411
414
412 def keys(self, targets='all', block=True):
415 def keys(self, targets='all', block=True):
413 d = self.remote_reference.callRemote('keys', targets, block)
416 d = self.remote_reference.callRemote('keys', targets, block)
414 d.addCallback(self.unpackage)
417 d.addCallback(self.unpackage)
415 return d
418 return d
416
419
417 def kill(self, controller=False, targets='all', block=True):
420 def kill(self, controller=False, targets='all', block=True):
418 d = self.remote_reference.callRemote('kill', controller, targets, block)
421 d = self.remote_reference.callRemote('kill', controller, targets, block)
419 d.addCallback(self.unpackage)
422 d.addCallback(self.unpackage)
420 return d
423 return d
421
424
422 def clear_queue(self, targets='all', block=True):
425 def clear_queue(self, targets='all', block=True):
423 d = self.remote_reference.callRemote('clear_queue', targets, block)
426 d = self.remote_reference.callRemote('clear_queue', targets, block)
424 d.addCallback(self.unpackage)
427 d.addCallback(self.unpackage)
425 return d
428 return d
426
429
427 def queue_status(self, targets='all', block=True):
430 def queue_status(self, targets='all', block=True):
428 d = self.remote_reference.callRemote('queue_status', targets, block)
431 d = self.remote_reference.callRemote('queue_status', targets, block)
429 d.addCallback(self.unpackage)
432 d.addCallback(self.unpackage)
430 return d
433 return d
431
434
432 def set_properties(self, properties, targets='all', block=True):
435 def set_properties(self, properties, targets='all', block=True):
433 serial = pickle.dumps(properties, 2)
436 serial = pickle.dumps(properties, 2)
434 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
437 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
435 d.addCallback(self.unpackage)
438 d.addCallback(self.unpackage)
436 return d
439 return d
437
440
438 def get_properties(self, keys=None, targets='all', block=True):
441 def get_properties(self, keys=None, targets='all', block=True):
439 if keys==None:
442 if keys==None:
440 keys='None'
443 keys='None'
441 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
444 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
442 d.addCallback(self.unpackage)
445 d.addCallback(self.unpackage)
443 return d
446 return d
444
447
445 def has_properties(self, keys, targets='all', block=True):
448 def has_properties(self, keys, targets='all', block=True):
446 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
449 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
447 d.addCallback(self.unpackage)
450 d.addCallback(self.unpackage)
448 return d
451 return d
449
452
450 def del_properties(self, keys, targets='all', block=True):
453 def del_properties(self, keys, targets='all', block=True):
451 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
454 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
452 d.addCallback(self.unpackage)
455 d.addCallback(self.unpackage)
453 return d
456 return d
454
457
455 def clear_properties(self, targets='all', block=True):
458 def clear_properties(self, targets='all', block=True):
456 d = self.remote_reference.callRemote('clear_properties', targets, block)
459 d = self.remote_reference.callRemote('clear_properties', targets, block)
457 d.addCallback(self.unpackage)
460 d.addCallback(self.unpackage)
458 return d
461 return d
459
462
460 #---------------------------------------------------------------------------
463 #---------------------------------------------------------------------------
461 # IMultiEngine related methods
464 # IMultiEngine related methods
462 #---------------------------------------------------------------------------
465 #---------------------------------------------------------------------------
463
466
464 def get_ids(self):
467 def get_ids(self):
465 d = self.remote_reference.callRemote('get_ids')
468 d = self.remote_reference.callRemote('get_ids')
466 return d
469 return d
467
470
468 #---------------------------------------------------------------------------
471 #---------------------------------------------------------------------------
469 # ISynchronousMultiEngineCoordinator related methods
472 # ISynchronousMultiEngineCoordinator related methods
470 #---------------------------------------------------------------------------
473 #---------------------------------------------------------------------------
471
474
472 def _process_targets(self, targets):
475 def _process_targets(self, targets):
473 def create_targets(ids):
476 def create_targets(ids):
474 if isinstance(targets, int):
477 if isinstance(targets, int):
475 engines = [targets]
478 engines = [targets]
476 elif targets=='all':
479 elif targets=='all':
477 engines = ids
480 engines = ids
478 elif isinstance(targets, (list, tuple)):
481 elif isinstance(targets, (list, tuple)):
479 engines = targets
482 engines = targets
480 for t in engines:
483 for t in engines:
481 if not t in ids:
484 if not t in ids:
482 raise error.InvalidEngineID("engine with id %r does not exist"%t)
485 raise error.InvalidEngineID("engine with id %r does not exist"%t)
483 return engines
486 return engines
484
487
485 d = self.get_ids()
488 d = self.get_ids()
486 d.addCallback(create_targets)
489 d.addCallback(create_targets)
487 return d
490 return d
488
491
489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
492 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
490
493
491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
494 # Note: scatter and gather handle pending deferreds locally through self.pdm.
492 # This enables us to collect a bunch fo deferred ids and make a secondary
495 # This enables us to collect a bunch fo deferred ids and make a secondary
493 # deferred id that corresponds to the entire group. This logic is extremely
496 # deferred id that corresponds to the entire group. This logic is extremely
494 # difficult to get right though.
497 # difficult to get right though.
495 def do_scatter(engines):
498 def do_scatter(engines):
496 nEngines = len(engines)
499 nEngines = len(engines)
497 mapClass = Map.dists[dist]
500 mapClass = Map.dists[dist]
498 mapObject = mapClass()
501 mapObject = mapClass()
499 d_list = []
502 d_list = []
500 # Loop through and push to each engine in non-blocking mode.
503 # Loop through and push to each engine in non-blocking mode.
501 # This returns a set of deferreds to deferred_ids
504 # This returns a set of deferreds to deferred_ids
502 for index, engineid in enumerate(engines):
505 for index, engineid in enumerate(engines):
503 partition = mapObject.getPartition(seq, index, nEngines)
506 partition = mapObject.getPartition(seq, index, nEngines)
504 if flatten and len(partition) == 1:
507 if flatten and len(partition) == 1:
505 d = self.push({key: partition[0]}, targets=engineid, block=False)
508 d = self.push({key: partition[0]}, targets=engineid, block=False)
506 else:
509 else:
507 d = self.push({key: partition}, targets=engineid, block=False)
510 d = self.push({key: partition}, targets=engineid, block=False)
508 d_list.append(d)
511 d_list.append(d)
509 # Collect the deferred to deferred_ids
512 # Collect the deferred to deferred_ids
510 d = gatherBoth(d_list,
513 d = gatherBoth(d_list,
511 fireOnOneErrback=0,
514 fireOnOneErrback=0,
512 consumeErrors=1,
515 consumeErrors=1,
513 logErrors=0)
516 logErrors=0)
514 # Now d has a list of deferred_ids or Failures coming
517 # Now d has a list of deferred_ids or Failures coming
515 d.addCallback(error.collect_exceptions, 'scatter')
518 d.addCallback(error.collect_exceptions, 'scatter')
516 def process_did_list(did_list):
519 def process_did_list(did_list):
517 """Turn a list of deferred_ids into a final result or failure."""
520 """Turn a list of deferred_ids into a final result or failure."""
518 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
521 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
519 final_d = gatherBoth(new_d_list,
522 final_d = gatherBoth(new_d_list,
520 fireOnOneErrback=0,
523 fireOnOneErrback=0,
521 consumeErrors=1,
524 consumeErrors=1,
522 logErrors=0)
525 logErrors=0)
523 final_d.addCallback(error.collect_exceptions, 'scatter')
526 final_d.addCallback(error.collect_exceptions, 'scatter')
524 final_d.addCallback(lambda lop: [i[0] for i in lop])
527 final_d.addCallback(lambda lop: [i[0] for i in lop])
525 return final_d
528 return final_d
526 # Now, depending on block, we need to handle the list deferred_ids
529 # Now, depending on block, we need to handle the list deferred_ids
527 # coming down the pipe diferently.
530 # coming down the pipe diferently.
528 if block:
531 if block:
529 # If we are blocking register a callback that will transform the
532 # If we are blocking register a callback that will transform the
530 # list of deferred_ids into the final result.
533 # list of deferred_ids into the final result.
531 d.addCallback(process_did_list)
534 d.addCallback(process_did_list)
532 return d
535 return d
533 else:
536 else:
534 # Here we are going to use a _local_ PendingDeferredManager.
537 # Here we are going to use a _local_ PendingDeferredManager.
535 deferred_id = self.pdm.get_deferred_id()
538 deferred_id = self.pdm.get_deferred_id()
536 # This is the deferred we will return to the user that will fire
539 # This is the deferred we will return to the user that will fire
537 # with the local deferred_id AFTER we have received the list of
540 # with the local deferred_id AFTER we have received the list of
538 # primary deferred_ids
541 # primary deferred_ids
539 d_to_return = defer.Deferred()
542 d_to_return = defer.Deferred()
540 def do_it(did_list):
543 def do_it(did_list):
541 """Produce a deferred to the final result, but first fire the
544 """Produce a deferred to the final result, but first fire the
542 deferred we will return to the user that has the local
545 deferred we will return to the user that has the local
543 deferred id."""
546 deferred id."""
544 d_to_return.callback(deferred_id)
547 d_to_return.callback(deferred_id)
545 return process_did_list(did_list)
548 return process_did_list(did_list)
546 d.addCallback(do_it)
549 d.addCallback(do_it)
547 # Now save the deferred to the final result
550 # Now save the deferred to the final result
548 self.pdm.save_pending_deferred(d, deferred_id)
551 self.pdm.save_pending_deferred(d, deferred_id)
549 return d_to_return
552 return d_to_return
550
553
551 d = self._process_targets(targets)
554 d = self._process_targets(targets)
552 d.addCallback(do_scatter)
555 d.addCallback(do_scatter)
553 return d
556 return d
554
557
555 def gather(self, key, dist='b', targets='all', block=True):
558 def gather(self, key, dist='b', targets='all', block=True):
556
559
557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
560 # Note: scatter and gather handle pending deferreds locally through self.pdm.
558 # This enables us to collect a bunch fo deferred ids and make a secondary
561 # This enables us to collect a bunch fo deferred ids and make a secondary
559 # deferred id that corresponds to the entire group. This logic is extremely
562 # deferred id that corresponds to the entire group. This logic is extremely
560 # difficult to get right though.
563 # difficult to get right though.
561 def do_gather(engines):
564 def do_gather(engines):
562 nEngines = len(engines)
565 nEngines = len(engines)
563 mapClass = Map.dists[dist]
566 mapClass = Map.dists[dist]
564 mapObject = mapClass()
567 mapObject = mapClass()
565 d_list = []
568 d_list = []
566 # Loop through and push to each engine in non-blocking mode.
569 # Loop through and push to each engine in non-blocking mode.
567 # This returns a set of deferreds to deferred_ids
570 # This returns a set of deferreds to deferred_ids
568 for index, engineid in enumerate(engines):
571 for index, engineid in enumerate(engines):
569 d = self.pull(key, targets=engineid, block=False)
572 d = self.pull(key, targets=engineid, block=False)
570 d_list.append(d)
573 d_list.append(d)
571 # Collect the deferred to deferred_ids
574 # Collect the deferred to deferred_ids
572 d = gatherBoth(d_list,
575 d = gatherBoth(d_list,
573 fireOnOneErrback=0,
576 fireOnOneErrback=0,
574 consumeErrors=1,
577 consumeErrors=1,
575 logErrors=0)
578 logErrors=0)
576 # Now d has a list of deferred_ids or Failures coming
579 # Now d has a list of deferred_ids or Failures coming
577 d.addCallback(error.collect_exceptions, 'scatter')
580 d.addCallback(error.collect_exceptions, 'scatter')
578 def process_did_list(did_list):
581 def process_did_list(did_list):
579 """Turn a list of deferred_ids into a final result or failure."""
582 """Turn a list of deferred_ids into a final result or failure."""
580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
583 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
581 final_d = gatherBoth(new_d_list,
584 final_d = gatherBoth(new_d_list,
582 fireOnOneErrback=0,
585 fireOnOneErrback=0,
583 consumeErrors=1,
586 consumeErrors=1,
584 logErrors=0)
587 logErrors=0)
585 final_d.addCallback(error.collect_exceptions, 'gather')
588 final_d.addCallback(error.collect_exceptions, 'gather')
586 final_d.addCallback(lambda lop: [i[0] for i in lop])
589 final_d.addCallback(lambda lop: [i[0] for i in lop])
587 final_d.addCallback(mapObject.joinPartitions)
590 final_d.addCallback(mapObject.joinPartitions)
588 return final_d
591 return final_d
589 # Now, depending on block, we need to handle the list deferred_ids
592 # Now, depending on block, we need to handle the list deferred_ids
590 # coming down the pipe diferently.
593 # coming down the pipe diferently.
591 if block:
594 if block:
592 # If we are blocking register a callback that will transform the
595 # If we are blocking register a callback that will transform the
593 # list of deferred_ids into the final result.
596 # list of deferred_ids into the final result.
594 d.addCallback(process_did_list)
597 d.addCallback(process_did_list)
595 return d
598 return d
596 else:
599 else:
597 # Here we are going to use a _local_ PendingDeferredManager.
600 # Here we are going to use a _local_ PendingDeferredManager.
598 deferred_id = self.pdm.get_deferred_id()
601 deferred_id = self.pdm.get_deferred_id()
599 # This is the deferred we will return to the user that will fire
602 # This is the deferred we will return to the user that will fire
600 # with the local deferred_id AFTER we have received the list of
603 # with the local deferred_id AFTER we have received the list of
601 # primary deferred_ids
604 # primary deferred_ids
602 d_to_return = defer.Deferred()
605 d_to_return = defer.Deferred()
603 def do_it(did_list):
606 def do_it(did_list):
604 """Produce a deferred to the final result, but first fire the
607 """Produce a deferred to the final result, but first fire the
605 deferred we will return to the user that has the local
608 deferred we will return to the user that has the local
606 deferred id."""
609 deferred id."""
607 d_to_return.callback(deferred_id)
610 d_to_return.callback(deferred_id)
608 return process_did_list(did_list)
611 return process_did_list(did_list)
609 d.addCallback(do_it)
612 d.addCallback(do_it)
610 # Now save the deferred to the final result
613 # Now save the deferred to the final result
611 self.pdm.save_pending_deferred(d, deferred_id)
614 self.pdm.save_pending_deferred(d, deferred_id)
612 return d_to_return
615 return d_to_return
613
616
614 d = self._process_targets(targets)
617 d = self._process_targets(targets)
615 d.addCallback(do_gather)
618 d.addCallback(do_gather)
616 return d
619 return d
617
620
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
621 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 """
622 """
620 A parallelized version of Python's builtin map.
623 A parallelized version of Python's builtin map.
621
624
622 This has a slightly different syntax than the builtin `map`.
625 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
626 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
627 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
628 be passed in a list or tuple.
626
629
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
630 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
631
629 Most users will want to use parallel functions or the `mapper`
632 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
633 and `map` methods for an API that follows that of the builtin
631 `map`.
634 `map`.
632 """
635 """
633 if not isinstance(sequences, (list, tuple)):
636 if not isinstance(sequences, (list, tuple)):
634 raise TypeError('sequences must be a list or tuple')
637 raise TypeError('sequences must be a list or tuple')
635 max_len = max(len(s) for s in sequences)
638 max_len = max(len(s) for s in sequences)
636 for s in sequences:
639 for s in sequences:
637 if len(s)!=max_len:
640 if len(s)!=max_len:
638 raise ValueError('all sequences must have equal length')
641 raise ValueError('all sequences must have equal length')
639 if isinstance(func, FunctionType):
642 if isinstance(func, FunctionType):
640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
643 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
644 d.addCallback(lambda did: self.get_pending_deferred(did, True))
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
645 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
643 elif isinstance(func, str):
646 elif isinstance(func, str):
644 d = defer.succeed(None)
647 d = defer.succeed(None)
645 sourceToRun = \
648 sourceToRun = \
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
649 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
647 else:
650 else:
648 raise TypeError("func must be a function or str")
651 raise TypeError("func must be a function or str")
649
652
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
653 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
654 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
655 d.addCallback(lambda did: self.get_pending_deferred(did, True))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
656 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
654 return d
657 return d
655
658
656 def map(self, func, *sequences):
659 def map(self, func, *sequences):
657 """
660 """
658 A parallel version of Python's builtin `map` function.
661 A parallel version of Python's builtin `map` function.
659
662
660 This method applies a function to sequences of arguments. It
663 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
664 follows the same syntax as the builtin `map`.
662
665
663 This method creates a mapper objects by calling `self.mapper` with
666 This method creates a mapper objects by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
667 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
668 the documentation of `mapper` for more details.
666 """
669 """
667 return self.mapper().map(func, *sequences)
670 return self.mapper().map(func, *sequences)
668
671
669 def mapper(self, dist='b', targets='all', block=True):
672 def mapper(self, dist='b', targets='all', block=True):
670 """
673 """
671 Create a mapper object that has a `map` method.
674 Create a mapper object that has a `map` method.
672
675
673 This method returns an object that implements the `IMapper`
676 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
677 interface. This method is a factory that is used to control how
675 the map happens.
678 the map happens.
676
679
677 :Parameters:
680 :Parameters:
678 dist : str
681 dist : str
679 What decomposition to use, 'b' is the only one supported
682 What decomposition to use, 'b' is the only one supported
680 currently
683 currently
681 targets : str, int, sequence of ints
684 targets : str, int, sequence of ints
682 Which engines to use for the map
685 Which engines to use for the map
683 block : boolean
686 block : boolean
684 Should calls to `map` block or not
687 Should calls to `map` block or not
685 """
688 """
686 return MultiEngineMapper(self, dist, targets, block)
689 return MultiEngineMapper(self, dist, targets, block)
687
690
688 def parallel(self, dist='b', targets='all', block=True):
691 def parallel(self, dist='b', targets='all', block=True):
689 """
692 """
690 A decorator that turns a function into a parallel function.
693 A decorator that turns a function into a parallel function.
691
694
692 This can be used as:
695 This can be used as:
693
696
694 @parallel()
697 @parallel()
695 def f(x, y)
698 def f(x, y)
696 ...
699 ...
697
700
698 f(range(10), range(10))
701 f(range(10), range(10))
699
702
700 This causes f(0,0), f(1,1), ... to be called in parallel.
703 This causes f(0,0), f(1,1), ... to be called in parallel.
701
704
702 :Parameters:
705 :Parameters:
703 dist : str
706 dist : str
704 What decomposition to use, 'b' is the only one supported
707 What decomposition to use, 'b' is the only one supported
705 currently
708 currently
706 targets : str, int, sequence of ints
709 targets : str, int, sequence of ints
707 Which engines to use for the map
710 Which engines to use for the map
708 block : boolean
711 block : boolean
709 Should calls to `map` block or not
712 Should calls to `map` block or not
710 """
713 """
711 mapper = self.mapper(dist, targets, block)
714 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
715 pf = ParallelFunction(mapper)
713 return pf
716 return pf
714
717
715 #---------------------------------------------------------------------------
718 #---------------------------------------------------------------------------
716 # ISynchronousMultiEngineExtras related methods
719 # ISynchronousMultiEngineExtras related methods
717 #---------------------------------------------------------------------------
720 #---------------------------------------------------------------------------
718
721
719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
722 def _transformPullResult(self, pushResult, multitargets, lenKeys):
720 if not multitargets:
723 if not multitargets:
721 result = pushResult[0]
724 result = pushResult[0]
722 elif lenKeys > 1:
725 elif lenKeys > 1:
723 result = zip(*pushResult)
726 result = zip(*pushResult)
724 elif lenKeys is 1:
727 elif lenKeys is 1:
725 result = list(pushResult)
728 result = list(pushResult)
726 return result
729 return result
727
730
728 def zip_pull(self, keys, targets='all', block=True):
731 def zip_pull(self, keys, targets='all', block=True):
729 multitargets = not isinstance(targets, int) and len(targets) > 1
732 multitargets = not isinstance(targets, int) and len(targets) > 1
730 lenKeys = len(keys)
733 lenKeys = len(keys)
731 d = self.pull(keys, targets=targets, block=block)
734 d = self.pull(keys, targets=targets, block=block)
732 if block:
735 if block:
733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
736 d.addCallback(self._transformPullResult, multitargets, lenKeys)
734 else:
737 else:
735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
738 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
736 return d
739 return d
737
740
738 def run(self, fname, targets='all', block=True):
741 def run(self, fname, targets='all', block=True):
739 fileobj = open(fname,'r')
742 fileobj = open(fname,'r')
740 source = fileobj.read()
743 source = fileobj.read()
741 fileobj.close()
744 fileobj.close()
742 # if the compilation blows, we get a local error right away
745 # if the compilation blows, we get a local error right away
743 try:
746 try:
744 code = compile(source,fname,'exec')
747 code = compile(source,fname,'exec')
745 except:
748 except:
746 return defer.fail(failure.Failure())
749 return defer.fail(failure.Failure())
747 # Now run the code
750 # Now run the code
748 d = self.execute(source, targets=targets, block=block)
751 d = self.execute(source, targets=targets, block=block)
749 return d
752 return d
750
753
751 #---------------------------------------------------------------------------
754 #---------------------------------------------------------------------------
752 # IBlockingClientAdaptor related methods
755 # IBlockingClientAdaptor related methods
753 #---------------------------------------------------------------------------
756 #---------------------------------------------------------------------------
754
757
755 def adapt_to_blocking_client(self):
758 def adapt_to_blocking_client(self):
756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
759 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
757 return IFullBlockingMultiEngineClient(self)
760 return IFullBlockingMultiEngineClient(self)
@@ -1,194 +1,198 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """
4 """
5 A blocking version of the task client.
5 A blocking version of the task client.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 from zope.interface import Interface, implements
21 from zope.interface import Interface, implements
22 from twisted.python import components
22 from twisted.python import components
23
24 try:
25 from foolscap.api import DeadReferenceError
26 except ImportError:
23 from foolscap import DeadReferenceError
27 from foolscap import DeadReferenceError
24
28
25 from IPython.kernel.twistedutil import blockingCallFromThread
29 from IPython.kernel.twistedutil import blockingCallFromThread
26 from IPython.kernel import task, error
30 from IPython.kernel import task, error
27 from IPython.kernel.mapper import (
31 from IPython.kernel.mapper import (
28 SynchronousTaskMapper,
32 SynchronousTaskMapper,
29 ITaskMapperFactory,
33 ITaskMapperFactory,
30 IMapper
34 IMapper
31 )
35 )
32 from IPython.kernel.parallelfunction import (
36 from IPython.kernel.parallelfunction import (
33 ParallelFunction,
37 ParallelFunction,
34 ITaskParallelDecorator
38 ITaskParallelDecorator
35 )
39 )
36
40
37 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
38 # The task client
42 # The task client
39 #-------------------------------------------------------------------------------
43 #-------------------------------------------------------------------------------
40
44
41 class IBlockingTaskClient(Interface):
45 class IBlockingTaskClient(Interface):
42 """
46 """
43 A vague interface of the blocking task client
47 A vague interface of the blocking task client
44 """
48 """
45 pass
49 pass
46
50
47 class BlockingTaskClient(object):
51 class BlockingTaskClient(object):
48 """
52 """
49 A blocking task client that adapts a non-blocking one.
53 A blocking task client that adapts a non-blocking one.
50 """
54 """
51
55
52 implements(
56 implements(
53 IBlockingTaskClient,
57 IBlockingTaskClient,
54 ITaskMapperFactory,
58 ITaskMapperFactory,
55 IMapper,
59 IMapper,
56 ITaskParallelDecorator
60 ITaskParallelDecorator
57 )
61 )
58
62
59 def __init__(self, task_controller):
63 def __init__(self, task_controller):
60 self.task_controller = task_controller
64 self.task_controller = task_controller
61 self.block = True
65 self.block = True
62
66
63 def _bcft(self, *args, **kwargs):
67 def _bcft(self, *args, **kwargs):
64 try:
68 try:
65 result = blockingCallFromThread(*args, **kwargs)
69 result = blockingCallFromThread(*args, **kwargs)
66 except DeadReferenceError:
70 except DeadReferenceError:
67 raise error.ConnectionError(
71 raise error.ConnectionError(
68 """A connection error has occurred in trying to connect to the
72 """A connection error has occurred in trying to connect to the
69 controller. This is usually caused by the controller dying or
73 controller. This is usually caused by the controller dying or
70 being restarted. To resolve this issue try recreating the
74 being restarted. To resolve this issue try recreating the
71 task client."""
75 task client."""
72 )
76 )
73 else:
77 else:
74 return result
78 return result
75
79
76 def run(self, task, block=False):
80 def run(self, task, block=False):
77 """Run a task on the `TaskController`.
81 """Run a task on the `TaskController`.
78
82
79 See the documentation of the `MapTask` and `StringTask` classes for
83 See the documentation of the `MapTask` and `StringTask` classes for
80 details on how to build a task of different types.
84 details on how to build a task of different types.
81
85
82 :Parameters:
86 :Parameters:
83 task : an `ITask` implementer
87 task : an `ITask` implementer
84
88
85 :Returns: The int taskid of the submitted task. Pass this to
89 :Returns: The int taskid of the submitted task. Pass this to
86 `get_task_result` to get the `TaskResult` object.
90 `get_task_result` to get the `TaskResult` object.
87 """
91 """
88 tid = self._bcft(self.task_controller.run, task)
92 tid = self._bcft(self.task_controller.run, task)
89 if block:
93 if block:
90 return self.get_task_result(tid, block=True)
94 return self.get_task_result(tid, block=True)
91 else:
95 else:
92 return tid
96 return tid
93
97
94 def get_task_result(self, taskid, block=False):
98 def get_task_result(self, taskid, block=False):
95 """
99 """
96 Get a task result by taskid.
100 Get a task result by taskid.
97
101
98 :Parameters:
102 :Parameters:
99 taskid : int
103 taskid : int
100 The taskid of the task to be retrieved.
104 The taskid of the task to be retrieved.
101 block : boolean
105 block : boolean
102 Should I block until the task is done?
106 Should I block until the task is done?
103
107
104 :Returns: A `TaskResult` object that encapsulates the task result.
108 :Returns: A `TaskResult` object that encapsulates the task result.
105 """
109 """
106 return self._bcft(self.task_controller.get_task_result,
110 return self._bcft(self.task_controller.get_task_result,
107 taskid, block)
111 taskid, block)
108
112
109 def abort(self, taskid):
113 def abort(self, taskid):
110 """
114 """
111 Abort a task by taskid.
115 Abort a task by taskid.
112
116
113 :Parameters:
117 :Parameters:
114 taskid : int
118 taskid : int
115 The taskid of the task to be aborted.
119 The taskid of the task to be aborted.
116 """
120 """
117 return self._bcft(self.task_controller.abort, taskid)
121 return self._bcft(self.task_controller.abort, taskid)
118
122
119 def barrier(self, taskids):
123 def barrier(self, taskids):
120 """Block until a set of tasks are completed.
124 """Block until a set of tasks are completed.
121
125
122 :Parameters:
126 :Parameters:
123 taskids : list, tuple
127 taskids : list, tuple
124 A sequence of taskids to block on.
128 A sequence of taskids to block on.
125 """
129 """
126 return self._bcft(self.task_controller.barrier, taskids)
130 return self._bcft(self.task_controller.barrier, taskids)
127
131
128 def spin(self):
132 def spin(self):
129 """
133 """
130 Touch the scheduler, to resume scheduling without submitting a task.
134 Touch the scheduler, to resume scheduling without submitting a task.
131
135
132 This method only needs to be called in unusual situations where the
136 This method only needs to be called in unusual situations where the
133 scheduler is idle for some reason.
137 scheduler is idle for some reason.
134 """
138 """
135 return self._bcft(self.task_controller.spin)
139 return self._bcft(self.task_controller.spin)
136
140
137 def queue_status(self, verbose=False):
141 def queue_status(self, verbose=False):
138 """
142 """
139 Get a dictionary with the current state of the task queue.
143 Get a dictionary with the current state of the task queue.
140
144
141 :Parameters:
145 :Parameters:
142 verbose : boolean
146 verbose : boolean
143 If True, return a list of taskids. If False, simply give
147 If True, return a list of taskids. If False, simply give
144 the number of tasks with each status.
148 the number of tasks with each status.
145
149
146 :Returns:
150 :Returns:
147 A dict with the queue status.
151 A dict with the queue status.
148 """
152 """
149 return self._bcft(self.task_controller.queue_status, verbose)
153 return self._bcft(self.task_controller.queue_status, verbose)
150
154
151 def clear(self):
155 def clear(self):
152 """
156 """
153 Clear all previously run tasks from the task controller.
157 Clear all previously run tasks from the task controller.
154
158
155 This is needed because the task controller keep all task results
159 This is needed because the task controller keep all task results
156 in memory. This can be a problem is there are many completed
160 in memory. This can be a problem is there are many completed
157 tasks. Users should call this periodically to clean out these
161 tasks. Users should call this periodically to clean out these
158 cached task results.
162 cached task results.
159 """
163 """
160 return self._bcft(self.task_controller.clear)
164 return self._bcft(self.task_controller.clear)
161
165
162 def map(self, func, *sequences):
166 def map(self, func, *sequences):
163 """
167 """
164 Apply func to *sequences elementwise. Like Python's builtin map.
168 Apply func to *sequences elementwise. Like Python's builtin map.
165
169
166 This version is load balanced.
170 This version is load balanced.
167 """
171 """
168 return self.mapper().map(func, *sequences)
172 return self.mapper().map(func, *sequences)
169
173
170 def mapper(self, clear_before=False, clear_after=False, retries=0,
174 def mapper(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
175 recovery_task=None, depend=None, block=True):
172 """
176 """
173 Create an `IMapper` implementer with a given set of arguments.
177 Create an `IMapper` implementer with a given set of arguments.
174
178
175 The `IMapper` created using a task controller is load balanced.
179 The `IMapper` created using a task controller is load balanced.
176
180
177 See the documentation for `IPython.kernel.task.BaseTask` for
181 See the documentation for `IPython.kernel.task.BaseTask` for
178 documentation on the arguments to this method.
182 documentation on the arguments to this method.
179 """
183 """
180 return SynchronousTaskMapper(self, clear_before=clear_before,
184 return SynchronousTaskMapper(self, clear_before=clear_before,
181 clear_after=clear_after, retries=retries,
185 clear_after=clear_after, retries=retries,
182 recovery_task=recovery_task, depend=depend, block=block)
186 recovery_task=recovery_task, depend=depend, block=block)
183
187
184 def parallel(self, clear_before=False, clear_after=False, retries=0,
188 def parallel(self, clear_before=False, clear_after=False, retries=0,
185 recovery_task=None, depend=None, block=True):
189 recovery_task=None, depend=None, block=True):
186 mapper = self.mapper(clear_before, clear_after, retries,
190 mapper = self.mapper(clear_before, clear_after, retries,
187 recovery_task, depend, block)
191 recovery_task, depend, block)
188 pf = ParallelFunction(mapper)
192 pf = ParallelFunction(mapper)
189 return pf
193 return pf
190
194
191 components.registerAdapter(BlockingTaskClient,
195 components.registerAdapter(BlockingTaskClient,
192 task.ITaskController, IBlockingTaskClient)
196 task.ITaskController, IBlockingTaskClient)
193
197
194
198
@@ -1,326 +1,329 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
3 """A Foolscap interface to a TaskController.
3 """A Foolscap interface to a TaskController.
4
4
5 This class lets Foolscap clients talk to a TaskController.
5 This class lets Foolscap clients talk to a TaskController.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 import cPickle as pickle
21 import cPickle as pickle
22
22
23 from zope.interface import Interface, implements
23 from zope.interface import Interface, implements
24 from twisted.internet import defer
24 from twisted.internet import defer
25 from twisted.python import components
25 from twisted.python import components
26
26
27 try:
28 from foolscap.api import Referenceable
29 except ImportError:
27 from foolscap import Referenceable
30 from foolscap import Referenceable
28
31
29 from IPython.kernel import task as taskmodule
32 from IPython.kernel import task as taskmodule
30 from IPython.kernel.clientinterfaces import (
33 from IPython.kernel.clientinterfaces import (
31 IFCClientInterfaceProvider,
34 IFCClientInterfaceProvider,
32 IBlockingClientAdaptor
35 IBlockingClientAdaptor
33 )
36 )
34 from IPython.kernel.mapper import (
37 from IPython.kernel.mapper import (
35 TaskMapper,
38 TaskMapper,
36 ITaskMapperFactory,
39 ITaskMapperFactory,
37 IMapper
40 IMapper
38 )
41 )
39 from IPython.kernel.parallelfunction import (
42 from IPython.kernel.parallelfunction import (
40 ParallelFunction,
43 ParallelFunction,
41 ITaskParallelDecorator
44 ITaskParallelDecorator
42 )
45 )
43
46
44 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
45 # The Controller side of things
48 # The Controller side of things
46 #-------------------------------------------------------------------------------
49 #-------------------------------------------------------------------------------
47
50
48
51
49 class IFCTaskController(Interface):
52 class IFCTaskController(Interface):
50 """Foolscap interface to task controller.
53 """Foolscap interface to task controller.
51
54
52 See the documentation of `ITaskController` for more information.
55 See the documentation of `ITaskController` for more information.
53 """
56 """
54 def remote_run(binTask):
57 def remote_run(binTask):
55 """"""
58 """"""
56
59
57 def remote_abort(taskid):
60 def remote_abort(taskid):
58 """"""
61 """"""
59
62
60 def remote_get_task_result(taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
61 """"""
64 """"""
62
65
63 def remote_barrier(taskids):
66 def remote_barrier(taskids):
64 """"""
67 """"""
65
68
66 def remote_spin():
69 def remote_spin():
67 """"""
70 """"""
68
71
69 def remote_queue_status(verbose):
72 def remote_queue_status(verbose):
70 """"""
73 """"""
71
74
72 def remote_clear():
75 def remote_clear():
73 """"""
76 """"""
74
77
75
78
76 class FCTaskControllerFromTaskController(Referenceable):
79 class FCTaskControllerFromTaskController(Referenceable):
77 """
80 """
78 Adapt a `TaskController` to an `IFCTaskController`
81 Adapt a `TaskController` to an `IFCTaskController`
79
82
80 This class is used to expose a `TaskController` over the wire using
83 This class is used to expose a `TaskController` over the wire using
81 the Foolscap network protocol.
84 the Foolscap network protocol.
82 """
85 """
83
86
84 implements(IFCTaskController, IFCClientInterfaceProvider)
87 implements(IFCTaskController, IFCClientInterfaceProvider)
85
88
86 def __init__(self, taskController):
89 def __init__(self, taskController):
87 self.taskController = taskController
90 self.taskController = taskController
88
91
89 #---------------------------------------------------------------------------
92 #---------------------------------------------------------------------------
90 # Non interface methods
93 # Non interface methods
91 #---------------------------------------------------------------------------
94 #---------------------------------------------------------------------------
92
95
93 def packageFailure(self, f):
96 def packageFailure(self, f):
94 f.cleanFailure()
97 f.cleanFailure()
95 return self.packageSuccess(f)
98 return self.packageSuccess(f)
96
99
97 def packageSuccess(self, obj):
100 def packageSuccess(self, obj):
98 serial = pickle.dumps(obj, 2)
101 serial = pickle.dumps(obj, 2)
99 return serial
102 return serial
100
103
101 #---------------------------------------------------------------------------
104 #---------------------------------------------------------------------------
102 # ITaskController related methods
105 # ITaskController related methods
103 #---------------------------------------------------------------------------
106 #---------------------------------------------------------------------------
104
107
105 def remote_run(self, ptask):
108 def remote_run(self, ptask):
106 try:
109 try:
107 task = pickle.loads(ptask)
110 task = pickle.loads(ptask)
108 task.uncan_task()
111 task.uncan_task()
109 except:
112 except:
110 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
111 else:
114 else:
112 d = self.taskController.run(task)
115 d = self.taskController.run(task)
113 d.addCallback(self.packageSuccess)
116 d.addCallback(self.packageSuccess)
114 d.addErrback(self.packageFailure)
117 d.addErrback(self.packageFailure)
115 return d
118 return d
116
119
117 def remote_abort(self, taskid):
120 def remote_abort(self, taskid):
118 d = self.taskController.abort(taskid)
121 d = self.taskController.abort(taskid)
119 d.addCallback(self.packageSuccess)
122 d.addCallback(self.packageSuccess)
120 d.addErrback(self.packageFailure)
123 d.addErrback(self.packageFailure)
121 return d
124 return d
122
125
123 def remote_get_task_result(self, taskid, block=False):
126 def remote_get_task_result(self, taskid, block=False):
124 d = self.taskController.get_task_result(taskid, block)
127 d = self.taskController.get_task_result(taskid, block)
125 d.addCallback(self.packageSuccess)
128 d.addCallback(self.packageSuccess)
126 d.addErrback(self.packageFailure)
129 d.addErrback(self.packageFailure)
127 return d
130 return d
128
131
129 def remote_barrier(self, taskids):
132 def remote_barrier(self, taskids):
130 d = self.taskController.barrier(taskids)
133 d = self.taskController.barrier(taskids)
131 d.addCallback(self.packageSuccess)
134 d.addCallback(self.packageSuccess)
132 d.addErrback(self.packageFailure)
135 d.addErrback(self.packageFailure)
133 return d
136 return d
134
137
135 def remote_spin(self):
138 def remote_spin(self):
136 d = self.taskController.spin()
139 d = self.taskController.spin()
137 d.addCallback(self.packageSuccess)
140 d.addCallback(self.packageSuccess)
138 d.addErrback(self.packageFailure)
141 d.addErrback(self.packageFailure)
139 return d
142 return d
140
143
141 def remote_queue_status(self, verbose):
144 def remote_queue_status(self, verbose):
142 d = self.taskController.queue_status(verbose)
145 d = self.taskController.queue_status(verbose)
143 d.addCallback(self.packageSuccess)
146 d.addCallback(self.packageSuccess)
144 d.addErrback(self.packageFailure)
147 d.addErrback(self.packageFailure)
145 return d
148 return d
146
149
147 def remote_clear(self):
150 def remote_clear(self):
148 return self.taskController.clear()
151 return self.taskController.clear()
149
152
150 def remote_get_client_name(self):
153 def remote_get_client_name(self):
151 return 'IPython.kernel.taskfc.FCTaskClient'
154 return 'IPython.kernel.taskfc.FCTaskClient'
152
155
153 components.registerAdapter(FCTaskControllerFromTaskController,
156 components.registerAdapter(FCTaskControllerFromTaskController,
154 taskmodule.ITaskController, IFCTaskController)
157 taskmodule.ITaskController, IFCTaskController)
155
158
156
159
157 #-------------------------------------------------------------------------------
160 #-------------------------------------------------------------------------------
158 # The Client side of things
161 # The Client side of things
159 #-------------------------------------------------------------------------------
162 #-------------------------------------------------------------------------------
160
163
161 class FCTaskClient(object):
164 class FCTaskClient(object):
162 """
165 """
163 Client class for Foolscap exposed `TaskController`.
166 Client class for Foolscap exposed `TaskController`.
164
167
165 This class is an adapter that makes a `RemoteReference` to a
168 This class is an adapter that makes a `RemoteReference` to a
166 `TaskController` look like an actual `ITaskController` on the client side.
169 `TaskController` look like an actual `ITaskController` on the client side.
167
170
168 This class also implements `IBlockingClientAdaptor` so that clients can
171 This class also implements `IBlockingClientAdaptor` so that clients can
169 automatically get a blocking version of this class.
172 automatically get a blocking version of this class.
170 """
173 """
171
174
172 implements(
175 implements(
173 taskmodule.ITaskController,
176 taskmodule.ITaskController,
174 IBlockingClientAdaptor,
177 IBlockingClientAdaptor,
175 ITaskMapperFactory,
178 ITaskMapperFactory,
176 IMapper,
179 IMapper,
177 ITaskParallelDecorator
180 ITaskParallelDecorator
178 )
181 )
179
182
180 def __init__(self, remote_reference):
183 def __init__(self, remote_reference):
181 self.remote_reference = remote_reference
184 self.remote_reference = remote_reference
182
185
183 #---------------------------------------------------------------------------
186 #---------------------------------------------------------------------------
184 # Non interface methods
187 # Non interface methods
185 #---------------------------------------------------------------------------
188 #---------------------------------------------------------------------------
186
189
187 def unpackage(self, r):
190 def unpackage(self, r):
188 return pickle.loads(r)
191 return pickle.loads(r)
189
192
190 #---------------------------------------------------------------------------
193 #---------------------------------------------------------------------------
191 # ITaskController related methods
194 # ITaskController related methods
192 #---------------------------------------------------------------------------
195 #---------------------------------------------------------------------------
193 def run(self, task):
196 def run(self, task):
194 """Run a task on the `TaskController`.
197 """Run a task on the `TaskController`.
195
198
196 See the documentation of the `MapTask` and `StringTask` classes for
199 See the documentation of the `MapTask` and `StringTask` classes for
197 details on how to build a task of different types.
200 details on how to build a task of different types.
198
201
199 :Parameters:
202 :Parameters:
200 task : an `ITask` implementer
203 task : an `ITask` implementer
201
204
202 :Returns: The int taskid of the submitted task. Pass this to
205 :Returns: The int taskid of the submitted task. Pass this to
203 `get_task_result` to get the `TaskResult` object.
206 `get_task_result` to get the `TaskResult` object.
204 """
207 """
205 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
206 task.can_task()
209 task.can_task()
207 ptask = pickle.dumps(task, 2)
210 ptask = pickle.dumps(task, 2)
208 task.uncan_task()
211 task.uncan_task()
209 d = self.remote_reference.callRemote('run', ptask)
212 d = self.remote_reference.callRemote('run', ptask)
210 d.addCallback(self.unpackage)
213 d.addCallback(self.unpackage)
211 return d
214 return d
212
215
213 def get_task_result(self, taskid, block=False):
216 def get_task_result(self, taskid, block=False):
214 """
217 """
215 Get a task result by taskid.
218 Get a task result by taskid.
216
219
217 :Parameters:
220 :Parameters:
218 taskid : int
221 taskid : int
219 The taskid of the task to be retrieved.
222 The taskid of the task to be retrieved.
220 block : boolean
223 block : boolean
221 Should I block until the task is done?
224 Should I block until the task is done?
222
225
223 :Returns: A `TaskResult` object that encapsulates the task result.
226 :Returns: A `TaskResult` object that encapsulates the task result.
224 """
227 """
225 d = self.remote_reference.callRemote('get_task_result', taskid, block)
228 d = self.remote_reference.callRemote('get_task_result', taskid, block)
226 d.addCallback(self.unpackage)
229 d.addCallback(self.unpackage)
227 return d
230 return d
228
231
229 def abort(self, taskid):
232 def abort(self, taskid):
230 """
233 """
231 Abort a task by taskid.
234 Abort a task by taskid.
232
235
233 :Parameters:
236 :Parameters:
234 taskid : int
237 taskid : int
235 The taskid of the task to be aborted.
238 The taskid of the task to be aborted.
236 """
239 """
237 d = self.remote_reference.callRemote('abort', taskid)
240 d = self.remote_reference.callRemote('abort', taskid)
238 d.addCallback(self.unpackage)
241 d.addCallback(self.unpackage)
239 return d
242 return d
240
243
241 def barrier(self, taskids):
244 def barrier(self, taskids):
242 """Block until a set of tasks are completed.
245 """Block until a set of tasks are completed.
243
246
244 :Parameters:
247 :Parameters:
245 taskids : list, tuple
248 taskids : list, tuple
246 A sequence of taskids to block on.
249 A sequence of taskids to block on.
247 """
250 """
248 d = self.remote_reference.callRemote('barrier', taskids)
251 d = self.remote_reference.callRemote('barrier', taskids)
249 d.addCallback(self.unpackage)
252 d.addCallback(self.unpackage)
250 return d
253 return d
251
254
252 def spin(self):
255 def spin(self):
253 """
256 """
254 Touch the scheduler, to resume scheduling without submitting a task.
257 Touch the scheduler, to resume scheduling without submitting a task.
255
258
256 This method only needs to be called in unusual situations where the
259 This method only needs to be called in unusual situations where the
257 scheduler is idle for some reason.
260 scheduler is idle for some reason.
258 """
261 """
259 d = self.remote_reference.callRemote('spin')
262 d = self.remote_reference.callRemote('spin')
260 d.addCallback(self.unpackage)
263 d.addCallback(self.unpackage)
261 return d
264 return d
262
265
263 def queue_status(self, verbose=False):
266 def queue_status(self, verbose=False):
264 """
267 """
265 Get a dictionary with the current state of the task queue.
268 Get a dictionary with the current state of the task queue.
266
269
267 :Parameters:
270 :Parameters:
268 verbose : boolean
271 verbose : boolean
269 If True, return a list of taskids. If False, simply give
272 If True, return a list of taskids. If False, simply give
270 the number of tasks with each status.
273 the number of tasks with each status.
271
274
272 :Returns:
275 :Returns:
273 A dict with the queue status.
276 A dict with the queue status.
274 """
277 """
275 d = self.remote_reference.callRemote('queue_status', verbose)
278 d = self.remote_reference.callRemote('queue_status', verbose)
276 d.addCallback(self.unpackage)
279 d.addCallback(self.unpackage)
277 return d
280 return d
278
281
279 def clear(self):
282 def clear(self):
280 """
283 """
281 Clear all previously run tasks from the task controller.
284 Clear all previously run tasks from the task controller.
282
285
283 This is needed because the task controller keep all task results
286 This is needed because the task controller keep all task results
284 in memory. This can be a problem is there are many completed
287 in memory. This can be a problem is there are many completed
285 tasks. Users should call this periodically to clean out these
288 tasks. Users should call this periodically to clean out these
286 cached task results.
289 cached task results.
287 """
290 """
288 d = self.remote_reference.callRemote('clear')
291 d = self.remote_reference.callRemote('clear')
289 return d
292 return d
290
293
291 def adapt_to_blocking_client(self):
294 def adapt_to_blocking_client(self):
292 """
295 """
293 Wrap self in a blocking version that implements `IBlockingTaskClient.
296 Wrap self in a blocking version that implements `IBlockingTaskClient.
294 """
297 """
295 from IPython.kernel.taskclient import IBlockingTaskClient
298 from IPython.kernel.taskclient import IBlockingTaskClient
296 return IBlockingTaskClient(self)
299 return IBlockingTaskClient(self)
297
300
298 def map(self, func, *sequences):
301 def map(self, func, *sequences):
299 """
302 """
300 Apply func to *sequences elementwise. Like Python's builtin map.
303 Apply func to *sequences elementwise. Like Python's builtin map.
301
304
302 This version is load balanced.
305 This version is load balanced.
303 """
306 """
304 return self.mapper().map(func, *sequences)
307 return self.mapper().map(func, *sequences)
305
308
306 def mapper(self, clear_before=False, clear_after=False, retries=0,
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
307 recovery_task=None, depend=None, block=True):
310 recovery_task=None, depend=None, block=True):
308 """
311 """
309 Create an `IMapper` implementer with a given set of arguments.
312 Create an `IMapper` implementer with a given set of arguments.
310
313
311 The `IMapper` created using a task controller is load balanced.
314 The `IMapper` created using a task controller is load balanced.
312
315
313 See the documentation for `IPython.kernel.task.BaseTask` for
316 See the documentation for `IPython.kernel.task.BaseTask` for
314 documentation on the arguments to this method.
317 documentation on the arguments to this method.
315 """
318 """
316 return TaskMapper(self, clear_before=clear_before,
319 return TaskMapper(self, clear_before=clear_before,
317 clear_after=clear_after, retries=retries,
320 clear_after=clear_after, retries=retries,
318 recovery_task=recovery_task, depend=depend, block=block)
321 recovery_task=recovery_task, depend=depend, block=block)
319
322
320 def parallel(self, clear_before=False, clear_after=False, retries=0,
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
321 recovery_task=None, depend=None, block=True):
324 recovery_task=None, depend=None, block=True):
322 mapper = self.mapper(clear_before, clear_after, retries,
325 mapper = self.mapper(clear_before, clear_after, retries,
323 recovery_task, depend, block)
326 recovery_task, depend, block)
324 pf = ParallelFunction(mapper)
327 pf = ParallelFunction(mapper)
325 return pf
328 return pf
326
329
@@ -1,71 +1,148 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Run a Monte-Carlo options pricer in parallel."""
2 """Run a Monte-Carlo options pricer in parallel."""
3
3
4 #-----------------------------------------------------------------------------
5 # Imports
6 #-----------------------------------------------------------------------------
7
8 import sys
9 import time
4 from IPython.kernel import client
10 from IPython.kernel import client
5 import numpy as np
11 import numpy as np
6 from mcpricer import price_options
12 from mcpricer import price_options
13 from matplotlib import pyplot as plt
14
15 #-----------------------------------------------------------------------------
16 # Setup parameters for the run
17 #-----------------------------------------------------------------------------
18
19 def ask_question(text, the_type, default):
20 s = '%s [%r]: ' % (text, the_type(default))
21 result = raw_input(s)
22 if result:
23 return the_type(result)
24 else:
25 return the_type(default)
26
27 cluster_profile = ask_question("Cluster profile", str, "default")
28 price = ask_question("Initial price", float, 100.0)
29 rate = ask_question("Interest rate", float, 0.05)
30 days = ask_question("Days to expiration", int, 260)
31 paths = ask_question("Number of MC paths", int, 10000)
32 n_strikes = ask_question("Number of strike values", int, 5)
33 min_strike = ask_question("Min strike price", float, 90.0)
34 max_strike = ask_question("Max strike price", float, 110.0)
35 n_sigmas = ask_question("Number of volatility values", int, 5)
36 min_sigma = ask_question("Min volatility", float, 0.1)
37 max_sigma = ask_question("Max volatility", float, 0.4)
38
39 strike_vals = np.linspace(min_strike, max_strike, n_strikes)
40 sigma_vals = np.linspace(min_sigma, max_sigma, n_sigmas)
41
42 #-----------------------------------------------------------------------------
43 # Setup for parallel calculation
44 #-----------------------------------------------------------------------------
7
45
8 # The MultiEngineClient is used to setup the calculation and works with all
46 # The MultiEngineClient is used to setup the calculation and works with all
9 # engine.
47 # engine.
10 mec = client.MultiEngineClient(profile='mycluster')
48 mec = client.MultiEngineClient(profile=cluster_profile)
11
49
12 # The TaskClient is an interface to the engines that provides dynamic load
50 # The TaskClient is an interface to the engines that provides dynamic load
13 # balancing at the expense of not knowing which engine will execute the code.
51 # balancing at the expense of not knowing which engine will execute the code.
14 tc = client.TaskClient(profile='mycluster')
52 tc = client.TaskClient(profile=cluster_profile)
15
53
16 # Initialize the common code on the engines. This Python module has the
54 # Initialize the common code on the engines. This Python module has the
17 # price_options function that prices the options.
55 # price_options function that prices the options.
18 mec.run('mcpricer.py')
56 mec.run('mcpricer.py')
19
57
20 # Define the function that will make up our tasks. We basically want to
58 #-----------------------------------------------------------------------------
21 # call the price_options function with all but two arguments (K, sigma)
59 # Perform parallel calculation
22 # fixed.
60 #-----------------------------------------------------------------------------
23 def my_prices(K, sigma):
61
24 S = 100.0
62 print "Running parallel calculation over strike prices and volatilities..."
25 r = 0.05
63 print "Strike prices: ", strike_vals
26 days = 260
64 print "Volatilities: ", sigma_vals
27 paths = 100000
65 sys.stdout.flush()
28 return price_options(S, K, sigma, r, days, paths)
66
29
67 # Submit tasks to the TaskClient for each (strike, sigma) pair as a MapTask.
30 # Create arrays of strike prices and volatilities
68 t1 = time.time()
31 nK = 10
32 nsigma = 10
33 K_vals = np.linspace(90.0, 100.0, nK)
34 sigma_vals = np.linspace(0.1, 0.4, nsigma)
35
36 # Submit tasks to the TaskClient for each (K, sigma) pair as a MapTask.
37 # The MapTask simply applies a function (my_prices) to the arguments:
38 # my_prices(K, sigma) and returns the result.
39 taskids = []
69 taskids = []
40 for K in K_vals:
70 for strike in strike_vals:
41 for sigma in sigma_vals:
71 for sigma in sigma_vals:
42 t = client.MapTask(my_prices, args=(K, sigma))
72 t = client.MapTask(
73 price_options,
74 args=(price, strike, sigma, rate, days, paths)
75 )
43 taskids.append(tc.run(t))
76 taskids.append(tc.run(t))
44
77
45 print "Submitted tasks: ", len(taskids)
78 print "Submitted tasks: ", len(taskids)
79 sys.stdout.flush()
46
80
47 # Block until all tasks are completed.
81 # Block until all tasks are completed.
48 tc.barrier(taskids)
82 tc.barrier(taskids)
83 t2 = time.time()
84 t = t2-t1
85
86 print "Parallel calculation completed, time = %s s" % t
87 print "Collecting results..."
49
88
50 # Get the results using TaskClient.get_task_result.
89 # Get the results using TaskClient.get_task_result.
51 results = [tc.get_task_result(tid) for tid in taskids]
90 results = [tc.get_task_result(tid) for tid in taskids]
52
91
53 # Assemble the result into a structured NumPy array.
92 # Assemble the result into a structured NumPy array.
54 prices = np.empty(nK*nsigma,
93 prices = np.empty(n_strikes*n_sigmas,
55 dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
94 dtype=[('ecall',float),('eput',float),('acall',float),('aput',float)]
56 )
95 )
96
57 for i, price_tuple in enumerate(results):
97 for i, price_tuple in enumerate(results):
58 prices[i] = price_tuple
98 prices[i] = price_tuple
59 prices.shape = (nK, nsigma)
60 K_vals, sigma_vals = np.meshgrid(K_vals, sigma_vals)
61
99
62 def plot_options(sigma_vals, K_vals, prices):
100 prices.shape = (n_strikes, n_sigmas)
101 strike_mesh, sigma_mesh = np.meshgrid(strike_vals, sigma_vals)
102
103 print "Results are available: strike_mesh, sigma_mesh, prices"
104 print "To plot results type 'plot_options(sigma_mesh, strike_mesh, prices)'"
105
106 #-----------------------------------------------------------------------------
107 # Utilities
108 #-----------------------------------------------------------------------------
109
110 def plot_options(sigma_mesh, strike_mesh, prices):
63 """
111 """
64 Make a contour plot of the option price in (sigma, K) space.
112 Make a contour plot of the option price in (sigma, strike) space.
65 """
113 """
66 from matplotlib import pyplot as plt
114 plt.figure(1)
67 plt.contourf(sigma_vals, K_vals, prices)
115
116 plt.subplot(221)
117 plt.contourf(sigma_mesh, strike_mesh, prices['ecall'])
118 plt.axis('tight')
119 plt.colorbar()
120 plt.title('European Call')
121 plt.ylabel("Strike Price")
122
123 plt.subplot(222)
124 plt.contourf(sigma_mesh, strike_mesh, prices['acall'])
125 plt.axis('tight')
68 plt.colorbar()
126 plt.colorbar()
69 plt.title("Option Price")
127 plt.title("Asian Call")
128
129 plt.subplot(223)
130 plt.contourf(sigma_mesh, strike_mesh, prices['eput'])
131 plt.axis('tight')
132 plt.colorbar()
133 plt.title("European Put")
70 plt.xlabel("Volatility")
134 plt.xlabel("Volatility")
71 plt.ylabel("Strike Price")
135 plt.ylabel("Strike Price")
136
137 plt.subplot(224)
138 plt.contourf(sigma_mesh, strike_mesh, prices['aput'])
139 plt.axis('tight')
140 plt.colorbar()
141 plt.title("Asian Put")
142 plt.xlabel("Volatility")
143
144
145
146
147
148
General Comments 0
You need to be logged in to leave comments. Login now