add unordered iteration to AsyncMapResults...
MinRK
@@ -261,12 +261,19 @@ class AsyncMapResult(AsyncResult):
     """Class for representing results of non-blocking gathers.
 
     This will properly reconstruct the gather.
+
+    This class is iterable at any time, and will wait on results as they come.
+
+    If ordered=False, then the first results to arrive will come first, otherwise
+    results will be yielded in the order they were submitted.
+
     """
 
-    def __init__(self, client, msg_ids, mapObject, fname=''):
+    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
         AsyncResult.__init__(self, client, msg_ids, fname=fname)
         self._mapObject = mapObject
         self._single_result = False
+        self.ordered = ordered
 
     def _reconstruct_result(self, res):
         """Perform the gather on the actual results."""
@@ -274,6 +281,13 @@ class AsyncMapResult(AsyncResult):
 
     # asynchronous iterator:
     def __iter__(self):
+        it = self._ordered_iter if self.ordered else self._unordered_iter
+        for r in it():
+            yield r
+
+    # asynchronous ordered iterator:
+    def _ordered_iter(self):
+        """iterator for results *as they arrive*, preserving submission order."""
         try:
             rlist = self.get(0)
         except error.TimeoutError:
@@ -294,6 +308,42 @@ class AsyncMapResult(AsyncResult):
             for r in rlist:
                 yield r
 
+    # asynchronous unordered iterator:
+    def _unordered_iter(self):
+        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
+        try:
+            rlist = self.get(0)
+        except error.TimeoutError:
+            pending = set(self.msg_ids)
+            while pending:
+                try:
+                    self._client.wait(pending, 1e-3)
+                except error.TimeoutError:
+                    # ignore timeout error, because that only means
+                    # *some* jobs are outstanding
+                    pass
+                # update ready set with those no longer outstanding:
+                ready = pending.difference(self._client.outstanding)
+                # update pending to exclude those that are finished
+                pending = pending.difference(ready)
+                while ready:
+                    msg_id = ready.pop()
+                    ar = AsyncResult(self._client, msg_id, self._fname)
+                    rlist = ar.get()
+                    try:
+                        for r in rlist:
+                            yield r
+                    except TypeError:
+                        # flattened, not a list
+                        # this could get broken by flattened data that returns iterables
+                        # but most calls to map do not expose the `flatten` argument
+                        yield rlist
+        else:
+            # already done
+            for r in rlist:
+                yield r
+
+
 
 class AsyncHubResult(AsyncResult):
     """Class to wrap pending results that must be requested from the Hub.
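
The `_unordered_iter` generator added here is the core of the change: keep a `pending` set of message ids, wait briefly on the client, peel off whatever is no longer outstanding, and yield those results immediately. The same first-come-first-served drain pattern is sketched below with the standard library's concurrent.futures, purely as an illustration of the technique — none of this is IPython.parallel API:

    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
    import random
    import time

    def work(x):
        time.sleep(random.random() * 0.1)
        return x * x

    def unordered_results(futures):
        """Yield results first-come-first-served, ignoring submission order."""
        pending = set(futures)
        while pending:
            # Block briefly; a timeout only means *some* tasks are still running.
            done, pending = wait(pending, timeout=1e-3, return_when=FIRST_COMPLETED)
            for f in done:
                yield f.result()

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(work, x) for x in range(10)]
        for r in unordered_results(futures):
            print(r)   # arrives as tasks finish, not in submission order

The short timeout plays the same role as the 1e-3 passed to Client.wait above: it keeps the loop responsive without busy-spinning.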
@@ -46,7 +46,7 @@ def remote(view, block=None, **flags):
     return remote_function
 
 @skip_doctest
-def parallel(view, dist='b', block=None, **flags):
+def parallel(view, dist='b', block=None, ordered=True, **flags):
     """Turn a function into a parallel remote function.
 
     This method can be used for map:
@@ -57,7 +57,7 @@ def parallel(view, dist='b', block=None, **flags):
     """
 
     def parallel_function(f):
-        return ParallelFunction(view, f, dist=dist, block=block, **flags)
+        return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
     return parallel_function
 
 #--------------------------------------------------------------------------
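
These two hunks thread the flag through the @parallel decorator factory. A hypothetical end-to-end sketch, assuming the usual view.parallel(...) decorator forwards extra keywords to the parallel() factory patched above, and assuming a running cluster:

    from IPython.parallel import Client

    rc = Client()
    view = rc.load_balanced_view()

    @view.parallel(block=False, ordered=False)
    def echo(x):
        return x

    amr = echo.map(range(32))   # ParallelFunction.map -> AsyncMapResult
    for r in amr:
        print(r)                # first-come-first-served, because ordered=False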
@@ -122,15 +122,19 @@ class ParallelFunction(RemoteFunction):
         to use the current `block` attribute of `view`
     chunksize : int or None
         The size of chunk to use when breaking up sequences in a load-balanced manner
+    ordered : bool [default: True]
+        Whether the results should be yielded in submission order, or as they arrive
     **flags : remaining kwargs are passed to View.temp_flags
     """
 
     chunksize=None
+    ordered=None
     mapObject=None
 
-    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
+    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
         super(ParallelFunction, self).__init__(view, f, block=block, **flags)
         self.chunksize = chunksize
+        self.ordered = ordered
 
         mapClass = Map.dists[dist]
         self.mapObject = mapClass()
@@ -186,7 +190,10 @@ class ParallelFunction(RemoteFunction):
 
             msg_ids.append(ar.msg_ids[0])
 
-        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)
+        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
+                           fname=self.func.__name__,
+                           ordered=self.ordered
+                           )
 
         if self.block:
             try:
@@ -992,7 +992,7 @@ class LoadBalancedView(View):
     @spin_after
     @save_ids
     def map(self, f, *sequences, **kwargs):
-        """view.map(f, *sequences, block=self.block, chunksize=1) => list|AsyncMapResult
+        """view.map(f, *sequences, block=self.block, chunksize=1, ordered=True) => list|AsyncMapResult
 
         Parallel version of builtin `map`, load-balanced by this View.
 
@@ -1009,14 +1009,20 @@ class LoadBalancedView(View):
             function to be mapped
         *sequences: one or more sequences of matching length
             the sequences to be distributed and passed to `f`
-        block : bool
-            whether to wait for the result or not [default self.block]
+        block : bool [default self.block]
+            whether to wait for the result or not
         track : bool
            whether to create a MessageTracker to allow the user to
            safely edit after arrays and buffers during non-copying
            sends.
-        chunksize : int
-            how many elements should be in each task [default 1]
+        chunksize : int [default 1]
+            how many elements should be in each task.
+        ordered : bool [default True]
+            Whether the results should be gathered as they arrive, or enforce
+            the order of submission.
+
+            Only applies when iterating through AsyncMapResult as results arrive.
+            Has no effect when block=True.
 
         Returns
         -------
@@ -1034,6 +1040,7 @@ class LoadBalancedView(View):
         # default
         block = kwargs.get('block', self.block)
         chunksize = kwargs.get('chunksize', 1)
+        ordered = kwargs.get('ordered', True)
 
         keyset = set(kwargs.keys())
         extra_keys = keyset.difference_update(set(['block', 'chunksize']))
@@ -1042,7 +1049,7 @@ class LoadBalancedView(View):
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
-        pf = ParallelFunction(self, f, block=block, chunksize=chunksize)
+        pf = ParallelFunction(self, f, block=block, chunksize=chunksize, ordered=ordered)
         return pf.map(*sequences)
 
 __all__ = ['LoadBalancedView', 'DirectView']
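
Taken together, the view.py hunks let `ordered` be passed straight to LoadBalancedView.map. A short sketch of the two call styles the updated docstring describes (`lbview` is assumed to be a LoadBalancedView backed by a running cluster):

    def double(x):
        return 2 * x

    r = lbview.map(double, range(10), block=True)      # plain list, submission order
    amr = lbview.map(double, range(10), block=False, ordered=False)
    for item in amr:                                   # yielded as chunks finish
        print(item)
    print(amr.get())                                   # get() still reconstructs submission order

As the new docstring says, the flag only affects iteration over the AsyncMapResult; get() and block=True behave as before.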
@@ -58,6 +58,42 @@ class TestLoadBalancedView(ClusterTestCase):
         r = self.view.map_sync(f, data)
         self.assertEquals(r, map(f, data))
 
+    def test_map_unordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data, ordered=False)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come (uses __iter__)
+        astheycame = list(amr)
+        # Ensure that at least one result came out of order:
+        self.assertNotEquals(astheycame, reference, "should not have preserved order")
+        self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
+
+    def test_map_ordered(self):
+        def f(x):
+            return x**2
+        def slow_f(x):
+            import time
+            time.sleep(0.05*x)
+            return x**2
+        data = range(16,0,-1)
+        reference = map(f, data)
+
+        amr = self.view.map_async(slow_f, data)
+        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        # check individual elements, retrieved as they come (uses __iter__)
+        astheycame = list(amr)
+        # Ensure that results came in order
+        self.assertEquals(astheycame, reference)
+        self.assertEquals(amr.result, reference)
+
     def test_abort(self):
         view = self.view
         ar = self.client[:].apply_async(time.sleep, .5)
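
A note on why the unordered test's assertions should hold: slow_f sleeps for 0.05*x, and the data are submitted in descending order, so the tasks submitted last are the quickest to finish. A back-of-the-envelope check of that reasoning in plain Python (no cluster needed), assuming the sleeps dominate scheduling noise:

    data = range(16, 0, -1)                   # submission order: 16, 15, ..., 1
    reference = [x ** 2 for x in data]        # what the ordered iterator would yield
    arrival = [x ** 2 for x in sorted(data)]  # smallest x sleeps least, finishes first

    assert arrival != reference                         # at least one result out of order
    assert sorted(arrival, reverse=True) == reference   # same values, nothing corrupted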