Show More
@@ -161,13 +161,14 b' class ParallelFunction(RemoteFunction):' | |||||
161 | chunksize : int or None |
|
161 | chunksize : int or None | |
162 | The size of chunk to use when breaking up sequences in a load-balanced manner |
|
162 | The size of chunk to use when breaking up sequences in a load-balanced manner | |
163 | ordered : bool [default: True] |
|
163 | ordered : bool [default: True] | |
164 | Whether |
|
164 | Whether the result should be kept in order. If False, | |
|
165 | results become available as they arrive, regardless of submission order. | |||
165 | **flags : remaining kwargs are passed to View.temp_flags |
|
166 | **flags : remaining kwargs are passed to View.temp_flags | |
166 | """ |
|
167 | """ | |
167 |
|
168 | |||
168 | chunksize=None |
|
169 | chunksize = None | |
169 | ordered=None |
|
170 | ordered = None | |
170 | mapObject=None |
|
171 | mapObject = None | |
171 | _mapping = False |
|
172 | _mapping = False | |
172 |
|
173 | |||
173 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): |
|
174 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): | |
@@ -183,6 +184,7 b' class ParallelFunction(RemoteFunction):' | |||||
183 | client = self.view.client |
|
184 | client = self.view.client | |
184 |
|
185 | |||
185 | lens = [] |
|
186 | lens = [] | |
|
187 | maxlen = minlen = -1 | |||
186 | for i, seq in enumerate(sequences): |
|
188 | for i, seq in enumerate(sequences): | |
187 | try: |
|
189 | try: | |
188 | n = len(seq) |
|
190 | n = len(seq) | |
@@ -190,20 +192,23 b' class ParallelFunction(RemoteFunction):' | |||||
190 | seq = list(seq) |
|
192 | seq = list(seq) | |
191 | sequences[i] = seq |
|
193 | sequences[i] = seq | |
192 | n = len(seq) |
|
194 | n = len(seq) | |
|
195 | if n > maxlen: | |||
|
196 | maxlen = n | |||
|
197 | if minlen == -1 or n < minlen: | |||
|
198 | minlen = n | |||
193 | lens.append(n) |
|
199 | lens.append(n) | |
194 |
|
200 | |||
195 | # check that the length of sequences match |
|
201 | # check that the length of sequences match | |
196 | len_0 = lens[0] |
|
202 | if not self._mapping and minlen != maxlen: | |
197 | if min(lens) != len_0: |
|
|||
198 | msg = 'all sequences must have equal length, but have %s' % lens |
|
203 | msg = 'all sequences must have equal length, but have %s' % lens | |
199 | raise ValueError(msg) |
|
204 | raise ValueError(msg) | |
200 |
|
205 | |||
201 | balanced = 'Balanced' in self.view.__class__.__name__ |
|
206 | balanced = 'Balanced' in self.view.__class__.__name__ | |
202 | if balanced: |
|
207 | if balanced: | |
203 | if self.chunksize: |
|
208 | if self.chunksize: | |
204 |
nparts = |
|
209 | nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0) | |
205 | else: |
|
210 | else: | |
206 |
nparts = len |
|
211 | nparts = maxlen | |
207 | targets = [None]*nparts |
|
212 | targets = [None]*nparts | |
208 | else: |
|
213 | else: | |
209 | if self.chunksize: |
|
214 | if self.chunksize: | |
@@ -222,12 +227,9 b' class ParallelFunction(RemoteFunction):' | |||||
222 | for index, t in enumerate(targets): |
|
227 | for index, t in enumerate(targets): | |
223 | args = [] |
|
228 | args = [] | |
224 | for seq in sequences: |
|
229 | for seq in sequences: | |
225 | part = self.mapObject.getPartition(seq, index, nparts) |
|
230 | part = self.mapObject.getPartition(seq, index, nparts, maxlen) | |
226 |
|
|
231 | args.append(part) | |
227 | continue |
|
232 | if not any(args): | |
228 | else: |
|
|||
229 | args.append(part) |
|
|||
230 | if not args: |
|
|||
231 | continue |
|
233 | continue | |
232 |
|
234 | |||
233 | if self._mapping: |
|
235 | if self._mapping: | |
@@ -235,7 +237,7 b' class ParallelFunction(RemoteFunction):' | |||||
235 | f = lambda f, *sequences: list(map(f, *sequences)) |
|
237 | f = lambda f, *sequences: list(map(f, *sequences)) | |
236 | else: |
|
238 | else: | |
237 | f = map |
|
239 | f = map | |
238 | args = [self.func]+args |
|
240 | args = [self.func] + args | |
239 | else: |
|
241 | else: | |
240 | f=self.func |
|
242 | f=self.func | |
241 |
|
243 | |||
@@ -243,9 +245,9 b' class ParallelFunction(RemoteFunction):' | |||||
243 | with view.temp_flags(block=False, **self.flags): |
|
245 | with view.temp_flags(block=False, **self.flags): | |
244 | ar = view.apply(f, *args) |
|
246 | ar = view.apply(f, *args) | |
245 |
|
247 | |||
246 |
msg_ids. |
|
248 | msg_ids.extend(ar.msg_ids) | |
247 |
|
249 | |||
248 |
r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, |
|
250 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, | |
249 | fname=getname(self.func), |
|
251 | fname=getname(self.func), | |
250 | ordered=self.ordered |
|
252 | ordered=self.ordered | |
251 | ) |
|
253 | ) |
General Comments 0
You need to be logged in to leave comments.
Login now