Show More
@@ -35,6 +35,8 b' class AsyncResult(object):' | |||
|
35 | 35 | |
|
36 | 36 | def __init__(self, client, msg_ids, fname=''): |
|
37 | 37 | self._client = client |
|
38 | if isinstance(msg_ids, basestring): | |
|
39 | msg_ids = [msg_ids] | |
|
38 | 40 | self.msg_ids = msg_ids |
|
39 | 41 | self._fname=fname |
|
40 | 42 | self._ready = False |
@@ -204,5 +206,27 b' class AsyncMapResult(AsyncResult):' | |||
|
204 | 206 | """Perform the gather on the actual results.""" |
|
205 | 207 | return self._mapObject.joinPartitions(res) |
|
206 | 208 | |
|
209 | # asynchronous iterator: | |
|
210 | def __iter__(self): | |
|
211 | try: | |
|
212 | rlist = self.get(0) | |
|
213 | except error.TimeoutError: | |
|
214 | # wait for each result individually | |
|
215 | for msg_id in self.msg_ids: | |
|
216 | ar = AsyncResult(self._client, msg_id, self._fname) | |
|
217 | rlist = ar.get() | |
|
218 | try: | |
|
219 | for r in rlist: | |
|
220 | yield r | |
|
221 | except TypeError: | |
|
222 | # flattened, not a list | |
|
223 | # this could get broken by flattened data that returns iterables | |
|
224 | # but most calls to map do not expose the `flatten` argument | |
|
225 | yield rlist | |
|
226 | else: | |
|
227 | # already done | |
|
228 | for r in rlist: | |
|
229 | yield r | |
|
230 | ||
|
207 | 231 | |
|
208 | 232 | __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file |
@@ -1098,10 +1098,82 b' class Client(object):' | |||
|
1098 | 1098 | # Map and decorators |
|
1099 | 1099 | #-------------------------------------------------------------------------- |
|
1100 | 1100 | |
|
1101 | def map(self, f, *sequences): | |
|
1102 |
"""Parallel version of builtin `map`, using all our engines. |
|
|
1101 | def map(self, f, *sequences, **kwargs): | |
|
1102 | """Parallel version of builtin `map`, using all our engines. | |
|
1103 | ||
|
1104 | `block` and `targets` can be passed as keyword arguments only. | |
|
1105 | ||
|
1106 | There will be one task per target, so work will be chunked | |
|
1107 | if the sequences are longer than `targets`. | |
|
1108 | ||
|
1109 | Results can be iterated as they are ready, but will become available in chunks. | |
|
1110 | ||
|
1111 | Parameters | |
|
1112 | ---------- | |
|
1113 | ||
|
1114 | f : callable | |
|
1115 | function to be mapped | |
|
1116 | *sequences: one or more sequences of matching length | |
|
1117 | the sequences to be distributed and passed to `f` | |
|
1118 | block : bool | |
|
1119 | whether to wait for the result or not [default self.block] | |
|
1120 | targets : valid targets | |
|
1121 | targets to be used [default self.targets] | |
|
1122 | ||
|
1123 | Returns | |
|
1124 | ------- | |
|
1125 | ||
|
1126 | if block=False: | |
|
1127 | AsyncMapResult | |
|
1128 | An object like AsyncResult, but which reassembles the sequence of results | |
|
1129 | into a single list. AsyncMapResults can be iterated through before all | |
|
1130 | results are complete. | |
|
1131 | else: | |
|
1132 | the result of map(f,*sequences) | |
|
1133 | ||
|
1134 | """ | |
|
1135 | block = kwargs.get('block', self.block) | |
|
1136 | targets = kwargs.get('targets', self.targets) | |
|
1137 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
1138 | pf = ParallelFunction(self, f, block=block, | |
|
1139 | bound=True, targets=targets) | |
|
1140 | return pf.map(*sequences) | |
|
1141 | ||
|
1142 | def imap(self, f, *sequences, **kwargs): | |
|
1143 | """Parallel version of builtin `itertools.imap`, load-balanced across all engines. | |
|
1144 | ||
|
1145 | Each element will be a separate task, and will be load-balanced. This | |
|
1146 | lets individual elements be ready for iteration as soon as they come. | |
|
1147 | ||
|
1148 | Parameters | |
|
1149 | ---------- | |
|
1150 | ||
|
1151 | f : callable | |
|
1152 | function to be mapped | |
|
1153 | *sequences: one or more sequences of matching length | |
|
1154 | the sequences to be distributed and passed to `f` | |
|
1155 | block : bool | |
|
1156 | whether to wait for the result or not [default self.block] | |
|
1157 | ||
|
1158 | Returns | |
|
1159 | ------- | |
|
1160 | ||
|
1161 | if block=False: | |
|
1162 | AsyncMapResult | |
|
1163 | An object like AsyncResult, but which reassembles the sequence of results | |
|
1164 | into a single list. AsyncMapResults can be iterated through before all | |
|
1165 | results are complete. | |
|
1166 | else: | |
|
1167 | the result of map(f,*sequences) | |
|
1168 | ||
|
1169 | """ | |
|
1170 | ||
|
1171 | block = kwargs.get('block', self.block) | |
|
1172 | ||
|
1173 | assert len(sequences) > 0, "must have some sequences to map onto!" | |
|
1174 | ||
|
1103 | 1175 | pf = ParallelFunction(self, f, block=self.block, |
|
1104 |
bound=True, targets= |
|
|
1176 | bound=True, targets=None) | |
|
1105 | 1177 | return pf.map(*sequences) |
|
1106 | 1178 | |
|
1107 | 1179 | def parallel(self, bound=True, targets='all', block=True): |
@@ -283,7 +283,9 b' class Kernel(SessionFactory):' | |||
|
283 | 283 | return self.completer.complete(msg.content.line, msg.content.text) |
|
284 | 284 | |
|
285 | 285 | def apply_request(self, stream, ident, parent): |
|
286 | # print (parent) | |
|
286 | # flush previous reply, so this request won't block it | |
|
287 | stream.flush(zmq.POLLOUT) | |
|
288 | ||
|
287 | 289 | try: |
|
288 | 290 | content = parent[u'content'] |
|
289 | 291 | bufs = parent[u'buffers'] |
@@ -354,7 +356,7 b' class Kernel(SessionFactory):' | |||
|
354 | 356 | |
|
355 | 357 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
356 | 358 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
357 | ||
|
359 | ||
|
358 | 360 | # if reply_msg['content']['status'] == u'error': |
|
359 | 361 | # self.abort_queues() |
|
360 | 362 |
General Comments 0
You need to be logged in to leave comments.
Login now