Show More
@@ -168,6 +168,7 b' class ParallelFunction(RemoteFunction):' | |||||
168 | chunksize=None |
|
168 | chunksize=None | |
169 | ordered=None |
|
169 | ordered=None | |
170 | mapObject=None |
|
170 | mapObject=None | |
|
171 | _mapping = False | |||
171 |
|
172 | |||
172 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): |
|
173 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): | |
173 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) |
|
174 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) | |
@@ -176,17 +177,27 b' class ParallelFunction(RemoteFunction):' | |||||
176 |
|
177 | |||
177 | mapClass = Map.dists[dist] |
|
178 | mapClass = Map.dists[dist] | |
178 | self.mapObject = mapClass() |
|
179 | self.mapObject = mapClass() | |
179 |
|
180 | |||
180 | @sync_view_results |
|
181 | @sync_view_results | |
181 | def __call__(self, *sequences): |
|
182 | def __call__(self, *sequences): | |
182 | client = self.view.client |
|
183 | client = self.view.client | |
183 |
|
184 | |||
|
185 | lens = [] | |||
|
186 | for i, seq in enumerate(sequences): | |||
|
187 | try: | |||
|
188 | n = len(seq) | |||
|
189 | except Exception: | |||
|
190 | seq = list(seq) | |||
|
191 | sequences[i] = seq | |||
|
192 | n = len(seq) | |||
|
193 | lens.append(n) | |||
|
194 | ||||
184 | # check that the length of sequences match |
|
195 | # check that the length of sequences match | |
185 |
len_0 = len |
|
196 | len_0 = lens[0] | |
186 | for s in sequences: |
|
197 | if min(lens) != len_0: | |
187 | if len(s)!=len_0: |
|
198 | msg = 'all sequences must have equal length, but have %s' % lens | |
188 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) |
|
199 | raise ValueError(msg) | |
189 | raise ValueError(msg) |
|
200 | ||
190 | balanced = 'Balanced' in self.view.__class__.__name__ |
|
201 | balanced = 'Balanced' in self.view.__class__.__name__ | |
191 | if balanced: |
|
202 | if balanced: | |
192 | if self.chunksize: |
|
203 | if self.chunksize: | |
@@ -219,8 +230,7 b' class ParallelFunction(RemoteFunction):' | |||||
219 | if not args: |
|
230 | if not args: | |
220 | continue |
|
231 | continue | |
221 |
|
232 | |||
222 |
|
|
233 | if self._mapping: | |
223 | if hasattr(self, '_map'): |
|
|||
224 | if sys.version_info[0] >= 3: |
|
234 | if sys.version_info[0] >= 3: | |
225 | f = lambda f, *sequences: list(map(f, *sequences)) |
|
235 | f = lambda f, *sequences: list(map(f, *sequences)) | |
226 | else: |
|
236 | else: | |
@@ -253,12 +263,12 b' class ParallelFunction(RemoteFunction):' | |||||
253 | This should behave very much like the builtin map, but return an AsyncMapResult |
|
263 | This should behave very much like the builtin map, but return an AsyncMapResult | |
254 | if self.block is False. |
|
264 | if self.block is False. | |
255 | """ |
|
265 | """ | |
256 | # set _map as a flag for use inside self.__call__ |
|
266 | # set _mapping as a flag for use inside self.__call__ | |
257 | self._map = True |
|
267 | self._mapping = True | |
258 | try: |
|
268 | try: | |
259 |
ret = self |
|
269 | ret = self(*sequences) | |
260 | finally: |
|
270 | finally: | |
261 |
|
|
271 | self._mapping = False | |
262 | return ret |
|
272 | return ret | |
263 |
|
273 | |||
264 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |
|
274 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |
General Comments 0
You need to be logged in to leave comments.
Login now