##// END OF EJS Templates
Merge with upstream
Merge with upstream

File last commit:

r1338:72652d65
r1705:34eca292 merge
Show More
nwmerge.py
120 lines | 3.9 KiB | text/x-python | PythonLexer
"""Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
import heapq
from IPython.kernel.error import CompositeError
def mergesort(list_of_lists, key=None):
""" Perform an N-way merge operation on sorted lists.
@param list_of_lists: (really iterable of iterable) of sorted elements
(either by naturally or by C{key})
@param key: specify sort key function (like C{sort()}, C{sorted()})
Yields tuples of the form C{(item, iterator)}, where the iterator is the
built-in list iterator or something you pass in, if you pre-generate the
iterators.
This is a stable merge; complexity O(N lg N)
Examples::
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]]))
[1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
# note stability
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]],
... key=int))
[1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
>>> print list(mergesort([[4, 3, 2, 1],
... [7, 6, 4.5, 3.75, 3.25, 2],
... [9, 6.625, 3.625, 2.625]],
... key=lambda x: -x))
[9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
"""
heap = []
for i, itr in enumerate(iter(pl) for pl in list_of_lists):
try:
item = itr.next()
toadd = (key(item), i, item, itr) if key else (item, i, itr)
heap.append(toadd)
except StopIteration:
pass
heapq.heapify(heap)
if key:
while heap:
_, idx, item, itr = heap[0]
yield item
try:
item = itr.next()
heapq.heapreplace(heap, (key(item), idx, item, itr) )
except StopIteration:
heapq.heappop(heap)
else:
while heap:
item, idx, itr = heap[0]
yield item
try:
heapq.heapreplace(heap, (itr.next(), idx, itr))
except StopIteration:
heapq.heappop(heap)
def remote_iterator(rc,engine,name):
"""Return an iterator on an object living on a remote engine.
"""
# Check that the object exists on the engine and pin a reference to it
iter_name = '_%s_rmt_iter_' % name
rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine)
tpl = '_tmp = %s.next()' % iter_name
while True:
try:
rc.execute(tpl, targets=engine)
result = rc.pull('_tmp', targets=engine)[0]
# This causes the StopIteration exception to be raised.
except CompositeError, e:
e.raise_exception()
else:
yield result
# Main, interactive testing
if __name__ == '__main__':
from IPython.kernel import client
ipc = client.MultiEngineClient()
print 'Engine IDs:',ipc.get_ids()
# Make a set of 'sorted datasets'
a0 = range(5,20)
a1 = range(10)
a2 = range(15,25)
# Now, imagine these had been created in the remote engines by some long
# computation. In this simple example, we just send them over into the
# remote engines. They will all be called 'a' in each engine.
ipc.push(dict(a=a0), targets=0)
ipc.push(dict(a=a1), targets=1)
ipc.push(dict(a=a2), targets=2)
# And we now make a local object which represents the remote iterator
aa0 = remote_iterator(ipc,0,'a')
aa1 = remote_iterator(ipc,1,'a')
aa2 = remote_iterator(ipc,2,'a')
# Let's merge them, both locally and remotely:
print 'Merge the local datasets:'
print list(mergesort([a0,a1,a2]))
print 'Locally merge the remote sets:'
print list(mergesort([aa0,aa1,aa2]))