##// END OF EJS Templates
support iterating through map results as they arrive
MinRK -
Show More
@@ -35,6 +35,8 b' class AsyncResult(object):'
35 35
36 36 def __init__(self, client, msg_ids, fname=''):
37 37 self._client = client
38 if isinstance(msg_ids, basestring):
39 msg_ids = [msg_ids]
38 40 self.msg_ids = msg_ids
39 41 self._fname=fname
40 42 self._ready = False
@@ -204,5 +206,27 b' class AsyncMapResult(AsyncResult):'
204 206 """Perform the gather on the actual results."""
205 207 return self._mapObject.joinPartitions(res)
206 208
209 # asynchronous iterator:
210 def __iter__(self):
211 try:
212 rlist = self.get(0)
213 except error.TimeoutError:
214 # wait for each result individually
215 for msg_id in self.msg_ids:
216 ar = AsyncResult(self._client, msg_id, self._fname)
217 rlist = ar.get()
218 try:
219 for r in rlist:
220 yield r
221 except TypeError:
222 # flattened, not a list
223 # this could get broken by flattened data that returns iterables
224 # but most calls to map do not expose the `flatten` argument
225 yield rlist
226 else:
227 # already done
228 for r in rlist:
229 yield r
230
207 231
208 232 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
@@ -1098,10 +1098,82 b' class Client(object):'
1098 1098 # Map and decorators
1099 1099 #--------------------------------------------------------------------------
1100 1100
1101 def map(self, f, *sequences):
1102 """Parallel version of builtin `map`, using all our engines."""
1101 def map(self, f, *sequences, **kwargs):
1102 """Parallel version of builtin `map`, using all our engines.
1103
1104 `block` and `targets` can be passed as keyword arguments only.
1105
1106 There will be one task per target, so work will be chunked
1107 if the sequences are longer than `targets`.
1108
1109 Results can be iterated as they are ready, but will become available in chunks.
1110
1111 Parameters
1112 ----------
1113
1114 f : callable
1115 function to be mapped
1116 *sequences: one or more sequences of matching length
1117 the sequences to be distributed and passed to `f`
1118 block : bool
1119 whether to wait for the result or not [default self.block]
1120 targets : valid targets
1121 targets to be used [default self.targets]
1122
1123 Returns
1124 -------
1125
1126 if block=False:
1127 AsyncMapResult
1128 An object like AsyncResult, but which reassembles the sequence of results
1129 into a single list. AsyncMapResults can be iterated through before all
1130 results are complete.
1131 else:
1132 the result of map(f,*sequences)
1133
1134 """
1135 block = kwargs.get('block', self.block)
1136 targets = kwargs.get('targets', self.targets)
1137 assert len(sequences) > 0, "must have some sequences to map onto!"
1138 pf = ParallelFunction(self, f, block=block,
1139 bound=True, targets=targets)
1140 return pf.map(*sequences)
1141
1142 def imap(self, f, *sequences, **kwargs):
1143 """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
1144
1145 Each element will be a separate task, and will be load-balanced. This
1146 lets individual elements be ready for iteration as soon as they come.
1147
1148 Parameters
1149 ----------
1150
1151 f : callable
1152 function to be mapped
1153 *sequences: one or more sequences of matching length
1154 the sequences to be distributed and passed to `f`
1155 block : bool
1156 whether to wait for the result or not [default self.block]
1157
1158 Returns
1159 -------
1160
1161 if block=False:
1162 AsyncMapResult
1163 An object like AsyncResult, but which reassembles the sequence of results
1164 into a single list. AsyncMapResults can be iterated through before all
1165 results are complete.
1166 else:
1167 the result of map(f,*sequences)
1168
1169 """
1170
1171 block = kwargs.get('block', self.block)
1172
1173 assert len(sequences) > 0, "must have some sequences to map onto!"
1174
1103 1175 pf = ParallelFunction(self, f, block=self.block,
1104 bound=True, targets='all')
1176 bound=True, targets=None)
1105 1177 return pf.map(*sequences)
1106 1178
1107 1179 def parallel(self, bound=True, targets='all', block=True):
@@ -283,7 +283,9 b' class Kernel(SessionFactory):'
283 283 return self.completer.complete(msg.content.line, msg.content.text)
284 284
285 285 def apply_request(self, stream, ident, parent):
286 # print (parent)
286 # flush previous reply, so this request won't block it
287 stream.flush(zmq.POLLOUT)
288
287 289 try:
288 290 content = parent[u'content']
289 291 bufs = parent[u'buffers']
@@ -354,7 +356,7 b' class Kernel(SessionFactory):'
354 356
355 357 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
356 358 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
357
359
358 360 # if reply_msg['content']['status'] == u'error':
359 361 # self.abort_queues()
360 362
General Comments 0
You need to be logged in to leave comments. Login now