Suppress foolscap deprecation warnings, bug reported by Satra...
Fernando Perez
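The change applies one pattern in each of the three modules below: try the foolscap.api namespace first (preferred in foolscap > 0.4.3, where the old top-level imports are what trigger the deprecation warnings mentioned in the commit message) and fall back to the legacy top-level names on older foolscap releases. A minimal sketch of the pattern, using the names imported by the engine module:

    try:
        # Newer foolscap (> 0.4.3) exposes its public API under foolscap.api
        from foolscap.api import Referenceable, DeadReferenceError
    except ImportError:
        # Older foolscap releases only provide the top-level names
        from foolscap import Referenceable, DeadReferenceError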
@@ -1,548 +1,554 @@
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_enginepb -*-
2 # -*- test-case-name: IPython.kernel.test.test_enginepb -*-
3
3
4 """
4 """
5 Expose the IPython EngineService using the Foolscap network protocol.
5 Expose the IPython EngineService using the Foolscap network protocol.
6
6
7 Foolscap is a high-performance and secure network protocol.
7 Foolscap is a high-performance and secure network protocol.
8 """
8 """
9 __docformat__ = "restructuredtext en"
9 __docformat__ = "restructuredtext en"
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
12 # Copyright (C) 2008 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21
21
22 import os, time
22 import os, time
23 import cPickle as pickle
23 import cPickle as pickle
24
24
25 from twisted.python import components, log, failure
25 from twisted.python import components, log, failure
26 from twisted.python.failure import Failure
26 from twisted.python.failure import Failure
27 from twisted.internet import defer, reactor, threads
27 from twisted.internet import defer, reactor, threads
28 from twisted.internet.interfaces import IProtocolFactory
28 from twisted.internet.interfaces import IProtocolFactory
29 from zope.interface import Interface, implements, Attribute
29 from zope.interface import Interface, implements, Attribute
30
30
31 from twisted.internet.base import DelayedCall
31 from twisted.internet.base import DelayedCall
32 DelayedCall.debug = True
32 DelayedCall.debug = True
33
33
34 from foolscap import Referenceable, DeadReferenceError
34 try:
35 # This is preferred in foolscap v > 0.4.3
36 from foolscap.api import Referenceable, DeadReferenceError
37 except ImportError:
38 # Fallback for older versions
39 from foolscap import Referenceable, DeadReferenceError
40
35 from foolscap.referenceable import RemoteReference
41 from foolscap.referenceable import RemoteReference
36
42
37 from IPython.kernel.pbutil import packageFailure, unpackageFailure
43 from IPython.kernel.pbutil import packageFailure, unpackageFailure
38 from IPython.kernel.util import printer
44 from IPython.kernel.util import printer
39 from IPython.kernel.twistedutil import gatherBoth
45 from IPython.kernel.twistedutil import gatherBoth
40 from IPython.kernel import newserialized
46 from IPython.kernel import newserialized
41 from IPython.kernel.error import ProtocolError
47 from IPython.kernel.error import ProtocolError
42 from IPython.kernel import controllerservice
48 from IPython.kernel import controllerservice
43 from IPython.kernel.controllerservice import IControllerBase
49 from IPython.kernel.controllerservice import IControllerBase
44 from IPython.kernel.engineservice import \
50 from IPython.kernel.engineservice import \
45 IEngineBase, \
51 IEngineBase, \
46 IEngineQueued, \
52 IEngineQueued, \
47 EngineService, \
53 EngineService, \
48 StrictDict
54 StrictDict
49 from IPython.kernel.pickleutil import \
55 from IPython.kernel.pickleutil import \
50 can, \
56 can, \
51 canDict, \
57 canDict, \
52 canSequence, \
58 canSequence, \
53 uncan, \
59 uncan, \
54 uncanDict, \
60 uncanDict, \
55 uncanSequence
61 uncanSequence
56
62
57
63
58 #-------------------------------------------------------------------------------
64 #-------------------------------------------------------------------------------
59 # The client (Engine) side of things
65 # The client (Engine) side of things
60 #-------------------------------------------------------------------------------
66 #-------------------------------------------------------------------------------
61
67
62 # Expose a FC interface to the EngineService
68 # Expose a FC interface to the EngineService
63
69
64 class IFCEngine(Interface):
70 class IFCEngine(Interface):
65 """An interface that exposes an EngineService over Foolscap.
71 """An interface that exposes an EngineService over Foolscap.
66
72
67 The methods in this interface are similar to those from IEngine,
73 The methods in this interface are similar to those from IEngine,
68 but their arguments and return values slightly different to reflect
74 but their arguments and return values slightly different to reflect
69 that FC cannot send arbitrary objects. We handle this by pickling/
75 that FC cannot send arbitrary objects. We handle this by pickling/
70 unpickling that the two endpoints.
76 unpickling that the two endpoints.
71
77
72 If a remote or local exception is raised, the appropriate Failure
78 If a remote or local exception is raised, the appropriate Failure
73 will be returned instead.
79 will be returned instead.
74 """
80 """
75 pass
81 pass
76
82
77
83
78 class FCEngineReferenceFromService(Referenceable, object):
84 class FCEngineReferenceFromService(Referenceable, object):
79 """Adapt an `IEngineBase` to an `IFCEngine` implementer.
85 """Adapt an `IEngineBase` to an `IFCEngine` implementer.
80
86
81 This exposes an `IEngineBase` to foolscap by adapting it to a
87 This exposes an `IEngineBase` to foolscap by adapting it to a
82 `foolscap.Referenceable`.
88 `foolscap.Referenceable`.
83
89
84 See the documentation of the `IEngineBase` methods for more details.
90 See the documentation of the `IEngineBase` methods for more details.
85 """
91 """
86
92
87 implements(IFCEngine)
93 implements(IFCEngine)
88
94
89 def __init__(self, service):
95 def __init__(self, service):
90 assert IEngineBase.providedBy(service), \
96 assert IEngineBase.providedBy(service), \
91 "IEngineBase is not provided by" + repr(service)
97 "IEngineBase is not provided by" + repr(service)
92 self.service = service
98 self.service = service
93 self.collectors = {}
99 self.collectors = {}
94
100
95 def remote_get_id(self):
101 def remote_get_id(self):
96 return self.service.id
102 return self.service.id
97
103
98 def remote_set_id(self, id):
104 def remote_set_id(self, id):
99 self.service.id = id
105 self.service.id = id
100
106
101 def _checkProperties(self, result):
107 def _checkProperties(self, result):
102 dosync = self.service.properties.modified
108 dosync = self.service.properties.modified
103 self.service.properties.modified = False
109 self.service.properties.modified = False
104 return (dosync and pickle.dumps(self.service.properties, 2)), result
110 return (dosync and pickle.dumps(self.service.properties, 2)), result
105
111
106 def remote_execute(self, lines):
112 def remote_execute(self, lines):
107 d = self.service.execute(lines)
113 d = self.service.execute(lines)
108 d.addErrback(packageFailure)
114 d.addErrback(packageFailure)
109 d.addCallback(self._checkProperties)
115 d.addCallback(self._checkProperties)
110 d.addErrback(packageFailure)
116 d.addErrback(packageFailure)
111 #d.addCallback(lambda r: log.msg("Got result: " + str(r)))
117 #d.addCallback(lambda r: log.msg("Got result: " + str(r)))
112 return d
118 return d
113
119
114 #---------------------------------------------------------------------------
120 #---------------------------------------------------------------------------
115 # Old version of push
121 # Old version of push
116 #---------------------------------------------------------------------------
122 #---------------------------------------------------------------------------
117
123
118 def remote_push(self, pNamespace):
124 def remote_push(self, pNamespace):
119 try:
125 try:
120 namespace = pickle.loads(pNamespace)
126 namespace = pickle.loads(pNamespace)
121 except:
127 except:
122 return defer.fail(failure.Failure()).addErrback(packageFailure)
128 return defer.fail(failure.Failure()).addErrback(packageFailure)
123 else:
129 else:
124 return self.service.push(namespace).addErrback(packageFailure)
130 return self.service.push(namespace).addErrback(packageFailure)
125
131
126 #---------------------------------------------------------------------------
132 #---------------------------------------------------------------------------
127 # pull
133 # pull
128 #---------------------------------------------------------------------------
134 #---------------------------------------------------------------------------
129
135
130 def remote_pull(self, keys):
136 def remote_pull(self, keys):
131 d = self.service.pull(keys)
137 d = self.service.pull(keys)
132 d.addCallback(pickle.dumps, 2)
138 d.addCallback(pickle.dumps, 2)
133 d.addErrback(packageFailure)
139 d.addErrback(packageFailure)
134 return d
140 return d
135
141
136 #---------------------------------------------------------------------------
142 #---------------------------------------------------------------------------
137 # push/pullFuction
143 # push/pullFuction
138 #---------------------------------------------------------------------------
144 #---------------------------------------------------------------------------
139
145
140 def remote_push_function(self, pNamespace):
146 def remote_push_function(self, pNamespace):
141 try:
147 try:
142 namespace = pickle.loads(pNamespace)
148 namespace = pickle.loads(pNamespace)
143 except:
149 except:
144 return defer.fail(failure.Failure()).addErrback(packageFailure)
150 return defer.fail(failure.Failure()).addErrback(packageFailure)
145 else:
151 else:
146 # The usage of globals() here is an attempt to bind any pickled functions
152 # The usage of globals() here is an attempt to bind any pickled functions
147 # to the globals of this module. What we really want is to have it bound
153 # to the globals of this module. What we really want is to have it bound
148 # to the globals of the callers module. This will require walking the
154 # to the globals of the callers module. This will require walking the
149 # stack. BG 10/3/07.
155 # stack. BG 10/3/07.
150 namespace = uncanDict(namespace, globals())
156 namespace = uncanDict(namespace, globals())
151 return self.service.push_function(namespace).addErrback(packageFailure)
157 return self.service.push_function(namespace).addErrback(packageFailure)
152
158
153 def remote_pull_function(self, keys):
159 def remote_pull_function(self, keys):
154 d = self.service.pull_function(keys)
160 d = self.service.pull_function(keys)
155 if len(keys)>1:
161 if len(keys)>1:
156 d.addCallback(canSequence)
162 d.addCallback(canSequence)
157 elif len(keys)==1:
163 elif len(keys)==1:
158 d.addCallback(can)
164 d.addCallback(can)
159 d.addCallback(pickle.dumps, 2)
165 d.addCallback(pickle.dumps, 2)
160 d.addErrback(packageFailure)
166 d.addErrback(packageFailure)
161 return d
167 return d
162
168
163 #---------------------------------------------------------------------------
169 #---------------------------------------------------------------------------
164 # Other methods
170 # Other methods
165 #---------------------------------------------------------------------------
171 #---------------------------------------------------------------------------
166
172
167 def remote_get_result(self, i=None):
173 def remote_get_result(self, i=None):
168 return self.service.get_result(i).addErrback(packageFailure)
174 return self.service.get_result(i).addErrback(packageFailure)
169
175
170 def remote_reset(self):
176 def remote_reset(self):
171 return self.service.reset().addErrback(packageFailure)
177 return self.service.reset().addErrback(packageFailure)
172
178
173 def remote_kill(self):
179 def remote_kill(self):
174 return self.service.kill().addErrback(packageFailure)
180 return self.service.kill().addErrback(packageFailure)
175
181
176 def remote_keys(self):
182 def remote_keys(self):
177 return self.service.keys().addErrback(packageFailure)
183 return self.service.keys().addErrback(packageFailure)
178
184
179 #---------------------------------------------------------------------------
185 #---------------------------------------------------------------------------
180 # push/pull_serialized
186 # push/pull_serialized
181 #---------------------------------------------------------------------------
187 #---------------------------------------------------------------------------
182
188
183 def remote_push_serialized(self, pNamespace):
189 def remote_push_serialized(self, pNamespace):
184 try:
190 try:
185 namespace = pickle.loads(pNamespace)
191 namespace = pickle.loads(pNamespace)
186 except:
192 except:
187 return defer.fail(failure.Failure()).addErrback(packageFailure)
193 return defer.fail(failure.Failure()).addErrback(packageFailure)
188 else:
194 else:
189 d = self.service.push_serialized(namespace)
195 d = self.service.push_serialized(namespace)
190 return d.addErrback(packageFailure)
196 return d.addErrback(packageFailure)
191
197
192 def remote_pull_serialized(self, keys):
198 def remote_pull_serialized(self, keys):
193 d = self.service.pull_serialized(keys)
199 d = self.service.pull_serialized(keys)
194 d.addCallback(pickle.dumps, 2)
200 d.addCallback(pickle.dumps, 2)
195 d.addErrback(packageFailure)
201 d.addErrback(packageFailure)
196 return d
202 return d
197
203
198 #---------------------------------------------------------------------------
204 #---------------------------------------------------------------------------
199 # Properties interface
205 # Properties interface
200 #---------------------------------------------------------------------------
206 #---------------------------------------------------------------------------
201
207
202 def remote_set_properties(self, pNamespace):
208 def remote_set_properties(self, pNamespace):
203 try:
209 try:
204 namespace = pickle.loads(pNamespace)
210 namespace = pickle.loads(pNamespace)
205 except:
211 except:
206 return defer.fail(failure.Failure()).addErrback(packageFailure)
212 return defer.fail(failure.Failure()).addErrback(packageFailure)
207 else:
213 else:
208 return self.service.set_properties(namespace).addErrback(packageFailure)
214 return self.service.set_properties(namespace).addErrback(packageFailure)
209
215
210 def remote_get_properties(self, keys=None):
216 def remote_get_properties(self, keys=None):
211 d = self.service.get_properties(keys)
217 d = self.service.get_properties(keys)
212 d.addCallback(pickle.dumps, 2)
218 d.addCallback(pickle.dumps, 2)
213 d.addErrback(packageFailure)
219 d.addErrback(packageFailure)
214 return d
220 return d
215
221
216 def remote_has_properties(self, keys):
222 def remote_has_properties(self, keys):
217 d = self.service.has_properties(keys)
223 d = self.service.has_properties(keys)
218 d.addCallback(pickle.dumps, 2)
224 d.addCallback(pickle.dumps, 2)
219 d.addErrback(packageFailure)
225 d.addErrback(packageFailure)
220 return d
226 return d
221
227
222 def remote_del_properties(self, keys):
228 def remote_del_properties(self, keys):
223 d = self.service.del_properties(keys)
229 d = self.service.del_properties(keys)
224 d.addErrback(packageFailure)
230 d.addErrback(packageFailure)
225 return d
231 return d
226
232
227 def remote_clear_properties(self):
233 def remote_clear_properties(self):
228 d = self.service.clear_properties()
234 d = self.service.clear_properties()
229 d.addErrback(packageFailure)
235 d.addErrback(packageFailure)
230 return d
236 return d
231
237
232
238
233 components.registerAdapter(FCEngineReferenceFromService,
239 components.registerAdapter(FCEngineReferenceFromService,
234 IEngineBase,
240 IEngineBase,
235 IFCEngine)
241 IFCEngine)
236
242
237
243
238 #-------------------------------------------------------------------------------
244 #-------------------------------------------------------------------------------
239 # Now the server (Controller) side of things
245 # Now the server (Controller) side of things
240 #-------------------------------------------------------------------------------
246 #-------------------------------------------------------------------------------
241
247
242 class EngineFromReference(object):
248 class EngineFromReference(object):
243 """Adapt a `RemoteReference` to an `IEngineBase` implementing object.
249 """Adapt a `RemoteReference` to an `IEngineBase` implementing object.
244
250
245 When an engine connects to a controller, it calls the `register_engine`
251 When an engine connects to a controller, it calls the `register_engine`
246 method of the controller and passes the controller a `RemoteReference` to
252 method of the controller and passes the controller a `RemoteReference` to
247 itself. This class is used to adapt this `RemoteReference` to an object
253 itself. This class is used to adapt this `RemoteReference` to an object
248 that implements the full `IEngineBase` interface.
254 that implements the full `IEngineBase` interface.
249
255
250 See the documentation of `IEngineBase` for details on the methods.
256 See the documentation of `IEngineBase` for details on the methods.
251 """
257 """
252
258
253 implements(IEngineBase)
259 implements(IEngineBase)
254
260
255 def __init__(self, reference):
261 def __init__(self, reference):
256 self.reference = reference
262 self.reference = reference
257 self._id = None
263 self._id = None
258 self._properties = StrictDict()
264 self._properties = StrictDict()
259 self.currentCommand = None
265 self.currentCommand = None
260
266
261 def callRemote(self, *args, **kwargs):
267 def callRemote(self, *args, **kwargs):
262 try:
268 try:
263 return self.reference.callRemote(*args, **kwargs)
269 return self.reference.callRemote(*args, **kwargs)
264 except DeadReferenceError:
270 except DeadReferenceError:
265 self.notifier()
271 self.notifier()
266 self.stopNotifying(self.notifier)
272 self.stopNotifying(self.notifier)
267 return defer.fail()
273 return defer.fail()
268
274
269 def get_id(self):
275 def get_id(self):
270 """Return the Engines id."""
276 """Return the Engines id."""
271 return self._id
277 return self._id
272
278
273 def set_id(self, id):
279 def set_id(self, id):
274 """Set the Engines id."""
280 """Set the Engines id."""
275 self._id = id
281 self._id = id
276 return self.callRemote('set_id', id)
282 return self.callRemote('set_id', id)
277
283
278 id = property(get_id, set_id)
284 id = property(get_id, set_id)
279
285
280 def syncProperties(self, r):
286 def syncProperties(self, r):
281 try:
287 try:
282 psync, result = r
288 psync, result = r
283 except (ValueError, TypeError):
289 except (ValueError, TypeError):
284 return r
290 return r
285 else:
291 else:
286 if psync:
292 if psync:
287 log.msg("sync properties")
293 log.msg("sync properties")
288 pick = self.checkReturnForFailure(psync)
294 pick = self.checkReturnForFailure(psync)
289 if isinstance(pick, failure.Failure):
295 if isinstance(pick, failure.Failure):
290 self.properties = pick
296 self.properties = pick
291 return pick
297 return pick
292 else:
298 else:
293 self.properties = pickle.loads(pick)
299 self.properties = pickle.loads(pick)
294 return result
300 return result
295
301
296 def _set_properties(self, dikt):
302 def _set_properties(self, dikt):
297 self._properties.clear()
303 self._properties.clear()
298 self._properties.update(dikt)
304 self._properties.update(dikt)
299
305
300 def _get_properties(self):
306 def _get_properties(self):
301 if isinstance(self._properties, failure.Failure):
307 if isinstance(self._properties, failure.Failure):
302 self._properties.raiseException()
308 self._properties.raiseException()
303 return self._properties
309 return self._properties
304
310
305 properties = property(_get_properties, _set_properties)
311 properties = property(_get_properties, _set_properties)
306
312
307 #---------------------------------------------------------------------------
313 #---------------------------------------------------------------------------
308 # Methods from IEngine
314 # Methods from IEngine
309 #---------------------------------------------------------------------------
315 #---------------------------------------------------------------------------
310
316
311 #---------------------------------------------------------------------------
317 #---------------------------------------------------------------------------
312 # execute
318 # execute
313 #---------------------------------------------------------------------------
319 #---------------------------------------------------------------------------
314
320
315 def execute(self, lines):
321 def execute(self, lines):
316 # self._needProperties = True
322 # self._needProperties = True
317 d = self.callRemote('execute', lines)
323 d = self.callRemote('execute', lines)
318 d.addCallback(self.syncProperties)
324 d.addCallback(self.syncProperties)
319 return d.addCallback(self.checkReturnForFailure)
325 return d.addCallback(self.checkReturnForFailure)
320
326
321 #---------------------------------------------------------------------------
327 #---------------------------------------------------------------------------
322 # push
328 # push
323 #---------------------------------------------------------------------------
329 #---------------------------------------------------------------------------
324
330
325 def push(self, namespace):
331 def push(self, namespace):
326 try:
332 try:
327 package = pickle.dumps(namespace, 2)
333 package = pickle.dumps(namespace, 2)
328 except:
334 except:
329 return defer.fail(failure.Failure())
335 return defer.fail(failure.Failure())
330 else:
336 else:
331 if isinstance(package, failure.Failure):
337 if isinstance(package, failure.Failure):
332 return defer.fail(package)
338 return defer.fail(package)
333 else:
339 else:
334 d = self.callRemote('push', package)
340 d = self.callRemote('push', package)
335 return d.addCallback(self.checkReturnForFailure)
341 return d.addCallback(self.checkReturnForFailure)
336
342
337 #---------------------------------------------------------------------------
343 #---------------------------------------------------------------------------
338 # pull
344 # pull
339 #---------------------------------------------------------------------------
345 #---------------------------------------------------------------------------
340
346
341 def pull(self, keys):
347 def pull(self, keys):
342 d = self.callRemote('pull', keys)
348 d = self.callRemote('pull', keys)
343 d.addCallback(self.checkReturnForFailure)
349 d.addCallback(self.checkReturnForFailure)
344 d.addCallback(pickle.loads)
350 d.addCallback(pickle.loads)
345 return d
351 return d
346
352
347 #---------------------------------------------------------------------------
353 #---------------------------------------------------------------------------
348 # push/pull_function
354 # push/pull_function
349 #---------------------------------------------------------------------------
355 #---------------------------------------------------------------------------
350
356
351 def push_function(self, namespace):
357 def push_function(self, namespace):
352 try:
358 try:
353 package = pickle.dumps(canDict(namespace), 2)
359 package = pickle.dumps(canDict(namespace), 2)
354 except:
360 except:
355 return defer.fail(failure.Failure())
361 return defer.fail(failure.Failure())
356 else:
362 else:
357 if isinstance(package, failure.Failure):
363 if isinstance(package, failure.Failure):
358 return defer.fail(package)
364 return defer.fail(package)
359 else:
365 else:
360 d = self.callRemote('push_function', package)
366 d = self.callRemote('push_function', package)
361 return d.addCallback(self.checkReturnForFailure)
367 return d.addCallback(self.checkReturnForFailure)
362
368
363 def pull_function(self, keys):
369 def pull_function(self, keys):
364 d = self.callRemote('pull_function', keys)
370 d = self.callRemote('pull_function', keys)
365 d.addCallback(self.checkReturnForFailure)
371 d.addCallback(self.checkReturnForFailure)
366 d.addCallback(pickle.loads)
372 d.addCallback(pickle.loads)
367 # The usage of globals() here is an attempt to bind any pickled functions
373 # The usage of globals() here is an attempt to bind any pickled functions
368 # to the globals of this module. What we really want is to have it bound
374 # to the globals of this module. What we really want is to have it bound
369 # to the globals of the callers module. This will require walking the
375 # to the globals of the callers module. This will require walking the
370 # stack. BG 10/3/07.
376 # stack. BG 10/3/07.
371 if len(keys)==1:
377 if len(keys)==1:
372 d.addCallback(uncan, globals())
378 d.addCallback(uncan, globals())
373 elif len(keys)>1:
379 elif len(keys)>1:
374 d.addCallback(uncanSequence, globals())
380 d.addCallback(uncanSequence, globals())
375 return d
381 return d
376
382
377 #---------------------------------------------------------------------------
383 #---------------------------------------------------------------------------
378 # Other methods
384 # Other methods
379 #---------------------------------------------------------------------------
385 #---------------------------------------------------------------------------
380
386
381 def get_result(self, i=None):
387 def get_result(self, i=None):
382 return self.callRemote('get_result', i).addCallback(self.checkReturnForFailure)
388 return self.callRemote('get_result', i).addCallback(self.checkReturnForFailure)
383
389
384 def reset(self):
390 def reset(self):
385 self._refreshProperties = True
391 self._refreshProperties = True
386 d = self.callRemote('reset')
392 d = self.callRemote('reset')
387 d.addCallback(self.syncProperties)
393 d.addCallback(self.syncProperties)
388 return d.addCallback(self.checkReturnForFailure)
394 return d.addCallback(self.checkReturnForFailure)
389
395
390 def kill(self):
396 def kill(self):
391 #this will raise pb.PBConnectionLost on success
397 #this will raise pb.PBConnectionLost on success
392 d = self.callRemote('kill')
398 d = self.callRemote('kill')
393 d.addCallback(self.syncProperties)
399 d.addCallback(self.syncProperties)
394 d.addCallback(self.checkReturnForFailure)
400 d.addCallback(self.checkReturnForFailure)
395 d.addErrback(self.killBack)
401 d.addErrback(self.killBack)
396 return d
402 return d
397
403
398 def killBack(self, f):
404 def killBack(self, f):
399 log.msg('filling engine: %s' % f)
405 log.msg('filling engine: %s' % f)
400 return None
406 return None
401
407
402 def keys(self):
408 def keys(self):
403 return self.callRemote('keys').addCallback(self.checkReturnForFailure)
409 return self.callRemote('keys').addCallback(self.checkReturnForFailure)
404
410
405 #---------------------------------------------------------------------------
411 #---------------------------------------------------------------------------
406 # Properties methods
412 # Properties methods
407 #---------------------------------------------------------------------------
413 #---------------------------------------------------------------------------
408
414
409 def set_properties(self, properties):
415 def set_properties(self, properties):
410 try:
416 try:
411 package = pickle.dumps(properties, 2)
417 package = pickle.dumps(properties, 2)
412 except:
418 except:
413 return defer.fail(failure.Failure())
419 return defer.fail(failure.Failure())
414 else:
420 else:
415 if isinstance(package, failure.Failure):
421 if isinstance(package, failure.Failure):
416 return defer.fail(package)
422 return defer.fail(package)
417 else:
423 else:
418 d = self.callRemote('set_properties', package)
424 d = self.callRemote('set_properties', package)
419 return d.addCallback(self.checkReturnForFailure)
425 return d.addCallback(self.checkReturnForFailure)
420 return d
426 return d
421
427
422 def get_properties(self, keys=None):
428 def get_properties(self, keys=None):
423 d = self.callRemote('get_properties', keys)
429 d = self.callRemote('get_properties', keys)
424 d.addCallback(self.checkReturnForFailure)
430 d.addCallback(self.checkReturnForFailure)
425 d.addCallback(pickle.loads)
431 d.addCallback(pickle.loads)
426 return d
432 return d
427
433
428 def has_properties(self, keys):
434 def has_properties(self, keys):
429 d = self.callRemote('has_properties', keys)
435 d = self.callRemote('has_properties', keys)
430 d.addCallback(self.checkReturnForFailure)
436 d.addCallback(self.checkReturnForFailure)
431 d.addCallback(pickle.loads)
437 d.addCallback(pickle.loads)
432 return d
438 return d
433
439
434 def del_properties(self, keys):
440 def del_properties(self, keys):
435 d = self.callRemote('del_properties', keys)
441 d = self.callRemote('del_properties', keys)
436 d.addCallback(self.checkReturnForFailure)
442 d.addCallback(self.checkReturnForFailure)
437 # d.addCallback(pickle.loads)
443 # d.addCallback(pickle.loads)
438 return d
444 return d
439
445
440 def clear_properties(self):
446 def clear_properties(self):
441 d = self.callRemote('clear_properties')
447 d = self.callRemote('clear_properties')
442 d.addCallback(self.checkReturnForFailure)
448 d.addCallback(self.checkReturnForFailure)
443 return d
449 return d
444
450
445 #---------------------------------------------------------------------------
451 #---------------------------------------------------------------------------
446 # push/pull_serialized
452 # push/pull_serialized
447 #---------------------------------------------------------------------------
453 #---------------------------------------------------------------------------
448
454
449 def push_serialized(self, namespace):
455 def push_serialized(self, namespace):
450 """Older version of pushSerialize."""
456 """Older version of pushSerialize."""
451 try:
457 try:
452 package = pickle.dumps(namespace, 2)
458 package = pickle.dumps(namespace, 2)
453 except:
459 except:
454 return defer.fail(failure.Failure())
460 return defer.fail(failure.Failure())
455 else:
461 else:
456 if isinstance(package, failure.Failure):
462 if isinstance(package, failure.Failure):
457 return defer.fail(package)
463 return defer.fail(package)
458 else:
464 else:
459 d = self.callRemote('push_serialized', package)
465 d = self.callRemote('push_serialized', package)
460 return d.addCallback(self.checkReturnForFailure)
466 return d.addCallback(self.checkReturnForFailure)
461
467
462 def pull_serialized(self, keys):
468 def pull_serialized(self, keys):
463 d = self.callRemote('pull_serialized', keys)
469 d = self.callRemote('pull_serialized', keys)
464 d.addCallback(self.checkReturnForFailure)
470 d.addCallback(self.checkReturnForFailure)
465 d.addCallback(pickle.loads)
471 d.addCallback(pickle.loads)
466 return d
472 return d
467
473
468 #---------------------------------------------------------------------------
474 #---------------------------------------------------------------------------
469 # Misc
475 # Misc
470 #---------------------------------------------------------------------------
476 #---------------------------------------------------------------------------
471
477
472 def checkReturnForFailure(self, r):
478 def checkReturnForFailure(self, r):
473 """See if a returned value is a pickled Failure object.
479 """See if a returned value is a pickled Failure object.
474
480
475 To distinguish between general pickled objects and pickled Failures, the
481 To distinguish between general pickled objects and pickled Failures, the
476 other side should prepend the string FAILURE: to any pickled Failure.
482 other side should prepend the string FAILURE: to any pickled Failure.
477 """
483 """
478 return unpackageFailure(r)
484 return unpackageFailure(r)
479
485
480
486
481 components.registerAdapter(EngineFromReference,
487 components.registerAdapter(EngineFromReference,
482 RemoteReference,
488 RemoteReference,
483 IEngineBase)
489 IEngineBase)
484
490
485
491
486 #-------------------------------------------------------------------------------
492 #-------------------------------------------------------------------------------
487 # Now adapt an IControllerBase to incoming FC connections
493 # Now adapt an IControllerBase to incoming FC connections
488 #-------------------------------------------------------------------------------
494 #-------------------------------------------------------------------------------
489
495
490
496
491 class IFCControllerBase(Interface):
497 class IFCControllerBase(Interface):
492 """
498 """
493 Interface that tells how an Engine sees a Controller.
499 Interface that tells how an Engine sees a Controller.
494
500
495 In our architecture, the Controller listens for Engines to connect
501 In our architecture, the Controller listens for Engines to connect
496 and register. This interface defines that registration method as it is
502 and register. This interface defines that registration method as it is
497 exposed over the Foolscap network protocol
503 exposed over the Foolscap network protocol
498 """
504 """
499
505
500 def remote_register_engine(self, engineReference, id=None, pid=None, pproperties=None):
506 def remote_register_engine(self, engineReference, id=None, pid=None, pproperties=None):
501 """
507 """
502 Register new engine on the controller.
508 Register new engine on the controller.
503
509
504 Engines must call this upon connecting to the controller if they
510 Engines must call this upon connecting to the controller if they
505 want to do work for the controller.
511 want to do work for the controller.
506
512
507 See the documentation of `IControllerCore` for more details.
513 See the documentation of `IControllerCore` for more details.
508 """
514 """
509
515
510
516
511 class FCRemoteEngineRefFromService(Referenceable):
517 class FCRemoteEngineRefFromService(Referenceable):
512 """
518 """
513 Adapt an `IControllerBase` to an `IFCControllerBase`.
519 Adapt an `IControllerBase` to an `IFCControllerBase`.
514 """
520 """
515
521
516 implements(IFCControllerBase)
522 implements(IFCControllerBase)
517
523
518 def __init__(self, service):
524 def __init__(self, service):
519 assert IControllerBase.providedBy(service), \
525 assert IControllerBase.providedBy(service), \
520 "IControllerBase is not provided by " + repr(service)
526 "IControllerBase is not provided by " + repr(service)
521 self.service = service
527 self.service = service
522
528
523 def remote_register_engine(self, engine_reference, id=None, pid=None, pproperties=None):
529 def remote_register_engine(self, engine_reference, id=None, pid=None, pproperties=None):
524 # First adapt the engine_reference to a basic non-queued engine
530 # First adapt the engine_reference to a basic non-queued engine
525 engine = IEngineBase(engine_reference)
531 engine = IEngineBase(engine_reference)
526 if pproperties:
532 if pproperties:
527 engine.properties = pickle.loads(pproperties)
533 engine.properties = pickle.loads(pproperties)
528 # Make it an IQueuedEngine before registration
534 # Make it an IQueuedEngine before registration
529 remote_engine = IEngineQueued(engine)
535 remote_engine = IEngineQueued(engine)
530 # Get the ip/port of the remote side
536 # Get the ip/port of the remote side
531 peer_address = engine_reference.tracker.broker.transport.getPeer()
537 peer_address = engine_reference.tracker.broker.transport.getPeer()
532 ip = peer_address.host
538 ip = peer_address.host
533 port = peer_address.port
539 port = peer_address.port
534 reg_dict = self.service.register_engine(remote_engine, id, ip, port, pid)
540 reg_dict = self.service.register_engine(remote_engine, id, ip, port, pid)
535 # Now setup callback for disconnect and unregistering the engine
541 # Now setup callback for disconnect and unregistering the engine
536 def notify(*args):
542 def notify(*args):
537 return self.service.unregister_engine(reg_dict['id'])
543 return self.service.unregister_engine(reg_dict['id'])
538 engine_reference.tracker.broker.notifyOnDisconnect(notify)
544 engine_reference.tracker.broker.notifyOnDisconnect(notify)
539
545
540 engine.notifier = notify
546 engine.notifier = notify
541 engine.stopNotifying = engine_reference.tracker.broker.dontNotifyOnDisconnect
547 engine.stopNotifying = engine_reference.tracker.broker.dontNotifyOnDisconnect
542
548
543 return reg_dict
549 return reg_dict
544
550
545
551
546 components.registerAdapter(FCRemoteEngineRefFromService,
552 components.registerAdapter(FCRemoteEngineRefFromService,
547 IControllerBase,
553 IControllerBase,
548 IFCControllerBase)
554 IFCControllerBase)
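Each adapter pair in this file follows the same Twisted component idiom: the adapter class declares via zope.interface's implements() that it provides the target interface, and components.registerAdapter() tells Twisted how to construct it from the source class or interface, after which calling the interface itself (for example IFCEngine(service) or IEngineBase(engine_reference)) returns the registered adapter. A self-contained, hedged sketch of that mechanism with purely illustrative names (IThing, Thing, ThingFromThing are not part of IPython):

    from zope.interface import Interface, implements
    from twisted.python import components

    class IThing(Interface):
        """Target interface to adapt to."""

    class Thing(object):
        """Source class to adapt from."""

    class ThingFromThing(object):
        """Adapter that wraps a Thing and provides IThing."""
        implements(IThing)

        def __init__(self, original):
            self.original = original

    # Register the adapter so that IThing(aThing) knows what to build.
    components.registerAdapter(ThingFromThing, Thing, IThing)

    adapted = IThing(Thing())  # returns a ThingFromThing wrapping the Thing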
@@ -1,69 +1,74 @@
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Foolscap related utilities."""
3 """Foolscap related utilities."""
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 import os
18 import os
19
19
20 from foolscap import Tub, UnauthenticatedTub
20 try:
21 # This is preferred in foolscap v > 0.4.3
22 from foolscap.api import Tub, UnauthenticatedTub
23 except ImportError:
24 # Fallback for older versions
25 from foolscap import Tub, UnauthenticatedTub
21
26
22 def check_furl_file_security(furl_file, secure):
27 def check_furl_file_security(furl_file, secure):
23 """Remove the old furl_file if changing security modes."""
28 """Remove the old furl_file if changing security modes."""
24
29
25 if os.path.isfile(furl_file):
30 if os.path.isfile(furl_file):
26 f = open(furl_file, 'r')
31 f = open(furl_file, 'r')
27 oldfurl = f.read().strip()
32 oldfurl = f.read().strip()
28 f.close()
33 f.close()
29 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
34 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
30 os.remove(furl_file)
35 os.remove(furl_file)
31
36
32 def is_secure(furl):
37 def is_secure(furl):
33 if is_valid(furl):
38 if is_valid(furl):
34 if furl.startswith("pb://"):
39 if furl.startswith("pb://"):
35 return True
40 return True
36 elif furl.startswith("pbu://"):
41 elif furl.startswith("pbu://"):
37 return False
42 return False
38 else:
43 else:
39 raise ValueError("invalid furl: %s" % furl)
44 raise ValueError("invalid furl: %s" % furl)
40
45
41 def is_valid(furl):
46 def is_valid(furl):
42 if isinstance(furl, str):
47 if isinstance(furl, str):
43 if furl.startswith("pb://") or furl.startswith("pbu://"):
48 if furl.startswith("pb://") or furl.startswith("pbu://"):
44 return True
49 return True
45 else:
50 else:
46 return False
51 return False
47
52
48 def find_furl(furl_or_file):
53 def find_furl(furl_or_file):
49 if isinstance(furl_or_file, str):
54 if isinstance(furl_or_file, str):
50 if is_valid(furl_or_file):
55 if is_valid(furl_or_file):
51 return furl_or_file
56 return furl_or_file
52 if os.path.isfile(furl_or_file):
57 if os.path.isfile(furl_or_file):
53 furl = open(furl_or_file, 'r').read().strip()
58 furl = open(furl_or_file, 'r').read().strip()
54 if is_valid(furl):
59 if is_valid(furl):
55 return furl
60 return furl
56 raise ValueError("not a furl or a file containing a furl: %s" % furl_or_file)
61 raise ValueError("not a furl or a file containing a furl: %s" % furl_or_file)
57
62
58 # We do this so if a user doesn't have OpenSSL installed, it will try to use
63 # We do this so if a user doesn't have OpenSSL installed, it will try to use
59 # an UnauthenticatedTub. But, they will still run into problems if they
64 # an UnauthenticatedTub. But, they will still run into problems if they
60 # try to use encrypted furls.
65 # try to use encrypted furls.
61 try:
66 try:
62 import OpenSSL
67 import OpenSSL
63 except:
68 except:
64 Tub = UnauthenticatedTub
69 Tub = UnauthenticatedTub
65 have_crypto = False
70 have_crypto = False
66 else:
71 else:
67 have_crypto = True
72 have_crypto = True
68
73
69
74
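For context, a brief hedged usage sketch of the helpers defined above: find_furl() accepts either a FURL string or the path of a file containing one, is_secure() reports whether a FURL is an authenticated pb:// one, and have_crypto records whether OpenSSL could be imported. The module path and the file path below are assumptions, not taken from the diff:

    # Module path is assumed; the diff does not name this file.
    from IPython.kernel.fcutil import find_furl, is_secure, have_crypto

    furl_file = "/path/to/ipcontroller-engine.furl"  # illustrative path only
    furl = find_furl(furl_file)  # ValueError if neither a FURL nor a file holding one
    if is_secure(furl) and not have_crypto:
        # pb:// FURLs need OpenSSL; without it only unauthenticated pbu:// FURLs work
        raise RuntimeError("secure FURL requires pyOpenSSL")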
@@ -1,757 +1,762 @@
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """
3 """
4 Expose the multiengine controller over the Foolscap network protocol.
4 Expose the multiengine controller over the Foolscap network protocol.
5 """
5 """
6
6
7 __docformat__ = "restructuredtext en"
7 __docformat__ = "restructuredtext en"
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Copyright (C) 2008 The IPython Development Team
10 # Copyright (C) 2008 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-------------------------------------------------------------------------------
14 #-------------------------------------------------------------------------------
15
15
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19
19
20 import cPickle as pickle
20 import cPickle as pickle
21 from types import FunctionType
21 from types import FunctionType
22
22
23 from zope.interface import Interface, implements
23 from zope.interface import Interface, implements
24 from twisted.internet import defer
24 from twisted.internet import defer
25 from twisted.python import components, failure, log
25 from twisted.python import components, failure, log
26
26
27 from foolscap import Referenceable
27 try:
28 # This is preferred in foolscap v > 0.4.3
29 from foolscap.api import Referenceable
30 except ImportError:
31 # Fallback for older versions
32 from foolscap import Referenceable
28
33
29 from IPython.kernel import error
34 from IPython.kernel import error
30 from IPython.kernel.util import printer
35 from IPython.kernel.util import printer
31 from IPython.kernel import map as Map
36 from IPython.kernel import map as Map
32 from IPython.kernel.parallelfunction import ParallelFunction
37 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import (
38 from IPython.kernel.mapper import (
34 MultiEngineMapper,
39 MultiEngineMapper,
35 IMultiEngineMapperFactory,
40 IMultiEngineMapperFactory,
36 IMapper
41 IMapper
37 )
42 )
38 from IPython.kernel.twistedutil import gatherBoth
43 from IPython.kernel.twistedutil import gatherBoth
39 from IPython.kernel.multiengine import (MultiEngine,
44 from IPython.kernel.multiengine import (MultiEngine,
40 IMultiEngine,
45 IMultiEngine,
41 IFullSynchronousMultiEngine,
46 IFullSynchronousMultiEngine,
42 ISynchronousMultiEngine)
47 ISynchronousMultiEngine)
43 from IPython.kernel.multiengineclient import wrapResultList
48 from IPython.kernel.multiengineclient import wrapResultList
44 from IPython.kernel.pendingdeferred import PendingDeferredManager
49 from IPython.kernel.pendingdeferred import PendingDeferredManager
45 from IPython.kernel.pickleutil import (can, canDict,
50 from IPython.kernel.pickleutil import (can, canDict,
46 canSequence, uncan, uncanDict, uncanSequence)
51 canSequence, uncan, uncanDict, uncanSequence)
47
52
48 from IPython.kernel.clientinterfaces import (
53 from IPython.kernel.clientinterfaces import (
49 IFCClientInterfaceProvider,
54 IFCClientInterfaceProvider,
50 IBlockingClientAdaptor
55 IBlockingClientAdaptor
51 )
56 )
52
57
53 # Needed to access the true globals from __main__.__dict__
58 # Needed to access the true globals from __main__.__dict__
54 import __main__
59 import __main__
55
60
56 #-------------------------------------------------------------------------------
61 #-------------------------------------------------------------------------------
57 # The Controller side of things
62 # The Controller side of things
58 #-------------------------------------------------------------------------------
63 #-------------------------------------------------------------------------------
59
64
60 def packageResult(wrappedMethod):
65 def packageResult(wrappedMethod):
61
66
62 def wrappedPackageResult(self, *args, **kwargs):
67 def wrappedPackageResult(self, *args, **kwargs):
63 d = wrappedMethod(self, *args, **kwargs)
68 d = wrappedMethod(self, *args, **kwargs)
64 d.addCallback(self.packageSuccess)
69 d.addCallback(self.packageSuccess)
65 d.addErrback(self.packageFailure)
70 d.addErrback(self.packageFailure)
66 return d
71 return d
67 return wrappedPackageResult
72 return wrappedPackageResult
68
73
69
74
70 class IFCSynchronousMultiEngine(Interface):
75 class IFCSynchronousMultiEngine(Interface):
71 """Foolscap interface to `ISynchronousMultiEngine`.
76 """Foolscap interface to `ISynchronousMultiEngine`.
72
77
73 The methods in this interface are similar to those of
78 The methods in this interface are similar to those of
74 `ISynchronousMultiEngine`, but their arguments and return values are pickled
79 `ISynchronousMultiEngine`, but their arguments and return values are pickled
75 if they are not already simple Python types that can be send over XML-RPC.
80 if they are not already simple Python types that can be send over XML-RPC.
76
81
77 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
82 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
78 documentation about the methods.
83 documentation about the methods.
79
84
80 Most methods in this interface act like the `ISynchronousMultiEngine`
85 Most methods in this interface act like the `ISynchronousMultiEngine`
81 versions and can be called in blocking or non-blocking mode.
86 versions and can be called in blocking or non-blocking mode.
82 """
87 """
83 pass
88 pass
84
89
85
90
86 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
91 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
87 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
92 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
88 """
93 """
89
94
90 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
95 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
91
96
92 addSlash = True
97 addSlash = True
93
98
94 def __init__(self, multiengine):
99 def __init__(self, multiengine):
95 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
100 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
96 # it. This allow this class to do two adaptation steps.
101 # it. This allow this class to do two adaptation steps.
97 self.smultiengine = ISynchronousMultiEngine(multiengine)
102 self.smultiengine = ISynchronousMultiEngine(multiengine)
98 self._deferredIDCallbacks = {}
103 self._deferredIDCallbacks = {}
99
104
100 #---------------------------------------------------------------------------
105 #---------------------------------------------------------------------------
101 # Non interface methods
106 # Non interface methods
102 #---------------------------------------------------------------------------
107 #---------------------------------------------------------------------------
103
108
104 def packageFailure(self, f):
109 def packageFailure(self, f):
105 f.cleanFailure()
110 f.cleanFailure()
106 return self.packageSuccess(f)
111 return self.packageSuccess(f)
107
112
108 def packageSuccess(self, obj):
113 def packageSuccess(self, obj):
109 serial = pickle.dumps(obj, 2)
114 serial = pickle.dumps(obj, 2)
110 return serial
115 return serial
111
116
112 #---------------------------------------------------------------------------
117 #---------------------------------------------------------------------------
113 # Things related to PendingDeferredManager
118 # Things related to PendingDeferredManager
114 #---------------------------------------------------------------------------
119 #---------------------------------------------------------------------------
115
120
116 @packageResult
121 @packageResult
117 def remote_get_pending_deferred(self, deferredID, block):
122 def remote_get_pending_deferred(self, deferredID, block):
118 d = self.smultiengine.get_pending_deferred(deferredID, block)
123 d = self.smultiengine.get_pending_deferred(deferredID, block)
119 try:
124 try:
120 callback = self._deferredIDCallbacks.pop(deferredID)
125 callback = self._deferredIDCallbacks.pop(deferredID)
121 except KeyError:
126 except KeyError:
122 callback = None
127 callback = None
123 if callback is not None:
128 if callback is not None:
124 d.addCallback(callback[0], *callback[1], **callback[2])
129 d.addCallback(callback[0], *callback[1], **callback[2])
125 return d
130 return d
126
131
127 @packageResult
132 @packageResult
128 def remote_clear_pending_deferreds(self):
133 def remote_clear_pending_deferreds(self):
129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
134 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
130
135
131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
136 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
137 self._deferredIDCallbacks[did] = (callback, args, kwargs)
133 return did
138 return did
134
139
135 #---------------------------------------------------------------------------
140 #---------------------------------------------------------------------------
136 # IEngineMultiplexer related methods
141 # IEngineMultiplexer related methods
137 #---------------------------------------------------------------------------
142 #---------------------------------------------------------------------------
138
143
139 @packageResult
144 @packageResult
140 def remote_execute(self, lines, targets, block):
145 def remote_execute(self, lines, targets, block):
141 return self.smultiengine.execute(lines, targets=targets, block=block)
146 return self.smultiengine.execute(lines, targets=targets, block=block)
142
147
143 @packageResult
148 @packageResult
144 def remote_push(self, binaryNS, targets, block):
149 def remote_push(self, binaryNS, targets, block):
145 try:
150 try:
146 namespace = pickle.loads(binaryNS)
151 namespace = pickle.loads(binaryNS)
147 except:
152 except:
148 d = defer.fail(failure.Failure())
153 d = defer.fail(failure.Failure())
149 else:
154 else:
150 d = self.smultiengine.push(namespace, targets=targets, block=block)
155 d = self.smultiengine.push(namespace, targets=targets, block=block)
151 return d
156 return d
152
157
153 @packageResult
158 @packageResult
154 def remote_pull(self, keys, targets, block):
159 def remote_pull(self, keys, targets, block):
155 d = self.smultiengine.pull(keys, targets=targets, block=block)
160 d = self.smultiengine.pull(keys, targets=targets, block=block)
156 return d
161 return d
157
162
158 @packageResult
163 @packageResult
159 def remote_push_function(self, binaryNS, targets, block):
164 def remote_push_function(self, binaryNS, targets, block):
160 try:
165 try:
161 namespace = pickle.loads(binaryNS)
166 namespace = pickle.loads(binaryNS)
162 except:
167 except:
163 d = defer.fail(failure.Failure())
168 d = defer.fail(failure.Failure())
164 else:
169 else:
165 namespace = uncanDict(namespace)
170 namespace = uncanDict(namespace)
166 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
171 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
167 return d
172 return d
168
173
169 def _canMultipleKeys(self, result):
174 def _canMultipleKeys(self, result):
170 return [canSequence(r) for r in result]
175 return [canSequence(r) for r in result]
171
176
172 @packageResult
177 @packageResult
173 def remote_pull_function(self, keys, targets, block):
178 def remote_pull_function(self, keys, targets, block):
174 def can_functions(r, keys):
179 def can_functions(r, keys):
175 if len(keys)==1 or isinstance(keys, str):
180 if len(keys)==1 or isinstance(keys, str):
176 result = canSequence(r)
181 result = canSequence(r)
177 elif len(keys)>1:
182 elif len(keys)>1:
178 result = [canSequence(s) for s in r]
183 result = [canSequence(s) for s in r]
179 return result
184 return result
180 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
185 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
181 if block:
186 if block:
182 d.addCallback(can_functions, keys)
187 d.addCallback(can_functions, keys)
183 else:
188 else:
184 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
189 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
185 return d
190 return d
186
191
187 @packageResult
192 @packageResult
188 def remote_push_serialized(self, binaryNS, targets, block):
193 def remote_push_serialized(self, binaryNS, targets, block):
189 try:
194 try:
190 namespace = pickle.loads(binaryNS)
195 namespace = pickle.loads(binaryNS)
191 except:
196 except:
192 d = defer.fail(failure.Failure())
197 d = defer.fail(failure.Failure())
193 else:
198 else:
194 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
199 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
195 return d
200 return d
196
201
197 @packageResult
202 @packageResult
198 def remote_pull_serialized(self, keys, targets, block):
203 def remote_pull_serialized(self, keys, targets, block):
199 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
204 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
200 return d
205 return d
201
206
202 @packageResult
207 @packageResult
203 def remote_get_result(self, i, targets, block):
208 def remote_get_result(self, i, targets, block):
204 if i == 'None':
209 if i == 'None':
205 i = None
210 i = None
206 return self.smultiengine.get_result(i, targets=targets, block=block)
211 return self.smultiengine.get_result(i, targets=targets, block=block)
207
212
208 @packageResult
213 @packageResult
209 def remote_reset(self, targets, block):
214 def remote_reset(self, targets, block):
210 return self.smultiengine.reset(targets=targets, block=block)
215 return self.smultiengine.reset(targets=targets, block=block)
211
216
212 @packageResult
217 @packageResult
213 def remote_keys(self, targets, block):
218 def remote_keys(self, targets, block):
214 return self.smultiengine.keys(targets=targets, block=block)
219 return self.smultiengine.keys(targets=targets, block=block)
215
220
216 @packageResult
221 @packageResult
217 def remote_kill(self, controller, targets, block):
222 def remote_kill(self, controller, targets, block):
218 return self.smultiengine.kill(controller, targets=targets, block=block)
223 return self.smultiengine.kill(controller, targets=targets, block=block)
219
224
220 @packageResult
225 @packageResult
221 def remote_clear_queue(self, targets, block):
226 def remote_clear_queue(self, targets, block):
222 return self.smultiengine.clear_queue(targets=targets, block=block)
227 return self.smultiengine.clear_queue(targets=targets, block=block)
223
228
224 @packageResult
229 @packageResult
225 def remote_queue_status(self, targets, block):
230 def remote_queue_status(self, targets, block):
226 return self.smultiengine.queue_status(targets=targets, block=block)
231 return self.smultiengine.queue_status(targets=targets, block=block)
227
232
228 @packageResult
233 @packageResult
229 def remote_set_properties(self, binaryNS, targets, block):
234 def remote_set_properties(self, binaryNS, targets, block):
230 try:
235 try:
231 ns = pickle.loads(binaryNS)
236 ns = pickle.loads(binaryNS)
232 except:
237 except:
233 d = defer.fail(failure.Failure())
238 d = defer.fail(failure.Failure())
234 else:
239 else:
235 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
240 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
236 return d
241 return d
237
242
238 @packageResult
243 @packageResult
239 def remote_get_properties(self, keys, targets, block):
244 def remote_get_properties(self, keys, targets, block):
240 if keys=='None':
245 if keys=='None':
241 keys=None
246 keys=None
242 return self.smultiengine.get_properties(keys, targets=targets, block=block)
247 return self.smultiengine.get_properties(keys, targets=targets, block=block)
243
248
244 @packageResult
249 @packageResult
245 def remote_has_properties(self, keys, targets, block):
250 def remote_has_properties(self, keys, targets, block):
246 return self.smultiengine.has_properties(keys, targets=targets, block=block)
251 return self.smultiengine.has_properties(keys, targets=targets, block=block)
247
252
248 @packageResult
253 @packageResult
249 def remote_del_properties(self, keys, targets, block):
254 def remote_del_properties(self, keys, targets, block):
250 return self.smultiengine.del_properties(keys, targets=targets, block=block)
255 return self.smultiengine.del_properties(keys, targets=targets, block=block)
251
256
252 @packageResult
257 @packageResult
253 def remote_clear_properties(self, targets, block):
258 def remote_clear_properties(self, targets, block):
254 return self.smultiengine.clear_properties(targets=targets, block=block)
259 return self.smultiengine.clear_properties(targets=targets, block=block)
255
260
256 #---------------------------------------------------------------------------
261 #---------------------------------------------------------------------------
257 # IMultiEngine related methods
262 # IMultiEngine related methods
258 #---------------------------------------------------------------------------
263 #---------------------------------------------------------------------------
259
264
260 def remote_get_ids(self):
265 def remote_get_ids(self):
261 """Get the ids of the registered engines.
266 """Get the ids of the registered engines.
262
267
263 This method always blocks.
268 This method always blocks.
264 """
269 """
265 return self.smultiengine.get_ids()
270 return self.smultiengine.get_ids()
266
271
267 #---------------------------------------------------------------------------
272 #---------------------------------------------------------------------------
268 # IFCClientInterfaceProvider related methods
273 # IFCClientInterfaceProvider related methods
269 #---------------------------------------------------------------------------
274 #---------------------------------------------------------------------------
270
275
271 def remote_get_client_name(self):
276 def remote_get_client_name(self):
272 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
277 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
273
278
274
279
275 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
280 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
276 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
281 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
277 # two phase adaptation.
282 # two phase adaptation.
278 components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
283 components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
279 IMultiEngine, IFCSynchronousMultiEngine)
284 IMultiEngine, IFCSynchronousMultiEngine)
280
285
281
286
282 #-------------------------------------------------------------------------------
287 #-------------------------------------------------------------------------------
283 # The Client side of things
288 # The Client side of things
284 #-------------------------------------------------------------------------------
289 #-------------------------------------------------------------------------------
285
290
286
291
287 class FCFullSynchronousMultiEngineClient(object):
292 class FCFullSynchronousMultiEngineClient(object):
288
293
289 implements(
294 implements(
290 IFullSynchronousMultiEngine,
295 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
296 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
297 IMultiEngineMapperFactory,
293 IMapper
298 IMapper
294 )
299 )
295
300
296 def __init__(self, remote_reference):
301 def __init__(self, remote_reference):
297 self.remote_reference = remote_reference
302 self.remote_reference = remote_reference
298 self._deferredIDCallbacks = {}
303 self._deferredIDCallbacks = {}
299 # This class manages some pending deferreds through this instance. This
304 # This class manages some pending deferreds through this instance. This
300 # is required for methods like gather/scatter as it enables us to
305 # is required for methods like gather/scatter as it enables us to
301 # create our own pending deferreds for composite operations.
306 # create our own pending deferreds for composite operations.
302 self.pdm = PendingDeferredManager()
307 self.pdm = PendingDeferredManager()
303
308
304 #---------------------------------------------------------------------------
309 #---------------------------------------------------------------------------
305 # Non interface methods
310 # Non interface methods
306 #---------------------------------------------------------------------------
311 #---------------------------------------------------------------------------
307
312
308 def unpackage(self, r):
313 def unpackage(self, r):
309 return pickle.loads(r)
314 return pickle.loads(r)
310
315
311 #---------------------------------------------------------------------------
316 #---------------------------------------------------------------------------
312 # Things related to PendingDeferredManager
317 # Things related to PendingDeferredManager
313 #---------------------------------------------------------------------------
318 #---------------------------------------------------------------------------
314
319
315 def get_pending_deferred(self, deferredID, block=True):
320 def get_pending_deferred(self, deferredID, block=True):
316
321
317 # Because we are managing some pending deferreds locally (through
322 # Because we are managing some pending deferreds locally (through
318 # self.pdm) and some remotely (on the controller), we first try the
323 # self.pdm) and some remotely (on the controller), we first try the
319 # local one and then the remote one.
324 # local one and then the remote one.
320 if self.pdm.quick_has_id(deferredID):
325 if self.pdm.quick_has_id(deferredID):
321 d = self.pdm.get_pending_deferred(deferredID, block)
326 d = self.pdm.get_pending_deferred(deferredID, block)
322 return d
327 return d
323 else:
328 else:
324 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
329 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
325 d.addCallback(self.unpackage)
330 d.addCallback(self.unpackage)
326 try:
331 try:
327 callback = self._deferredIDCallbacks.pop(deferredID)
332 callback = self._deferredIDCallbacks.pop(deferredID)
328 except KeyError:
333 except KeyError:
329 callback = None
334 callback = None
330 if callback is not None:
335 if callback is not None:
331 d.addCallback(callback[0], *callback[1], **callback[2])
336 d.addCallback(callback[0], *callback[1], **callback[2])
332 return d
337 return d
333
338
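# Added illustrative comment (not in the original source): a minimal sketch of
# how the non-blocking API pairs with get_pending_deferred above. `client` is
# an assumed, already-connected instance of this class.
#
#   d = client.execute('a = 1', targets='all', block=False)
#   # With block=False the Deferred fires with a deferred_id, which is then
#   # redeemed for the actual result:
#   d.addCallback(lambda did: client.get_pending_deferred(did, block=True))
#   def show(result):
#       print result
#   d.addCallback(show)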
334 def clear_pending_deferreds(self):
339 def clear_pending_deferreds(self):
335
340
336 # This clears both the local (self.pdm) and remote pending deferreds
341 # This clears both the local (self.pdm) and remote pending deferreds
337 self.pdm.clear_pending_deferreds()
342 self.pdm.clear_pending_deferreds()
338 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
343 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
339 d2.addCallback(self.unpackage)
344 d2.addCallback(self.unpackage)
340 return d2
345 return d2
341
346
342 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
347 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
343 self._deferredIDCallbacks[did] = (callback, args, kwargs)
348 self._deferredIDCallbacks[did] = (callback, args, kwargs)
344 return did
349 return did
345
350
346 #---------------------------------------------------------------------------
351 #---------------------------------------------------------------------------
347 # IEngineMultiplexer related methods
352 # IEngineMultiplexer related methods
348 #---------------------------------------------------------------------------
353 #---------------------------------------------------------------------------
349
354
350 def execute(self, lines, targets='all', block=True):
355 def execute(self, lines, targets='all', block=True):
351 d = self.remote_reference.callRemote('execute', lines, targets, block)
356 d = self.remote_reference.callRemote('execute', lines, targets, block)
352 d.addCallback(self.unpackage)
357 d.addCallback(self.unpackage)
353 return d
358 return d
354
359
355 def push(self, namespace, targets='all', block=True):
360 def push(self, namespace, targets='all', block=True):
356 serial = pickle.dumps(namespace, 2)
361 serial = pickle.dumps(namespace, 2)
357 d = self.remote_reference.callRemote('push', serial, targets, block)
362 d = self.remote_reference.callRemote('push', serial, targets, block)
358 d.addCallback(self.unpackage)
363 d.addCallback(self.unpackage)
359 return d
364 return d
360
365
361 def pull(self, keys, targets='all', block=True):
366 def pull(self, keys, targets='all', block=True):
362 d = self.remote_reference.callRemote('pull', keys, targets, block)
367 d = self.remote_reference.callRemote('pull', keys, targets, block)
363 d.addCallback(self.unpackage)
368 d.addCallback(self.unpackage)
364 return d
369 return d
365
370
366 def push_function(self, namespace, targets='all', block=True):
371 def push_function(self, namespace, targets='all', block=True):
367 cannedNamespace = canDict(namespace)
372 cannedNamespace = canDict(namespace)
368 serial = pickle.dumps(cannedNamespace, 2)
373 serial = pickle.dumps(cannedNamespace, 2)
369 d = self.remote_reference.callRemote('push_function', serial, targets, block)
374 d = self.remote_reference.callRemote('push_function', serial, targets, block)
370 d.addCallback(self.unpackage)
375 d.addCallback(self.unpackage)
371 return d
376 return d
372
377
373 def pull_function(self, keys, targets='all', block=True):
378 def pull_function(self, keys, targets='all', block=True):
374 def uncan_functions(r, keys):
379 def uncan_functions(r, keys):
375 if len(keys)==1 or isinstance(keys, str):
380 if len(keys)==1 or isinstance(keys, str):
376 return uncanSequence(r)
381 return uncanSequence(r)
377 elif len(keys)>1:
382 elif len(keys)>1:
378 return [uncanSequence(s) for s in r]
383 return [uncanSequence(s) for s in r]
379 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
384 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
380 if block:
385 if block:
381 d.addCallback(self.unpackage)
386 d.addCallback(self.unpackage)
382 d.addCallback(uncan_functions, keys)
387 d.addCallback(uncan_functions, keys)
383 else:
388 else:
384 d.addCallback(self.unpackage)
389 d.addCallback(self.unpackage)
385 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
390 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
386 return d
391 return d
387
392
388 def push_serialized(self, namespace, targets='all', block=True):
393 def push_serialized(self, namespace, targets='all', block=True):
389 cannedNamespace = canDict(namespace)
394 cannedNamespace = canDict(namespace)
390 serial = pickle.dumps(cannedNamespace, 2)
395 serial = pickle.dumps(cannedNamespace, 2)
391 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
396 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
392 d.addCallback(self.unpackage)
397 d.addCallback(self.unpackage)
393 return d
398 return d
394
399
395 def pull_serialized(self, keys, targets='all', block=True):
400 def pull_serialized(self, keys, targets='all', block=True):
396 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
401 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
397 d.addCallback(self.unpackage)
402 d.addCallback(self.unpackage)
398 return d
403 return d
399
404
400 def get_result(self, i=None, targets='all', block=True):
405 def get_result(self, i=None, targets='all', block=True):
401 if i is None: # This is because None cannot be marshalled by xml-rpc
406 if i is None: # This is because None cannot be marshalled by xml-rpc
402 i = 'None'
407 i = 'None'
403 d = self.remote_reference.callRemote('get_result', i, targets, block)
408 d = self.remote_reference.callRemote('get_result', i, targets, block)
404 d.addCallback(self.unpackage)
409 d.addCallback(self.unpackage)
405 return d
410 return d
406
411
407 def reset(self, targets='all', block=True):
412 def reset(self, targets='all', block=True):
408 d = self.remote_reference.callRemote('reset', targets, block)
413 d = self.remote_reference.callRemote('reset', targets, block)
409 d.addCallback(self.unpackage)
414 d.addCallback(self.unpackage)
410 return d
415 return d
411
416
412 def keys(self, targets='all', block=True):
417 def keys(self, targets='all', block=True):
413 d = self.remote_reference.callRemote('keys', targets, block)
418 d = self.remote_reference.callRemote('keys', targets, block)
414 d.addCallback(self.unpackage)
419 d.addCallback(self.unpackage)
415 return d
420 return d
416
421
417 def kill(self, controller=False, targets='all', block=True):
422 def kill(self, controller=False, targets='all', block=True):
418 d = self.remote_reference.callRemote('kill', controller, targets, block)
423 d = self.remote_reference.callRemote('kill', controller, targets, block)
419 d.addCallback(self.unpackage)
424 d.addCallback(self.unpackage)
420 return d
425 return d
421
426
422 def clear_queue(self, targets='all', block=True):
427 def clear_queue(self, targets='all', block=True):
423 d = self.remote_reference.callRemote('clear_queue', targets, block)
428 d = self.remote_reference.callRemote('clear_queue', targets, block)
424 d.addCallback(self.unpackage)
429 d.addCallback(self.unpackage)
425 return d
430 return d
426
431
427 def queue_status(self, targets='all', block=True):
432 def queue_status(self, targets='all', block=True):
428 d = self.remote_reference.callRemote('queue_status', targets, block)
433 d = self.remote_reference.callRemote('queue_status', targets, block)
429 d.addCallback(self.unpackage)
434 d.addCallback(self.unpackage)
430 return d
435 return d
431
436
432 def set_properties(self, properties, targets='all', block=True):
437 def set_properties(self, properties, targets='all', block=True):
433 serial = pickle.dumps(properties, 2)
438 serial = pickle.dumps(properties, 2)
434 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
439 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
435 d.addCallback(self.unpackage)
440 d.addCallback(self.unpackage)
436 return d
441 return d
437
442
438 def get_properties(self, keys=None, targets='all', block=True):
443 def get_properties(self, keys=None, targets='all', block=True):
439 if keys is None:
444 if keys is None:
440 keys='None'
445 keys='None'
441 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
446 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
442 d.addCallback(self.unpackage)
447 d.addCallback(self.unpackage)
443 return d
448 return d
444
449
445 def has_properties(self, keys, targets='all', block=True):
450 def has_properties(self, keys, targets='all', block=True):
446 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
451 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
447 d.addCallback(self.unpackage)
452 d.addCallback(self.unpackage)
448 return d
453 return d
449
454
450 def del_properties(self, keys, targets='all', block=True):
455 def del_properties(self, keys, targets='all', block=True):
451 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
456 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
452 d.addCallback(self.unpackage)
457 d.addCallback(self.unpackage)
453 return d
458 return d
454
459
455 def clear_properties(self, targets='all', block=True):
460 def clear_properties(self, targets='all', block=True):
456 d = self.remote_reference.callRemote('clear_properties', targets, block)
461 d = self.remote_reference.callRemote('clear_properties', targets, block)
457 d.addCallback(self.unpackage)
462 d.addCallback(self.unpackage)
458 return d
463 return d
459
464
460 #---------------------------------------------------------------------------
465 #---------------------------------------------------------------------------
461 # IMultiEngine related methods
466 # IMultiEngine related methods
462 #---------------------------------------------------------------------------
467 #---------------------------------------------------------------------------
463
468
464 def get_ids(self):
469 def get_ids(self):
465 d = self.remote_reference.callRemote('get_ids')
470 d = self.remote_reference.callRemote('get_ids')
466 return d
471 return d
467
472
468 #---------------------------------------------------------------------------
473 #---------------------------------------------------------------------------
469 # ISynchronousMultiEngineCoordinator related methods
474 # ISynchronousMultiEngineCoordinator related methods
470 #---------------------------------------------------------------------------
475 #---------------------------------------------------------------------------
471
476
472 def _process_targets(self, targets):
477 def _process_targets(self, targets):
473 def create_targets(ids):
478 def create_targets(ids):
474 if isinstance(targets, int):
479 if isinstance(targets, int):
475 engines = [targets]
480 engines = [targets]
476 elif targets=='all':
481 elif targets=='all':
477 engines = ids
482 engines = ids
478 elif isinstance(targets, (list, tuple)):
483 elif isinstance(targets, (list, tuple)):
479 engines = targets
484 engines = targets
480 for t in engines:
485 for t in engines:
481 if t not in ids:
486 if t not in ids:
482 raise error.InvalidEngineID("engine with id %r does not exist"%t)
487 raise error.InvalidEngineID("engine with id %r does not exist"%t)
483 return engines
488 return engines
484
489
485 d = self.get_ids()
490 d = self.get_ids()
486 d.addCallback(create_targets)
491 d.addCallback(create_targets)
487 return d
492 return d
488
493
489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
494 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
490
495
491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
496 # Note: scatter and gather handle pending deferreds locally through self.pdm.
492 # This enables us to collect a bunch of deferred ids and make a secondary
497 # This enables us to collect a bunch of deferred ids and make a secondary
493 # deferred id that corresponds to the entire group. This logic is extremely
498 # deferred id that corresponds to the entire group. This logic is extremely
494 # difficult to get right though.
499 # difficult to get right though.
495 def do_scatter(engines):
500 def do_scatter(engines):
496 nEngines = len(engines)
501 nEngines = len(engines)
497 mapClass = Map.dists[dist]
502 mapClass = Map.dists[dist]
498 mapObject = mapClass()
503 mapObject = mapClass()
499 d_list = []
504 d_list = []
500 # Loop through and push to each engine in non-blocking mode.
505 # Loop through and push to each engine in non-blocking mode.
501 # This returns a set of deferreds to deferred_ids
506 # This returns a set of deferreds to deferred_ids
502 for index, engineid in enumerate(engines):
507 for index, engineid in enumerate(engines):
503 partition = mapObject.getPartition(seq, index, nEngines)
508 partition = mapObject.getPartition(seq, index, nEngines)
504 if flatten and len(partition) == 1:
509 if flatten and len(partition) == 1:
505 d = self.push({key: partition[0]}, targets=engineid, block=False)
510 d = self.push({key: partition[0]}, targets=engineid, block=False)
506 else:
511 else:
507 d = self.push({key: partition}, targets=engineid, block=False)
512 d = self.push({key: partition}, targets=engineid, block=False)
508 d_list.append(d)
513 d_list.append(d)
509 # Collect the deferred to deferred_ids
514 # Collect the deferred to deferred_ids
510 d = gatherBoth(d_list,
515 d = gatherBoth(d_list,
511 fireOnOneErrback=0,
516 fireOnOneErrback=0,
512 consumeErrors=1,
517 consumeErrors=1,
513 logErrors=0)
518 logErrors=0)
514 # Now d has a list of deferred_ids or Failures coming
519 # Now d has a list of deferred_ids or Failures coming
515 d.addCallback(error.collect_exceptions, 'scatter')
520 d.addCallback(error.collect_exceptions, 'scatter')
516 def process_did_list(did_list):
521 def process_did_list(did_list):
517 """Turn a list of deferred_ids into a final result or failure."""
522 """Turn a list of deferred_ids into a final result or failure."""
518 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
523 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
519 final_d = gatherBoth(new_d_list,
524 final_d = gatherBoth(new_d_list,
520 fireOnOneErrback=0,
525 fireOnOneErrback=0,
521 consumeErrors=1,
526 consumeErrors=1,
522 logErrors=0)
527 logErrors=0)
523 final_d.addCallback(error.collect_exceptions, 'scatter')
528 final_d.addCallback(error.collect_exceptions, 'scatter')
524 final_d.addCallback(lambda lop: [i[0] for i in lop])
529 final_d.addCallback(lambda lop: [i[0] for i in lop])
525 return final_d
530 return final_d
526 # Now, depending on block, we need to handle the list deferred_ids
531 # Now, depending on block, we need to handle the list deferred_ids
527 # coming down the pipe differently.
532 # coming down the pipe differently.
528 if block:
533 if block:
529 # If we are blocking register a callback that will transform the
534 # If we are blocking register a callback that will transform the
530 # list of deferred_ids into the final result.
535 # list of deferred_ids into the final result.
531 d.addCallback(process_did_list)
536 d.addCallback(process_did_list)
532 return d
537 return d
533 else:
538 else:
534 # Here we are going to use a _local_ PendingDeferredManager.
539 # Here we are going to use a _local_ PendingDeferredManager.
535 deferred_id = self.pdm.get_deferred_id()
540 deferred_id = self.pdm.get_deferred_id()
536 # This is the deferred we will return to the user that will fire
541 # This is the deferred we will return to the user that will fire
537 # with the local deferred_id AFTER we have received the list of
542 # with the local deferred_id AFTER we have received the list of
538 # primary deferred_ids
543 # primary deferred_ids
539 d_to_return = defer.Deferred()
544 d_to_return = defer.Deferred()
540 def do_it(did_list):
545 def do_it(did_list):
541 """Produce a deferred to the final result, but first fire the
546 """Produce a deferred to the final result, but first fire the
542 deferred we will return to the user that has the local
547 deferred we will return to the user that has the local
543 deferred id."""
548 deferred id."""
544 d_to_return.callback(deferred_id)
549 d_to_return.callback(deferred_id)
545 return process_did_list(did_list)
550 return process_did_list(did_list)
546 d.addCallback(do_it)
551 d.addCallback(do_it)
547 # Now save the deferred to the final result
552 # Now save the deferred to the final result
548 self.pdm.save_pending_deferred(d, deferred_id)
553 self.pdm.save_pending_deferred(d, deferred_id)
549 return d_to_return
554 return d_to_return
550
555
551 d = self._process_targets(targets)
556 d = self._process_targets(targets)
552 d.addCallback(do_scatter)
557 d.addCallback(do_scatter)
553 return d
558 return d
554
559
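# Added illustrative comment: a minimal sketch of scatter as implemented above,
# assuming `client` is a connected instance of this class.
#
#   d = client.scatter('x', range(16), dist='b', targets='all', block=True)
#   # With block=True, d fires once every engine has received its partition of
#   # range(16) under the name 'x'. With block=False, d fires with a *local*
#   # deferred_id managed by self.pdm; redeem it with get_pending_deferred().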
555 def gather(self, key, dist='b', targets='all', block=True):
560 def gather(self, key, dist='b', targets='all', block=True):
556
561
557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
562 # Note: scatter and gather handle pending deferreds locally through self.pdm.
558 # This enables us to collect a bunch of deferred ids and make a secondary
563 # This enables us to collect a bunch of deferred ids and make a secondary
559 # deferred id that corresponds to the entire group. This logic is extremely
564 # deferred id that corresponds to the entire group. This logic is extremely
560 # difficult to get right though.
565 # difficult to get right though.
561 def do_gather(engines):
566 def do_gather(engines):
562 nEngines = len(engines)
567 nEngines = len(engines)
563 mapClass = Map.dists[dist]
568 mapClass = Map.dists[dist]
564 mapObject = mapClass()
569 mapObject = mapClass()
565 d_list = []
570 d_list = []
566 # Loop through and pull from each engine in non-blocking mode.
571 # Loop through and pull from each engine in non-blocking mode.
567 # This returns a set of deferreds to deferred_ids
572 # This returns a set of deferreds to deferred_ids
568 for index, engineid in enumerate(engines):
573 for index, engineid in enumerate(engines):
569 d = self.pull(key, targets=engineid, block=False)
574 d = self.pull(key, targets=engineid, block=False)
570 d_list.append(d)
575 d_list.append(d)
571 # Collect the deferred to deferred_ids
576 # Collect the deferred to deferred_ids
572 d = gatherBoth(d_list,
577 d = gatherBoth(d_list,
573 fireOnOneErrback=0,
578 fireOnOneErrback=0,
574 consumeErrors=1,
579 consumeErrors=1,
575 logErrors=0)
580 logErrors=0)
576 # Now d has a list of deferred_ids or Failures coming
581 # Now d has a list of deferred_ids or Failures coming
577 d.addCallback(error.collect_exceptions, 'gather')
582 d.addCallback(error.collect_exceptions, 'gather')
578 def process_did_list(did_list):
583 def process_did_list(did_list):
579 """Turn a list of deferred_ids into a final result or failure."""
584 """Turn a list of deferred_ids into a final result or failure."""
580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
585 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
581 final_d = gatherBoth(new_d_list,
586 final_d = gatherBoth(new_d_list,
582 fireOnOneErrback=0,
587 fireOnOneErrback=0,
583 consumeErrors=1,
588 consumeErrors=1,
584 logErrors=0)
589 logErrors=0)
585 final_d.addCallback(error.collect_exceptions, 'gather')
590 final_d.addCallback(error.collect_exceptions, 'gather')
586 final_d.addCallback(lambda lop: [i[0] for i in lop])
591 final_d.addCallback(lambda lop: [i[0] for i in lop])
587 final_d.addCallback(mapObject.joinPartitions)
592 final_d.addCallback(mapObject.joinPartitions)
588 return final_d
593 return final_d
589 # Now, depending on block, we need to handle the list deferred_ids
594 # Now, depending on block, we need to handle the list deferred_ids
590 # coming down the pipe differently.
595 # coming down the pipe differently.
591 if block:
596 if block:
592 # If we are blocking register a callback that will transform the
597 # If we are blocking register a callback that will transform the
593 # list of deferred_ids into the final result.
598 # list of deferred_ids into the final result.
594 d.addCallback(process_did_list)
599 d.addCallback(process_did_list)
595 return d
600 return d
596 else:
601 else:
597 # Here we are going to use a _local_ PendingDeferredManager.
602 # Here we are going to use a _local_ PendingDeferredManager.
598 deferred_id = self.pdm.get_deferred_id()
603 deferred_id = self.pdm.get_deferred_id()
599 # This is the deferred we will return to the user that will fire
604 # This is the deferred we will return to the user that will fire
600 # with the local deferred_id AFTER we have received the list of
605 # with the local deferred_id AFTER we have received the list of
601 # primary deferred_ids
606 # primary deferred_ids
602 d_to_return = defer.Deferred()
607 d_to_return = defer.Deferred()
603 def do_it(did_list):
608 def do_it(did_list):
604 """Produce a deferred to the final result, but first fire the
609 """Produce a deferred to the final result, but first fire the
605 deferred we will return to the user that has the local
610 deferred we will return to the user that has the local
606 deferred id."""
611 deferred id."""
607 d_to_return.callback(deferred_id)
612 d_to_return.callback(deferred_id)
608 return process_did_list(did_list)
613 return process_did_list(did_list)
609 d.addCallback(do_it)
614 d.addCallback(do_it)
610 # Now save the deferred to the final result
615 # Now save the deferred to the final result
611 self.pdm.save_pending_deferred(d, deferred_id)
616 self.pdm.save_pending_deferred(d, deferred_id)
612 return d_to_return
617 return d_to_return
613
618
614 d = self._process_targets(targets)
619 d = self._process_targets(targets)
615 d.addCallback(do_gather)
620 d.addCallback(do_gather)
616 return d
621 return d
617
622
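# Added illustrative comment: the scatter/execute/gather round trip that the
# comments above describe, sketched with the Deferred-based API of this class
# (`client` is an assumed instance).
#
#   d = client.scatter('x', range(8), targets='all', block=True)
#   d.addCallback(lambda _: client.execute('y = [i**2 for i in x]', block=True))
#   d.addCallback(lambda _: client.gather('y', dist='b', targets='all', block=True))
#   # The final callback receives the per-engine partitions of 'y' joined back
#   # into one sequence by mapObject.joinPartitions().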
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
623 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
619 """
624 """
620 A parallelized version of Python's builtin map.
625 A parallelized version of Python's builtin map.
621
626
622 This has a slightly different syntax than the builtin `map`.
627 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
628 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
629 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
630 be passed in a list or tuple.
626
631
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
632 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
633
629 Most users will want to use parallel functions or the `mapper`
634 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
635 and `map` methods for an API that follows that of the builtin
631 `map`.
636 `map`.
632 """
637 """
633 if not isinstance(sequences, (list, tuple)):
638 if not isinstance(sequences, (list, tuple)):
634 raise TypeError('sequences must be a list or tuple')
639 raise TypeError('sequences must be a list or tuple')
635 max_len = max(len(s) for s in sequences)
640 max_len = max(len(s) for s in sequences)
636 for s in sequences:
641 for s in sequences:
637 if len(s)!=max_len:
642 if len(s)!=max_len:
638 raise ValueError('all sequences must have equal length')
643 raise ValueError('all sequences must have equal length')
639 if isinstance(func, FunctionType):
644 if isinstance(func, FunctionType):
640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
645 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
646 d.addCallback(lambda did: self.get_pending_deferred(did, True))
642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
647 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
643 elif isinstance(func, str):
648 elif isinstance(func, str):
644 d = defer.succeed(None)
649 d = defer.succeed(None)
645 sourceToRun = \
650 sourceToRun = \
646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
651 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
647 else:
652 else:
648 raise TypeError("func must be a function or str")
653 raise TypeError("func must be a function or str")
649
654
650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
655 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
656 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
657 d.addCallback(lambda did: self.get_pending_deferred(did, True))
653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
658 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
654 return d
659 return d
655
660
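# Added illustrative comment: raw_map takes its sequences as a single
# list/tuple rather than *args, e.g. (with `client` an assumed instance):
#
#   def add(a, b):
#       return a + b
#   d = client.raw_map(add, [range(10), range(10)], dist='b', targets='all')
#   # Result-wise this corresponds to map(add, range(10), range(10)), but the
#   # work is scattered across the engines and gathered back.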
656 def map(self, func, *sequences):
661 def map(self, func, *sequences):
657 """
662 """
658 A parallel version of Python's builtin `map` function.
663 A parallel version of Python's builtin `map` function.
659
664
660 This method applies a function to sequences of arguments. It
665 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
666 follows the same syntax as the builtin `map`.
662
667
663 This method creates a mapper object by calling `self.mapper` with
668 This method creates a mapper object by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
669 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
670 the documentation of `mapper` for more details.
666 """
671 """
667 return self.mapper().map(func, *sequences)
672 return self.mapper().map(func, *sequences)
668
673
669 def mapper(self, dist='b', targets='all', block=True):
674 def mapper(self, dist='b', targets='all', block=True):
670 """
675 """
671 Create a mapper object that has a `map` method.
676 Create a mapper object that has a `map` method.
672
677
673 This method returns an object that implements the `IMapper`
678 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
679 interface. This method is a factory that is used to control how
675 the map happens.
680 the map happens.
676
681
677 :Parameters:
682 :Parameters:
678 dist : str
683 dist : str
679 What decomposition to use, 'b' is the only one supported
684 What decomposition to use, 'b' is the only one supported
680 currently
685 currently
681 targets : str, int, sequence of ints
686 targets : str, int, sequence of ints
682 Which engines to use for the map
687 Which engines to use for the map
683 block : boolean
688 block : boolean
684 Should calls to `map` block or not
689 Should calls to `map` block or not
685 """
690 """
686 return MultiEngineMapper(self, dist, targets, block)
691 return MultiEngineMapper(self, dist, targets, block)
687
692
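# Added illustrative comment: the mapper() factory fixes dist/targets/block
# once so that several map calls can reuse the same configuration (`client`
# is an assumed instance of this class).
#
#   def square(x):
#       return x**2
#   m = client.mapper(dist='b', targets='all', block=True)
#   d = m.map(square, range(32))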
688 def parallel(self, dist='b', targets='all', block=True):
693 def parallel(self, dist='b', targets='all', block=True):
689 """
694 """
690 A decorator that turns a function into a parallel function.
695 A decorator that turns a function into a parallel function.
691
696
692 This can be used as:
697 This can be used as:
693
698
694 @parallel()
699 @parallel()
695 def f(x, y)
700 def f(x, y)
696 ...
701 ...
697
702
698 f(range(10), range(10))
703 f(range(10), range(10))
699
704
700 This causes f(0,0), f(1,1), ... to be called in parallel.
705 This causes f(0,0), f(1,1), ... to be called in parallel.
701
706
702 :Parameters:
707 :Parameters:
703 dist : str
708 dist : str
704 What decomposition to use, 'b' is the only one supported
709 What decomposition to use, 'b' is the only one supported
705 currently
710 currently
706 targets : str, int, sequence of ints
711 targets : str, int, sequence of ints
707 Which engines to use for the map
712 Which engines to use for the map
708 block : boolean
713 block : boolean
709 Should calls to `map` block or not
714 Should calls to `map` block or not
710 """
715 """
711 mapper = self.mapper(dist, targets, block)
716 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
717 pf = ParallelFunction(mapper)
713 return pf
718 return pf
714
719
715 #---------------------------------------------------------------------------
720 #---------------------------------------------------------------------------
716 # ISynchronousMultiEngineExtras related methods
721 # ISynchronousMultiEngineExtras related methods
717 #---------------------------------------------------------------------------
722 #---------------------------------------------------------------------------
718
723
719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
724 def _transformPullResult(self, pushResult, multitargets, lenKeys):
720 if not multitargets:
725 if not multitargets:
721 result = pushResult[0]
726 result = pushResult[0]
722 elif lenKeys > 1:
727 elif lenKeys > 1:
723 result = zip(*pushResult)
728 result = zip(*pushResult)
724 elif lenKeys == 1:
729 elif lenKeys == 1:
725 result = list(pushResult)
730 result = list(pushResult)
726 return result
731 return result
727
732
728 def zip_pull(self, keys, targets='all', block=True):
733 def zip_pull(self, keys, targets='all', block=True):
729 multitargets = not isinstance(targets, int) and len(targets) > 1
734 multitargets = not isinstance(targets, int) and len(targets) > 1
730 lenKeys = len(keys)
735 lenKeys = len(keys)
731 d = self.pull(keys, targets=targets, block=block)
736 d = self.pull(keys, targets=targets, block=block)
732 if block:
737 if block:
733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
738 d.addCallback(self._transformPullResult, multitargets, lenKeys)
734 else:
739 else:
735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
740 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
736 return d
741 return d
737
742
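# Added illustrative comment: zip_pull regroups pull results per key instead
# of per engine, e.g. for two keys on two engines:
#
#   d = client.zip_pull(['a', 'b'], targets=[0, 1], block=True)
#   # pull() would fire with [[a0, b0], [a1, b1]] (one sub-list per engine);
#   # _transformPullResult zips this into [(a0, a1), (b0, b1)].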
738 def run(self, fname, targets='all', block=True):
743 def run(self, fname, targets='all', block=True):
739 fileobj = open(fname,'r')
744 fileobj = open(fname,'r')
740 source = fileobj.read()
745 source = fileobj.read()
741 fileobj.close()
746 fileobj.close()
742 # if the compilation blows, we get a local error right away
747 # if the compilation blows, we get a local error right away
743 try:
748 try:
744 code = compile(source,fname,'exec')
749 code = compile(source,fname,'exec')
745 except:
750 except:
746 return defer.fail(failure.Failure())
751 return defer.fail(failure.Failure())
747 # Now run the code
752 # Now run the code
748 d = self.execute(source, targets=targets, block=block)
753 d = self.execute(source, targets=targets, block=block)
749 return d
754 return d
750
755
751 #---------------------------------------------------------------------------
756 #---------------------------------------------------------------------------
752 # IBlockingClientAdaptor related methods
757 # IBlockingClientAdaptor related methods
753 #---------------------------------------------------------------------------
758 #---------------------------------------------------------------------------
754
759
755 def adapt_to_blocking_client(self):
760 def adapt_to_blocking_client(self):
756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
761 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
757 return IFullBlockingMultiEngineClient(self)
762 return IFullBlockingMultiEngineClient(self)
@@ -1,340 +1,345 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
3 """A Foolscap interface to a TaskController.
3 """A Foolscap interface to a TaskController.
4
4
5 This class lets Foolscap clients talk to a TaskController.
5 This class lets Foolscap clients talk to a TaskController.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 import cPickle as pickle
21 import cPickle as pickle
22 import xmlrpclib, copy
22 import xmlrpclib, copy
23
23
24 from zope.interface import Interface, implements
24 from zope.interface import Interface, implements
25 from twisted.internet import defer
25 from twisted.internet import defer
26 from twisted.python import components, failure
26 from twisted.python import components, failure
27
27
28 from foolscap import Referenceable
28 try:
29 # This is preferred in foolscap v > 0.4.3
30 from foolscap.api import Referenceable
31 except ImportError:
32 # Fallback for older versions
33 from foolscap import Referenceable
29
34
30 from IPython.kernel.twistedutil import blockingCallFromThread
35 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel import error, task as taskmodule, taskclient
36 from IPython.kernel import error, task as taskmodule, taskclient
32 from IPython.kernel.pickleutil import can, uncan
37 from IPython.kernel.pickleutil import can, uncan
33 from IPython.kernel.clientinterfaces import (
38 from IPython.kernel.clientinterfaces import (
34 IFCClientInterfaceProvider,
39 IFCClientInterfaceProvider,
35 IBlockingClientAdaptor
40 IBlockingClientAdaptor
36 )
41 )
37 from IPython.kernel.mapper import (
42 from IPython.kernel.mapper import (
38 TaskMapper,
43 TaskMapper,
39 ITaskMapperFactory,
44 ITaskMapperFactory,
40 IMapper
45 IMapper
41 )
46 )
42 from IPython.kernel.parallelfunction import (
47 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
48 ParallelFunction,
44 ITaskParallelDecorator
49 ITaskParallelDecorator
45 )
50 )
46
51
47 #-------------------------------------------------------------------------------
52 #-------------------------------------------------------------------------------
48 # The Controller side of things
53 # The Controller side of things
49 #-------------------------------------------------------------------------------
54 #-------------------------------------------------------------------------------
50
55
51
56
52 class IFCTaskController(Interface):
57 class IFCTaskController(Interface):
53 """Foolscap interface to task controller.
58 """Foolscap interface to task controller.
54
59
55 See the documentation of `ITaskController` for more information.
60 See the documentation of `ITaskController` for more information.
56 """
61 """
57 def remote_run(binTask):
62 def remote_run(binTask):
58 """"""
63 """"""
59
64
60 def remote_abort(taskid):
65 def remote_abort(taskid):
61 """"""
66 """"""
62
67
63 def remote_get_task_result(taskid, block=False):
68 def remote_get_task_result(taskid, block=False):
64 """"""
69 """"""
65
70
66 def remote_barrier(taskids):
71 def remote_barrier(taskids):
67 """"""
72 """"""
68
73
69 def remote_spin():
74 def remote_spin():
70 """"""
75 """"""
71
76
72 def remote_queue_status(verbose):
77 def remote_queue_status(verbose):
73 """"""
78 """"""
74
79
75 def remote_clear(taskids=None):
80 def remote_clear(taskids=None):
76 """"""
81 """"""
77
82
78
83
79 class FCTaskControllerFromTaskController(Referenceable):
84 class FCTaskControllerFromTaskController(Referenceable):
80 """
85 """
81 Adapt a `TaskController` to an `IFCTaskController`
86 Adapt a `TaskController` to an `IFCTaskController`
82
87
83 This class is used to expose a `TaskController` over the wire using
88 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
89 the Foolscap network protocol.
85 """
90 """
86
91
87 implements(IFCTaskController, IFCClientInterfaceProvider)
92 implements(IFCTaskController, IFCClientInterfaceProvider)
88
93
89 def __init__(self, taskController):
94 def __init__(self, taskController):
90 self.taskController = taskController
95 self.taskController = taskController
91
96
92 #---------------------------------------------------------------------------
97 #---------------------------------------------------------------------------
93 # Non interface methods
98 # Non interface methods
94 #---------------------------------------------------------------------------
99 #---------------------------------------------------------------------------
95
100
96 def packageFailure(self, f):
101 def packageFailure(self, f):
97 f.cleanFailure()
102 f.cleanFailure()
98 return self.packageSuccess(f)
103 return self.packageSuccess(f)
99
104
100 def packageSuccess(self, obj):
105 def packageSuccess(self, obj):
101 serial = pickle.dumps(obj, 2)
106 serial = pickle.dumps(obj, 2)
102 return serial
107 return serial
103
108
104 #---------------------------------------------------------------------------
109 #---------------------------------------------------------------------------
105 # ITaskController related methods
110 # ITaskController related methods
106 #---------------------------------------------------------------------------
111 #---------------------------------------------------------------------------
107
112
108 def remote_run(self, ptask):
113 def remote_run(self, ptask):
109 try:
114 try:
110 task = pickle.loads(ptask)
115 task = pickle.loads(ptask)
111 task.uncan_task()
116 task.uncan_task()
112 except:
117 except:
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
118 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
114 else:
119 else:
115 d = self.taskController.run(task)
120 d = self.taskController.run(task)
116 d.addCallback(self.packageSuccess)
121 d.addCallback(self.packageSuccess)
117 d.addErrback(self.packageFailure)
122 d.addErrback(self.packageFailure)
118 return d
123 return d
119
124
120 def remote_abort(self, taskid):
125 def remote_abort(self, taskid):
121 d = self.taskController.abort(taskid)
126 d = self.taskController.abort(taskid)
122 d.addCallback(self.packageSuccess)
127 d.addCallback(self.packageSuccess)
123 d.addErrback(self.packageFailure)
128 d.addErrback(self.packageFailure)
124 return d
129 return d
125
130
126 def remote_get_task_result(self, taskid, block=False):
131 def remote_get_task_result(self, taskid, block=False):
127 d = self.taskController.get_task_result(taskid, block)
132 d = self.taskController.get_task_result(taskid, block)
128 d.addCallback(self.packageSuccess)
133 d.addCallback(self.packageSuccess)
129 d.addErrback(self.packageFailure)
134 d.addErrback(self.packageFailure)
130 return d
135 return d
131
136
132 def remote_barrier(self, taskids):
137 def remote_barrier(self, taskids):
133 d = self.taskController.barrier(taskids)
138 d = self.taskController.barrier(taskids)
134 d.addCallback(self.packageSuccess)
139 d.addCallback(self.packageSuccess)
135 d.addErrback(self.packageFailure)
140 d.addErrback(self.packageFailure)
136 return d
141 return d
137
142
138 def remote_spin(self):
143 def remote_spin(self):
139 d = self.taskController.spin()
144 d = self.taskController.spin()
140 d.addCallback(self.packageSuccess)
145 d.addCallback(self.packageSuccess)
141 d.addErrback(self.packageFailure)
146 d.addErrback(self.packageFailure)
142 return d
147 return d
143
148
144 def remote_queue_status(self, verbose):
149 def remote_queue_status(self, verbose):
145 d = self.taskController.queue_status(verbose)
150 d = self.taskController.queue_status(verbose)
146 d.addCallback(self.packageSuccess)
151 d.addCallback(self.packageSuccess)
147 d.addErrback(self.packageFailure)
152 d.addErrback(self.packageFailure)
148 return d
153 return d
149
154
150 def remote_clear(self,taskids=None):
155 def remote_clear(self,taskids=None):
151 d = self.taskController.clear(taskids)
156 d = self.taskController.clear(taskids)
152 d.addCallback(self.packageSuccess)
157 d.addCallback(self.packageSuccess)
153 d.addErrback(self.packageFailure)
158 d.addErrback(self.packageFailure)
154 return d
159 return d
155
160
156 def remote_get_client_name(self):
161 def remote_get_client_name(self):
157 return 'IPython.kernel.taskfc.FCTaskClient'
162 return 'IPython.kernel.taskfc.FCTaskClient'
158
163
159 components.registerAdapter(FCTaskControllerFromTaskController,
164 components.registerAdapter(FCTaskControllerFromTaskController,
160 taskmodule.ITaskController, IFCTaskController)
165 taskmodule.ITaskController, IFCTaskController)
161
166
162
167
163 #-------------------------------------------------------------------------------
168 #-------------------------------------------------------------------------------
164 # The Client side of things
169 # The Client side of things
165 #-------------------------------------------------------------------------------
170 #-------------------------------------------------------------------------------
166
171
167 class FCTaskClient(object):
172 class FCTaskClient(object):
168 """
173 """
169 Client class for Foolscap exposed `TaskController`.
174 Client class for Foolscap exposed `TaskController`.
170
175
171 This class is an adapter that makes a `RemoteReference` to a
176 This class is an adapter that makes a `RemoteReference` to a
172 `TaskController` look like an actual `ITaskController` on the client side.
177 `TaskController` look like an actual `ITaskController` on the client side.
173
178
174 This class also implements `IBlockingClientAdaptor` so that clients can
179 This class also implements `IBlockingClientAdaptor` so that clients can
175 automatically get a blocking version of this class.
180 automatically get a blocking version of this class.
176 """
181 """
177
182
178 implements(
183 implements(
179 taskmodule.ITaskController,
184 taskmodule.ITaskController,
180 IBlockingClientAdaptor,
185 IBlockingClientAdaptor,
181 ITaskMapperFactory,
186 ITaskMapperFactory,
182 IMapper,
187 IMapper,
183 ITaskParallelDecorator
188 ITaskParallelDecorator
184 )
189 )
185
190
186 def __init__(self, remote_reference):
191 def __init__(self, remote_reference):
187 self.remote_reference = remote_reference
192 self.remote_reference = remote_reference
188
193
189 #---------------------------------------------------------------------------
194 #---------------------------------------------------------------------------
190 # Non interface methods
195 # Non interface methods
191 #---------------------------------------------------------------------------
196 #---------------------------------------------------------------------------
192
197
193 def unpackage(self, r):
198 def unpackage(self, r):
194 return pickle.loads(r)
199 return pickle.loads(r)
195
200
196 #---------------------------------------------------------------------------
201 #---------------------------------------------------------------------------
197 # ITaskController related methods
202 # ITaskController related methods
198 #---------------------------------------------------------------------------
203 #---------------------------------------------------------------------------
199 def run(self, task):
204 def run(self, task):
200 """Run a task on the `TaskController`.
205 """Run a task on the `TaskController`.
201
206
202 See the documentation of the `MapTask` and `StringTask` classes for
207 See the documentation of the `MapTask` and `StringTask` classes for
203 details on how to build a task of different types.
208 details on how to build a task of different types.
204
209
205 :Parameters:
210 :Parameters:
206 task : an `ITask` implementer
211 task : an `ITask` implementer
207
212
208 :Returns: The int taskid of the submitted task. Pass this to
213 :Returns: The int taskid of the submitted task. Pass this to
209 `get_task_result` to get the `TaskResult` object.
214 `get_task_result` to get the `TaskResult` object.
210 """
215 """
211 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
216 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
212 task.can_task()
217 task.can_task()
213 ptask = pickle.dumps(task, 2)
218 ptask = pickle.dumps(task, 2)
214 task.uncan_task()
219 task.uncan_task()
215 d = self.remote_reference.callRemote('run', ptask)
220 d = self.remote_reference.callRemote('run', ptask)
216 d.addCallback(self.unpackage)
221 d.addCallback(self.unpackage)
217 return d
222 return d
218
223
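# Added illustrative comment: submitting a task through this client. `tc` is
# an assumed, connected instance of this class; the StringTask constructor
# arguments shown are illustrative only.
#
#   from IPython.kernel import task as taskmodule
#   t = taskmodule.StringTask('b = 10*a', push=dict(a=2), pull='b')
#   d = tc.run(t)                 # Deferred firing with the int taskid
#   d.addCallback(lambda tid: tc.get_task_result(tid, block=True))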
219 def get_task_result(self, taskid, block=False):
224 def get_task_result(self, taskid, block=False):
220 """
225 """
221 Get a task result by taskid.
226 Get a task result by taskid.
222
227
223 :Parameters:
228 :Parameters:
224 taskid : int
229 taskid : int
225 The taskid of the task to be retrieved.
230 The taskid of the task to be retrieved.
226 block : boolean
231 block : boolean
227 Should I block until the task is done?
232 Should I block until the task is done?
228
233
229 :Returns: A `TaskResult` object that encapsulates the task result.
234 :Returns: A `TaskResult` object that encapsulates the task result.
230 """
235 """
231 d = self.remote_reference.callRemote('get_task_result', taskid, block)
236 d = self.remote_reference.callRemote('get_task_result', taskid, block)
232 d.addCallback(self.unpackage)
237 d.addCallback(self.unpackage)
233 return d
238 return d
234
239
235 def abort(self, taskid):
240 def abort(self, taskid):
236 """
241 """
237 Abort a task by taskid.
242 Abort a task by taskid.
238
243
239 :Parameters:
244 :Parameters:
240 taskid : int
245 taskid : int
241 The taskid of the task to be aborted.
246 The taskid of the task to be aborted.
242 """
247 """
243 d = self.remote_reference.callRemote('abort', taskid)
248 d = self.remote_reference.callRemote('abort', taskid)
244 d.addCallback(self.unpackage)
249 d.addCallback(self.unpackage)
245 return d
250 return d
246
251
247 def barrier(self, taskids):
252 def barrier(self, taskids):
248 """Block until a set of tasks are completed.
253 """Block until a set of tasks are completed.
249
254
250 :Parameters:
255 :Parameters:
251 taskids : list, tuple
256 taskids : list, tuple
252 A sequence of taskids to block on.
257 A sequence of taskids to block on.
253 """
258 """
254 d = self.remote_reference.callRemote('barrier', taskids)
259 d = self.remote_reference.callRemote('barrier', taskids)
255 d.addCallback(self.unpackage)
260 d.addCallback(self.unpackage)
256 return d
261 return d
257
262
258 def spin(self):
263 def spin(self):
259 """
264 """
260 Touch the scheduler, to resume scheduling without submitting a task.
265 Touch the scheduler, to resume scheduling without submitting a task.
261
266
262 This method only needs to be called in unusual situations where the
267 This method only needs to be called in unusual situations where the
263 scheduler is idle for some reason.
268 scheduler is idle for some reason.
264 """
269 """
265 d = self.remote_reference.callRemote('spin')
270 d = self.remote_reference.callRemote('spin')
266 d.addCallback(self.unpackage)
271 d.addCallback(self.unpackage)
267 return d
272 return d
268
273
269 def queue_status(self, verbose=False):
274 def queue_status(self, verbose=False):
270 """
275 """
271 Get a dictionary with the current state of the task queue.
276 Get a dictionary with the current state of the task queue.
272
277
273 :Parameters:
278 :Parameters:
274 verbose : boolean
279 verbose : boolean
275 If True, return a list of taskids. If False, simply give
280 If True, return a list of taskids. If False, simply give
276 the number of tasks with each status.
281 the number of tasks with each status.
277
282
278 :Returns:
283 :Returns:
279 A dict with the queue status.
284 A dict with the queue status.
280 """
285 """
281 d = self.remote_reference.callRemote('queue_status', verbose)
286 d = self.remote_reference.callRemote('queue_status', verbose)
282 d.addCallback(self.unpackage)
287 d.addCallback(self.unpackage)
283 return d
288 return d
284
289
    def clear(self, taskids=None):
        """
        Clear previously run tasks from the task controller.

        :Parameters:
            taskids : list, tuple, None
                A sequence of taskids whose results should be dropped.  If
                None, clear all results.

        :Returns:
            An int, the number of tasks cleared.

        This is needed because the task controller keeps all task results
        in memory.  This can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
        d = self.remote_reference.callRemote('clear', taskids)
        d.addCallback(self.unpackage)
        return d

    def adapt_to_blocking_client(self):
        """
        Wrap self in a blocking version that implements `IBlockingTaskClient`.
        """
        from IPython.kernel.taskclient import IBlockingTaskClient
        return IBlockingTaskClient(self)

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.
        """
        return self.mapper().map(func, *sequences)

    def mapper(self, clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` implementer with a given set of arguments.

        The `IMapper` created using a task controller is load balanced.

        See the documentation for `IPython.kernel.task.BaseTask` for
        documentation on the arguments to this method.
        """
        return TaskMapper(self, clear_before=clear_before,
                          clear_after=clear_after, retries=retries,
                          recovery_task=recovery_task, depend=depend,
                          block=block)

    def parallel(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        mapper = self.mapper(clear_before, clear_after, retries,
                             recovery_task, depend, block)
        pf = ParallelFunction(mapper)
        return pf

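#-------------------------------------------------------------------------------
# Usage sketch (illustrative comment only, nothing here is executed).
# A minimal, hedged example of driving the client above from Twisted code.
# It assumes `tc` is an already-connected instance of this task client and
# that `StringTask` can be imported from IPython.kernel.task; both names are
# assumptions, not defined in this file.
#
#     from twisted.internet import reactor
#     from IPython.kernel.task import StringTask
#
#     def show_result(task_result):
#         print task_result            # the TaskResult wrapping the pulled values
#         reactor.stop()
#
#     task = StringTask('a = 2*b', push={'b': 10}, pull='a')
#     d = tc.run(task)                 # Deferred that fires with the taskid
#     d.addCallback(lambda tid: tc.get_task_result(tid, block=True))
#     d.addCallback(show_result)
#     reactor.run()
#
# For synchronous scripts, tc.adapt_to_blocking_client() wraps the same calls
# in a blocking interface, and tc.mapper()/tc.map() provide the load-balanced
# mapping described in the docstrings above.
#-------------------------------------------------------------------------------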