From 46d7c9dafca1f430834d908792876a0af624c996 2011-04-08 00:38:19
From: MinRK
Date: 2011-04-08 00:38:19
Subject: [PATCH] support iterating through map results as they arrive

---

NOTE(review): this copy was re-flowed from a whitespace-collapsed paste, so
indentation, blank context lines and whitespace-only lines are reconstructed.
Verify the hunks against the original commit before applying.

NOTE(review): this copy differs from the original commit in two places:
  * Client.imap now forwards the `block` keyword it reads, instead of always
    passing self.block to ParallelFunction (the keyword was silently ignored).
  * AsyncMapResult.__iter__ gains a docstring.
The hunk line counts were updated to match; the `index` blob hashes were not.

diff --git a/IPython/zmq/parallel/asyncresult.py b/IPython/zmq/parallel/asyncresult.py
index 2d855fa..d910004 100644
--- a/IPython/zmq/parallel/asyncresult.py
+++ b/IPython/zmq/parallel/asyncresult.py
@@ -35,6 +35,8 @@ class AsyncResult(object):
 
     def __init__(self, client, msg_ids, fname=''):
         self._client = client
+        if isinstance(msg_ids, basestring):
+            msg_ids = [msg_ids]
         self.msg_ids = msg_ids
         self._fname=fname
         self._ready = False
@@ -204,5 +206,34 @@ class AsyncMapResult(AsyncResult):
         """Perform the gather on the actual results."""
         return self._mapObject.joinPartitions(res)
 
+    # asynchronous iterator:
+    def __iter__(self):
+        """Iterate over the map results, yielding them as they arrive.
+
+        If `self.get(0)` succeeds, the gathered results are yielded
+        directly.  If it raises TimeoutError, each message in `msg_ids`
+        is waited on in order and its results are yielded as soon as
+        that message completes.
+        """
+        try:
+            rlist = self.get(0)
+        except error.TimeoutError:
+            # wait for each result individually
+            for msg_id in self.msg_ids:
+                ar = AsyncResult(self._client, msg_id, self._fname)
+                rlist = ar.get()
+                try:
+                    for r in rlist:
+                        yield r
+                except TypeError:
+                    # flattened, not a list
+                    # this could get broken by flattened data that returns iterables
+                    # but most calls to map do not expose the `flatten` argument
+                    yield rlist
+        else:
+            # already done
+            for r in rlist:
+                yield r
+
 
 __all__ = ['AsyncResult', 'AsyncMapResult']
\ No newline at end of file
diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py
index 65c8e62..d165b71 100644
--- a/IPython/zmq/parallel/client.py
+++ b/IPython/zmq/parallel/client.py
@@ -1098,10 +1098,83 @@ class Client(object):
     # Map and decorators
     #--------------------------------------------------------------------------
 
-    def map(self, f, *sequences):
-        """Parallel version of builtin `map`, using all our engines."""
+    def map(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, using all our engines.
+
+        `block` and `targets` can be passed as keyword arguments only.
+
+        There will be one task per target, so work will be chunked
+        if the sequences are longer than `targets`.
+
+        Results can be iterated as they are ready, but will become available in chunks.
+
+        Parameters
+        ----------
+
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+        targets : valid targets
+            targets to be used [default self.targets]
+
+        Returns
+        -------
+
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+        else:
+            the result of map(f,*sequences)
+
+        """
+        block = kwargs.get('block', self.block)
+        targets = kwargs.get('targets', self.targets)
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+        pf = ParallelFunction(self, f, block=block,
+                        bound=True, targets=targets)
+        return pf.map(*sequences)
+
+    def imap(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
+
+        Each element will be a separate task, and will be load-balanced. This
+        lets individual elements be ready for iteration as soon as they come.
+
+        Parameters
+        ----------
+
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+
+        Returns
+        -------
+
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+        else:
+            the result of map(f,*sequences)
+
+        """
+
+        block = kwargs.get('block', self.block)
+
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+
-        pf = ParallelFunction(self, f, block=self.block,
-                        bound=True, targets='all')
+        # forward the caller's `block` keyword (read above), not self.block
+        pf = ParallelFunction(self, f, block=block,
+                        bound=True, targets=None)
         return pf.map(*sequences)
 
     def parallel(self, bound=True, targets='all', block=True):
diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py
index a34d0ca..0455ed3 100755
--- a/IPython/zmq/parallel/streamkernel.py
+++ b/IPython/zmq/parallel/streamkernel.py
@@ -283,7 +283,9 @@ class Kernel(SessionFactory):
         return self.completer.complete(msg.content.line, msg.content.text)
 
     def apply_request(self, stream, ident, parent):
-        # print (parent)
+        # flush previous reply, so this request won't block it
+        stream.flush(zmq.POLLOUT)
+
         try:
             content = parent[u'content']
             bufs = parent[u'buffers']
@@ -354,7 +356,7 @@ class Kernel(SessionFactory):
 
         reply_msg = self.session.send(stream, u'apply_reply', reply_content,
                     parent=parent, ident=ident,buffers=result_buf, subheader=sub)
-        
+
         # if reply_msg['content']['status'] == u'error':
         #     self.abort_queues()
 