Show More
@@ -35,6 +35,8 b' class AsyncResult(object):' | |||||
35 |
|
35 | |||
36 | def __init__(self, client, msg_ids, fname=''): |
|
36 | def __init__(self, client, msg_ids, fname=''): | |
37 | self._client = client |
|
37 | self._client = client | |
|
38 | if isinstance(msg_ids, basestring): | |||
|
39 | msg_ids = [msg_ids] | |||
38 | self.msg_ids = msg_ids |
|
40 | self.msg_ids = msg_ids | |
39 | self._fname=fname |
|
41 | self._fname=fname | |
40 | self._ready = False |
|
42 | self._ready = False | |
@@ -204,5 +206,27 b' class AsyncMapResult(AsyncResult):' | |||||
204 | """Perform the gather on the actual results.""" |
|
206 | """Perform the gather on the actual results.""" | |
205 | return self._mapObject.joinPartitions(res) |
|
207 | return self._mapObject.joinPartitions(res) | |
206 |
|
208 | |||
|
209 | # asynchronous iterator: | |||
|
210 | def __iter__(self): | |||
|
211 | try: | |||
|
212 | rlist = self.get(0) | |||
|
213 | except error.TimeoutError: | |||
|
214 | # wait for each result individually | |||
|
215 | for msg_id in self.msg_ids: | |||
|
216 | ar = AsyncResult(self._client, msg_id, self._fname) | |||
|
217 | rlist = ar.get() | |||
|
218 | try: | |||
|
219 | for r in rlist: | |||
|
220 | yield r | |||
|
221 | except TypeError: | |||
|
222 | # flattened, not a list | |||
|
223 | # this could get broken by flattened data that returns iterables | |||
|
224 | # but most calls to map do not expose the `flatten` argument | |||
|
225 | yield rlist | |||
|
226 | else: | |||
|
227 | # already done | |||
|
228 | for r in rlist: | |||
|
229 | yield r | |||
|
230 | ||||
207 |
|
231 | |||
208 | __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file |
|
232 | __all__ = ['AsyncResult', 'AsyncMapResult'] |
@@ -1098,10 +1098,82 b' class Client(object):' | |||||
1098 | # Map and decorators |
|
1098 | # Map and decorators | |
1099 | #-------------------------------------------------------------------------- |
|
1099 | #-------------------------------------------------------------------------- | |
1100 |
|
1100 | |||
1101 | def map(self, f, *sequences): |
|
1101 | def map(self, f, *sequences, **kwargs): | |
1102 |
"""Parallel version of builtin `map`, using all our engines. |
|
1102 | """Parallel version of builtin `map`, using all our engines. | |
|
1103 | ||||
|
1104 | `block` and `targets` can be passed as keyword arguments only. | |||
|
1105 | ||||
|
1106 | There will be one task per target, so work will be chunked | |||
|
1107 | if the sequences are longer than `targets`. | |||
|
1108 | ||||
|
1109 | Results can be iterated as they are ready, but will become available in chunks. | |||
|
1110 | ||||
|
1111 | Parameters | |||
|
1112 | ---------- | |||
|
1113 | ||||
|
1114 | f : callable | |||
|
1115 | function to be mapped | |||
|
1116 | *sequences: one or more sequences of matching length | |||
|
1117 | the sequences to be distributed and passed to `f` | |||
|
1118 | block : bool | |||
|
1119 | whether to wait for the result or not [default self.block] | |||
|
1120 | targets : valid targets | |||
|
1121 | targets to be used [default self.targets] | |||
|
1122 | ||||
|
1123 | Returns | |||
|
1124 | ------- | |||
|
1125 | ||||
|
1126 | if block=False: | |||
|
1127 | AsyncMapResult | |||
|
1128 | An object like AsyncResult, but which reassembles the sequence of results | |||
|
1129 | into a single list. AsyncMapResults can be iterated through before all | |||
|
1130 | results are complete. | |||
|
1131 | else: | |||
|
1132 | the result of map(f,*sequences) | |||
|
1133 | ||||
|
1134 | """ | |||
|
1135 | block = kwargs.get('block', self.block) | |||
|
1136 | targets = kwargs.get('targets', self.targets) | |||
|
1137 | assert len(sequences) > 0, "must have some sequences to map onto!" | |||
|
1138 | pf = ParallelFunction(self, f, block=block, | |||
|
1139 | bound=True, targets=targets) | |||
|
1140 | return pf.map(*sequences) | |||
|
1141 | ||||
|
1142 | def imap(self, f, *sequences, **kwargs): | |||
|
1143 | """Parallel version of builtin `itertools.imap`, load-balanced across all engines. | |||
|
1144 | ||||
|
1145 | Each element will be a separate task, and will be load-balanced. This | |||
|
1146 | lets individual elements be ready for iteration as soon as they come. | |||
|
1147 | ||||
|
1148 | Parameters | |||
|
1149 | ---------- | |||
|
1150 | ||||
|
1151 | f : callable | |||
|
1152 | function to be mapped | |||
|
1153 | *sequences: one or more sequences of matching length | |||
|
1154 | the sequences to be distributed and passed to `f` | |||
|
1155 | block : bool | |||
|
1156 | whether to wait for the result or not [default self.block] | |||
|
1157 | ||||
|
1158 | Returns | |||
|
1159 | ------- | |||
|
1160 | ||||
|
1161 | if block=False: | |||
|
1162 | AsyncMapResult | |||
|
1163 | An object like AsyncResult, but which reassembles the sequence of results | |||
|
1164 | into a single list. AsyncMapResults can be iterated through before all | |||
|
1165 | results are complete. | |||
|
1166 | else: | |||
|
1167 | the result of map(f,*sequences) | |||
|
1168 | ||||
|
1169 | """ | |||
|
1170 | ||||
|
1171 | block = kwargs.get('block', self.block) | |||
|
1172 | ||||
|
1173 | assert len(sequences) > 0, "must have some sequences to map onto!" | |||
|
1174 | ||||
1103 | pf = ParallelFunction(self, f, block=self.block, |
|
1175 | pf = ParallelFunction(self, f, block=self.block, | |
1104 |
bound=True, targets= |
|
1176 | bound=True, targets=None) | |
1105 | return pf.map(*sequences) |
|
1177 | return pf.map(*sequences) | |
1106 |
|
1178 | |||
1107 | def parallel(self, bound=True, targets='all', block=True): |
|
1179 | def parallel(self, bound=True, targets='all', block=True): |
@@ -283,7 +283,9 b' class Kernel(SessionFactory):' | |||||
283 | return self.completer.complete(msg.content.line, msg.content.text) |
|
283 | return self.completer.complete(msg.content.line, msg.content.text) | |
284 |
|
284 | |||
285 | def apply_request(self, stream, ident, parent): |
|
285 | def apply_request(self, stream, ident, parent): | |
286 | # print (parent) |
|
286 | # flush previous reply, so this request won't block it | |
|
287 | stream.flush(zmq.POLLOUT) | |||
|
288 | ||||
287 | try: |
|
289 | try: | |
288 | content = parent[u'content'] |
|
290 | content = parent[u'content'] | |
289 | bufs = parent[u'buffers'] |
|
291 | bufs = parent[u'buffers'] | |
@@ -354,7 +356,7 b' class Kernel(SessionFactory):' | |||||
354 |
|
356 | |||
355 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
357 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
356 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
358 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |
357 |
|
359 | |||
358 | # if reply_msg['content']['status'] == u'error': |
|
360 | # if reply_msg['content']['status'] == u'error': | |
359 | # self.abort_queues() |
|
361 | # self.abort_queues() | |
360 |
|
362 |
General Comments 0
You need to be logged in to leave comments.
Login now