nwmerge.py
123 lines
| 3.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3670 | """Example showing how to merge multiple remote data streams. | ||
""" | ||||
# Slightly modified version of: | ||||
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509 | ||||
import heapq | ||||
MinRK
|
r3675 | from IPython.parallel.error import RemoteError | ||
MinRK
|
r3670 | |||
def mergesort(list_of_lists, key=None): | ||||
""" Perform an N-way merge operation on sorted lists. | ||||
@param list_of_lists: (really iterable of iterable) of sorted elements | ||||
(either by naturally or by C{key}) | ||||
@param key: specify sort key function (like C{sort()}, C{sorted()}) | ||||
Yields tuples of the form C{(item, iterator)}, where the iterator is the | ||||
built-in list iterator or something you pass in, if you pre-generate the | ||||
iterators. | ||||
This is a stable merge; complexity O(N lg N) | ||||
Examples:: | ||||
>>> print list(mergesort([[1,2,3,4], | ||||
... [2,3.25,3.75,4.5,6,7], | ||||
... [2.625,3.625,6.625,9]])) | ||||
[1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9] | ||||
# note stability | ||||
>>> print list(mergesort([[1,2,3,4], | ||||
... [2,3.25,3.75,4.5,6,7], | ||||
... [2.625,3.625,6.625,9]], | ||||
... key=int)) | ||||
[1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9] | ||||
>>> print list(mergesort([[4, 3, 2, 1], | ||||
... [7, 6, 4.5, 3.75, 3.25, 2], | ||||
... [9, 6.625, 3.625, 2.625]], | ||||
... key=lambda x: -x)) | ||||
[9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1] | ||||
""" | ||||
heap = [] | ||||
for i, itr in enumerate(iter(pl) for pl in list_of_lists): | ||||
try: | ||||
item = itr.next() | ||||
if key: | ||||
toadd = (key(item), i, item, itr) | ||||
else: | ||||
toadd = (item, i, itr) | ||||
heap.append(toadd) | ||||
except StopIteration: | ||||
pass | ||||
heapq.heapify(heap) | ||||
if key: | ||||
while heap: | ||||
_, idx, item, itr = heap[0] | ||||
yield item | ||||
try: | ||||
item = itr.next() | ||||
heapq.heapreplace(heap, (key(item), idx, item, itr) ) | ||||
except StopIteration: | ||||
heapq.heappop(heap) | ||||
else: | ||||
while heap: | ||||
item, idx, itr = heap[0] | ||||
yield item | ||||
try: | ||||
heapq.heapreplace(heap, (itr.next(), idx, itr)) | ||||
except StopIteration: | ||||
heapq.heappop(heap) | ||||
MinRK
|
r3675 | def remote_iterator(view,name): | ||
MinRK
|
r3670 | """Return an iterator on an object living on a remote engine. | ||
""" | ||||
MinRK
|
r3675 | view.execute('it%s=iter(%s)'%(name,name), block=True) | ||
MinRK
|
r3670 | while True: | ||
try: | ||||
MinRK
|
r3675 | result = view.apply_sync(lambda x: x.next(), Reference('it'+name)) | ||
MinRK
|
r3670 | # This causes the StopIteration exception to be raised. | ||
MinRK
|
r3675 | except RemoteError, e: | ||
if e.ename == 'StopIteration': | ||||
raise StopIteration | ||||
else: | ||||
raise e | ||||
MinRK
|
r3670 | else: | ||
yield result | ||||
# Main, interactive testing | ||||
if __name__ == '__main__': | ||||
MinRK
|
r3675 | from IPython.parallel import Client, Reference | ||
rc = Client() | ||||
view = rc[:] | ||||
print 'Engine IDs:', rc.ids | ||||
MinRK
|
r3670 | |||
# Make a set of 'sorted datasets' | ||||
a0 = range(5,20) | ||||
a1 = range(10) | ||||
a2 = range(15,25) | ||||
# Now, imagine these had been created in the remote engines by some long | ||||
# computation. In this simple example, we just send them over into the | ||||
# remote engines. They will all be called 'a' in each engine. | ||||
MinRK
|
r3675 | rc[0]['a'] = a0 | ||
rc[1]['a'] = a1 | ||||
rc[2]['a'] = a2 | ||||
MinRK
|
r3670 | |||
# And we now make a local object which represents the remote iterator | ||||
MinRK
|
r3675 | aa0 = remote_iterator(rc[0],'a') | ||
aa1 = remote_iterator(rc[1],'a') | ||||
aa2 = remote_iterator(rc[2],'a') | ||||
MinRK
|
r3670 | |||
# Let's merge them, both locally and remotely: | ||||
print 'Merge the local datasets:' | ||||
print list(mergesort([a0,a1,a2])) | ||||
print 'Locally merge the remote sets:' | ||||
print list(mergesort([aa0,aa1,aa2])) | ||||