##// END OF EJS Templates
allow view.map to accept unequal sequence sizes...
MinRK -
Show More
@@ -161,13 +161,14 b' class ParallelFunction(RemoteFunction):'
161 chunksize : int or None
161 chunksize : int or None
162 The size of chunk to use when breaking up sequences in a load-balanced manner
162 The size of chunk to use when breaking up sequences in a load-balanced manner
163 ordered : bool [default: True]
163 ordered : bool [default: True]
164 Whether
164 Whether the result should be kept in order. If False,
165 results become available as they arrive, regardless of submission order.
165 **flags : remaining kwargs are passed to View.temp_flags
166 **flags : remaining kwargs are passed to View.temp_flags
166 """
167 """
167
168
168 chunksize=None
169 chunksize = None
169 ordered=None
170 ordered = None
170 mapObject=None
171 mapObject = None
171 _mapping = False
172 _mapping = False
172
173
173 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
@@ -183,6 +184,7 b' class ParallelFunction(RemoteFunction):'
183 client = self.view.client
184 client = self.view.client
184
185
185 lens = []
186 lens = []
187 maxlen = minlen = -1
186 for i, seq in enumerate(sequences):
188 for i, seq in enumerate(sequences):
187 try:
189 try:
188 n = len(seq)
190 n = len(seq)
@@ -190,20 +192,23 b' class ParallelFunction(RemoteFunction):'
190 seq = list(seq)
192 seq = list(seq)
191 sequences[i] = seq
193 sequences[i] = seq
192 n = len(seq)
194 n = len(seq)
195 if n > maxlen:
196 maxlen = n
197 if minlen == -1 or n < minlen:
198 minlen = n
193 lens.append(n)
199 lens.append(n)
194
200
195 # check that the length of sequences match
201 # check that the length of sequences match
196 len_0 = lens[0]
202 if not self._mapping and minlen != maxlen:
197 if min(lens) != len_0:
198 msg = 'all sequences must have equal length, but have %s' % lens
203 msg = 'all sequences must have equal length, but have %s' % lens
199 raise ValueError(msg)
204 raise ValueError(msg)
200
205
201 balanced = 'Balanced' in self.view.__class__.__name__
206 balanced = 'Balanced' in self.view.__class__.__name__
202 if balanced:
207 if balanced:
203 if self.chunksize:
208 if self.chunksize:
204 nparts = len_0//self.chunksize + int(len_0%self.chunksize > 0)
209 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
205 else:
210 else:
206 nparts = len_0
211 nparts = maxlen
207 targets = [None]*nparts
212 targets = [None]*nparts
208 else:
213 else:
209 if self.chunksize:
214 if self.chunksize:
@@ -222,12 +227,9 b' class ParallelFunction(RemoteFunction):'
222 for index, t in enumerate(targets):
227 for index, t in enumerate(targets):
223 args = []
228 args = []
224 for seq in sequences:
229 for seq in sequences:
225 part = self.mapObject.getPartition(seq, index, nparts)
230 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
226 if len(part) == 0:
231 args.append(part)
227 continue
232 if not any(args):
228 else:
229 args.append(part)
230 if not args:
231 continue
233 continue
232
234
233 if self._mapping:
235 if self._mapping:
@@ -235,7 +237,7 b' class ParallelFunction(RemoteFunction):'
235 f = lambda f, *sequences: list(map(f, *sequences))
237 f = lambda f, *sequences: list(map(f, *sequences))
236 else:
238 else:
237 f = map
239 f = map
238 args = [self.func]+args
240 args = [self.func] + args
239 else:
241 else:
240 f=self.func
242 f=self.func
241
243
@@ -243,9 +245,9 b' class ParallelFunction(RemoteFunction):'
243 with view.temp_flags(block=False, **self.flags):
245 with view.temp_flags(block=False, **self.flags):
244 ar = view.apply(f, *args)
246 ar = view.apply(f, *args)
245
247
246 msg_ids.append(ar.msg_ids[0])
248 msg_ids.extend(ar.msg_ids)
247
249
248 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
250 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
249 fname=getname(self.func),
251 fname=getname(self.func),
250 ordered=self.ordered
252 ordered=self.ordered
251 )
253 )
General Comments 0
You need to be logged in to leave comments. Login now