Show More
@@ -261,12 +261,19 b' class AsyncMapResult(AsyncResult):' | |||||
261 | """Class for representing results of non-blocking gathers. |
|
261 | """Class for representing results of non-blocking gathers. | |
262 |
|
262 | |||
263 | This will properly reconstruct the gather. |
|
263 | This will properly reconstruct the gather. | |
|
264 | ||||
|
265 | This class is iterable at any time, and will wait on results as they come. | |||
|
266 | ||||
|
267 | If ordered=False, then the first results to arrive will come first, otherwise | |||
|
268 | results will be yielded in the order they were submitted. | |||
|
269 | ||||
264 | """ |
|
270 | """ | |
265 |
|
271 | |||
266 | def __init__(self, client, msg_ids, mapObject, fname=''): |
|
272 | def __init__(self, client, msg_ids, mapObject, fname='', ordered=True): | |
267 | AsyncResult.__init__(self, client, msg_ids, fname=fname) |
|
273 | AsyncResult.__init__(self, client, msg_ids, fname=fname) | |
268 | self._mapObject = mapObject |
|
274 | self._mapObject = mapObject | |
269 | self._single_result = False |
|
275 | self._single_result = False | |
|
276 | self.ordered = ordered | |||
270 |
|
277 | |||
271 | def _reconstruct_result(self, res): |
|
278 | def _reconstruct_result(self, res): | |
272 | """Perform the gather on the actual results.""" |
|
279 | """Perform the gather on the actual results.""" | |
@@ -274,6 +281,13 b' class AsyncMapResult(AsyncResult):' | |||||
274 |
|
281 | |||
275 | # asynchronous iterator: |
|
282 | # asynchronous iterator: | |
276 | def __iter__(self): |
|
283 | def __iter__(self): | |
|
284 | it = self._ordered_iter if self.ordered else self._unordered_iter | |||
|
285 | for r in it(): | |||
|
286 | yield r | |||
|
287 | ||||
|
288 | # asynchronous ordered iterator: | |||
|
289 | def _ordered_iter(self): | |||
|
290 | """iterator for results *as they arrive*, preserving submission order.""" | |||
277 | try: |
|
291 | try: | |
278 | rlist = self.get(0) |
|
292 | rlist = self.get(0) | |
279 | except error.TimeoutError: |
|
293 | except error.TimeoutError: | |
@@ -294,6 +308,42 b' class AsyncMapResult(AsyncResult):' | |||||
294 | for r in rlist: |
|
308 | for r in rlist: | |
295 | yield r |
|
309 | yield r | |
296 |
|
310 | |||
|
311 | # asynchronous unordered iterator: | |||
|
312 | def _unordered_iter(self): | |||
|
313 | """iterator for results *as they arrive*, on FCFS basis, ignoring submission order.""" | |||
|
314 | try: | |||
|
315 | rlist = self.get(0) | |||
|
316 | except error.TimeoutError: | |||
|
317 | pending = set(self.msg_ids) | |||
|
318 | while pending: | |||
|
319 | try: | |||
|
320 | self._client.wait(pending, 1e-3) | |||
|
321 | except error.TimeoutError: | |||
|
322 | # ignore timeout error, because that only means | |||
|
323 | # *some* jobs are outstanding | |||
|
324 | pass | |||
|
325 | # update ready set with those no longer outstanding: | |||
|
326 | ready = pending.difference(self._client.outstanding) | |||
|
327 | # update pending to exclude those that are finished | |||
|
328 | pending = pending.difference(ready) | |||
|
329 | while ready: | |||
|
330 | msg_id = ready.pop() | |||
|
331 | ar = AsyncResult(self._client, msg_id, self._fname) | |||
|
332 | rlist = ar.get() | |||
|
333 | try: | |||
|
334 | for r in rlist: | |||
|
335 | yield r | |||
|
336 | except TypeError: | |||
|
337 | # flattened, not a list | |||
|
338 | # this could get broken by flattened data that returns iterables | |||
|
339 | # but most calls to map do not expose the `flatten` argument | |||
|
340 | yield rlist | |||
|
341 | else: | |||
|
342 | # already done | |||
|
343 | for r in rlist: | |||
|
344 | yield r | |||
|
345 | ||||
|
346 | ||||
297 |
|
347 | |||
298 | class AsyncHubResult(AsyncResult): |
|
348 | class AsyncHubResult(AsyncResult): | |
299 | """Class to wrap pending results that must be requested from the Hub. |
|
349 | """Class to wrap pending results that must be requested from the Hub. |
@@ -46,7 +46,7 b' def remote(view, block=None, **flags):' | |||||
46 | return remote_function |
|
46 | return remote_function | |
47 |
|
47 | |||
48 | @skip_doctest |
|
48 | @skip_doctest | |
49 | def parallel(view, dist='b', block=None, **flags): |
|
49 | def parallel(view, dist='b', block=None, ordered=True, **flags): | |
50 | """Turn a function into a parallel remote function. |
|
50 | """Turn a function into a parallel remote function. | |
51 |
|
51 | |||
52 | This method can be used for map: |
|
52 | This method can be used for map: | |
@@ -57,7 +57,7 b" def parallel(view, dist='b', block=None, **flags):" | |||||
57 | """ |
|
57 | """ | |
58 |
|
58 | |||
59 | def parallel_function(f): |
|
59 | def parallel_function(f): | |
60 | return ParallelFunction(view, f, dist=dist, block=block, **flags) |
|
60 | return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags) | |
61 | return parallel_function |
|
61 | return parallel_function | |
62 |
|
62 | |||
63 | #-------------------------------------------------------------------------- |
|
63 | #-------------------------------------------------------------------------- | |
@@ -122,15 +122,19 b' class ParallelFunction(RemoteFunction):' | |||||
122 | to use the current `block` attribute of `view` |
|
122 | to use the current `block` attribute of `view` | |
123 | chunksize : int or None |
|
123 | chunksize : int or None | |
124 | The size of chunk to use when breaking up sequences in a load-balanced manner |
|
124 | The size of chunk to use when breaking up sequences in a load-balanced manner | |
|
125 | ordered : bool [default: True] | |||
|
126 | Whether the results should be gathered as they arrive, or enforce the order of submission. | |||
125 | **flags : remaining kwargs are passed to View.temp_flags |
|
127 | **flags : remaining kwargs are passed to View.temp_flags | |
126 | """ |
|
128 | """ | |
127 |
|
129 | |||
128 | chunksize=None |
|
130 | chunksize=None | |
|
131 | ordered=None | |||
129 | mapObject=None |
|
132 | mapObject=None | |
130 |
|
133 | |||
131 | def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags): |
|
134 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): | |
132 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) |
|
135 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) | |
133 | self.chunksize = chunksize |
|
136 | self.chunksize = chunksize | |
|
137 | self.ordered = ordered | |||
134 |
|
138 | |||
135 | mapClass = Map.dists[dist] |
|
139 | mapClass = Map.dists[dist] | |
136 | self.mapObject = mapClass() |
|
140 | self.mapObject = mapClass() | |
@@ -186,7 +190,10 b' class ParallelFunction(RemoteFunction):' | |||||
186 |
|
190 | |||
187 | msg_ids.append(ar.msg_ids[0]) |
|
191 | msg_ids.append(ar.msg_ids[0]) | |
188 |
|
192 | |||
189 |
r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, |
|
193 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, | |
|
194 | fname=self.func.__name__, | |||
|
195 | ordered=self.ordered | |||
|
196 | ) | |||
190 |
|
197 | |||
191 | if self.block: |
|
198 | if self.block: | |
192 | try: |
|
199 | try: |
@@ -992,7 +992,7 b' class LoadBalancedView(View):' | |||||
992 | @spin_after |
|
992 | @spin_after | |
993 | @save_ids |
|
993 | @save_ids | |
994 | def map(self, f, *sequences, **kwargs): |
|
994 | def map(self, f, *sequences, **kwargs): | |
995 | """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult |
|
995 | """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult | |
996 |
|
996 | |||
997 | Parallel version of builtin `map`, load-balanced by this View. |
|
997 | Parallel version of builtin `map`, load-balanced by this View. | |
998 |
|
998 | |||
@@ -1009,14 +1009,20 b' class LoadBalancedView(View):' | |||||
1009 | function to be mapped |
|
1009 | function to be mapped | |
1010 | *sequences: one or more sequences of matching length |
|
1010 | *sequences: one or more sequences of matching length | |
1011 | the sequences to be distributed and passed to `f` |
|
1011 | the sequences to be distributed and passed to `f` | |
1012 | block : bool |
|
1012 | block : bool [default self.block] | |
1013 |
whether to wait for the result or not |
|
1013 | whether to wait for the result or not | |
1014 | track : bool |
|
1014 | track : bool | |
1015 | whether to create a MessageTracker to allow the user to |
|
1015 | whether to create a MessageTracker to allow the user to | |
1016 | safely edit after arrays and buffers during non-copying |
|
1016 | safely edit after arrays and buffers during non-copying | |
1017 | sends. |
|
1017 | sends. | |
1018 | chunksize : int |
|
1018 | chunksize : int [default 1] | |
1019 |
how many elements should be in each task |
|
1019 | how many elements should be in each task. | |
|
1020 | ordered : bool [default True] | |||
|
1021 | Whether the results should be gathered as they arrive, or enforce | |||
|
1022 | the order of submission. | |||
|
1023 | ||||
|
1024 | Only applies when iterating through AsyncMapResult as results arrive. | |||
|
1025 | Has no effect when block=True. | |||
1020 |
|
1026 | |||
1021 | Returns |
|
1027 | Returns | |
1022 | ------- |
|
1028 | ------- | |
@@ -1034,6 +1040,7 b' class LoadBalancedView(View):' | |||||
1034 | # default |
|
1040 | # default | |
1035 | block = kwargs.get('block', self.block) |
|
1041 | block = kwargs.get('block', self.block) | |
1036 | chunksize = kwargs.get('chunksize', 1) |
|
1042 | chunksize = kwargs.get('chunksize', 1) | |
|
1043 | ordered = kwargs.get('ordered', True) | |||
1037 |
|
1044 | |||
1038 | keyset = set(kwargs.keys()) |
|
1045 | keyset = set(kwargs.keys()) | |
1039 | extra_keys = keyset.difference_update(set(['block', 'chunksize'])) |
|
1046 | extra_keys = keyset.difference_update(set(['block', 'chunksize'])) | |
@@ -1042,7 +1049,7 b' class LoadBalancedView(View):' | |||||
1042 |
|
1049 | |||
1043 | assert len(sequences) > 0, "must have some sequences to map onto!" |
|
1050 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
1044 |
|
1051 | |||
1045 |
pf = ParallelFunction(self, f, block=block, |
|
1052 | pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered) | |
1046 | return pf.map(*sequences) |
|
1053 | return pf.map(*sequences) | |
1047 |
|
1054 | |||
1048 | __all__ = ['LoadBalancedView', 'DirectView'] |
|
1055 | __all__ = ['LoadBalancedView', 'DirectView'] |
@@ -58,6 +58,42 b' class TestLoadBalancedView(ClusterTestCase):' | |||||
58 | r = self.view.map_sync(f, data) |
|
58 | r = self.view.map_sync(f, data) | |
59 | self.assertEquals(r, map(f, data)) |
|
59 | self.assertEquals(r, map(f, data)) | |
60 |
|
60 | |||
|
61 | def test_map_unordered(self): | |||
|
62 | def f(x): | |||
|
63 | return x**2 | |||
|
64 | def slow_f(x): | |||
|
65 | import time | |||
|
66 | time.sleep(0.05*x) | |||
|
67 | return x**2 | |||
|
68 | data = range(16,0,-1) | |||
|
69 | reference = map(f, data) | |||
|
70 | ||||
|
71 | amr = self.view.map_async(f, data, ordered=False) | |||
|
72 | self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) | |||
|
73 | # check individual elements, retrieved as they come (uses __iter__) | |||
|
74 | astheycame = list(amr) | |||
|
75 | # Ensure that at least one result came out of order: | |||
|
76 | self.assertNotEquals(astheycame, reference, "should not have preserved order") | |||
|
77 | self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted") | |||
|
78 | ||||
|
79 | def test_map_ordered(self): | |||
|
80 | def f(x): | |||
|
81 | return x**2 | |||
|
82 | def slow_f(x): | |||
|
83 | import time | |||
|
84 | time.sleep(0.05*x) | |||
|
85 | return x**2 | |||
|
86 | data = range(16,0,-1) | |||
|
87 | reference = map(f, data) | |||
|
88 | ||||
|
89 | amr = self.view.map_async(f, data) | |||
|
90 | self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) | |||
|
91 | # check individual elements, retrieved as they come (uses __iter__) | |||
|
92 | astheycame = list(amr) | |||
|
93 | # Ensure that results came in order | |||
|
94 | self.assertEquals(astheycame, reference) | |||
|
95 | self.assertEquals(amr.result, reference) | |||
|
96 | ||||
61 | def test_abort(self): |
|
97 | def test_abort(self): | |
62 | view = self.view |
|
98 | view = self.view | |
63 | ar = self.client[:].apply_async(time.sleep, .5) |
|
99 | ar = self.client[:].apply_async(time.sleep, .5) |
General Comments 0
You need to be logged in to leave comments.
Login now