nwmerge.py
123 lines
| 3.9 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
"""Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
import heapq | ||||
Brian E Granger
|
r1338 | from IPython.kernel.error import CompositeError | ||
Brian E Granger
|
r1337 | |||
def mergesort(list_of_lists, key=None):
    """ Perform an N-way merge operation on sorted lists.

    @param list_of_lists: (really iterable of iterable) of sorted elements
        (either naturally or by C{key})
    @param key: specify sort key function (like C{sort()}, C{sorted()});
        C{None} means the items themselves are compared

    Yields the merged items one at a time, in sorted order.  Every input is
    wrapped with C{iter()} and advanced one element at a time, so the inputs
    may be lists, generators, or iterators you pre-generate yourself.

    This is a stable merge: items whose sort keys compare equal are yielded
    in the order of the inputs they came from.  Each yielded item costs one
    heap operation on a heap that holds at most one entry per input.

    Examples::

        >>> list(mergesort([[1,2,3,4],
        ...                 [2,3.25,3.75,4.5,6,7],
        ...                 [2.625,3.625,6.625,9]]))
        [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]

    Note the stability when several items share the same key::

        >>> list(mergesort([[1,2,3,4],
        ...                 [2,3.25,3.75,4.5,6,7],
        ...                 [2.625,3.625,6.625,9]],
        ...                key=int))
        [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]

        >>> list(mergesort([[4, 3, 2, 1],
        ...                 [7, 6, 4.5, 3.75, 3.25, 2],
        ...                 [9, 6.625, 3.625, 2.625]],
        ...                key=lambda x: -x))
        [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
    """
    # Heap entries are (sort_key, input_index, item, iterator).  The input
    # index is unique per entry, so it breaks ties between equal sort keys
    # (which gives stability) and guarantees that the item and the iterator
    # themselves are never compared.
    heap = []
    for idx, itr in enumerate(iter(pl) for pl in list_of_lists):
        try:
            # Builtin next() instead of the Python-2-only .next() method.
            item = next(itr)
        except StopIteration:
            # Empty input: it contributes nothing to the merge.
            continue
        if key is None:
            sort_key = item
        else:
            sort_key = key(item)
        heap.append((sort_key, idx, item, itr))
    heapq.heapify(heap)

    while heap:
        _, idx, item, itr = heap[0]
        yield item
        try:
            item = next(itr)
        except StopIteration:
            # This input is exhausted; drop its entry from the heap.
            heapq.heappop(heap)
        else:
            if key is None:
                sort_key = item
            else:
                sort_key = key(item)
            # Swap the entry just yielded for the next one from the same input.
            heapq.heapreplace(heap, (sort_key, idx, item, itr))
def remote_iterator(rc,engine,name):
    """Return an iterator on an object living on a remote engine.

    @param rc: client object; only its C{execute(code, targets=...)} and
        C{pull(name, targets=...)} methods are used here
    @param engine: value passed as C{targets} to every call on C{rc}
    @param name: name of the iterable in the engine's namespace

    This is a generator, so nothing is executed on the engine until the first
    item is requested.  Each step runs one statement on the engine and pulls
    the resulting value back.  As a side effect the names
    C{_<name>_rmt_iter_} and C{_tmp} are assigned in the engine's namespace.
    """
    # Check that the object exists on the engine and pin a reference to it
    iter_name = '_%s_rmt_iter_' % name
    rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine)
    # Statement run on the engine for every step; the fetched value is parked
    # in '_tmp' there so that it can be pulled back.
    tpl = '_tmp = %s.next()' % iter_name
    while True:
        try:
            rc.execute(tpl, targets=engine)
            result = rc.pull('_tmp', targets=engine)[0]
        except CompositeError as e:
            # raise_exception() is expected to re-raise the engine-side error
            # locally (presumably StopIteration once the remote iterator is
            # exhausted -- confirm against CompositeError).  Turn that into a
            # plain return: letting StopIteration escape a generator body is
            # a RuntimeError under PEP 479.  Any other error propagates.
            try:
                e.raise_exception()
            except StopIteration:
                return
        else:
            yield result
# Main, interactive testing
# Needs a running cluster reachable by MultiEngineClient(); engines 0, 1 and 2
# are addressed directly below.
if __name__ == '__main__':
    from IPython.kernel import client
    ipc = client.MultiEngineClient()
    # Single-argument print(...) produces the same output whether print is a
    # statement or a function.
    print('Engine IDs: %s' % (ipc.get_ids(),))
    # Make a set of 'sorted datasets' (list() so that real lists are pushed
    # even where range() returns a lazy object)
    a0 = list(range(5,20))
    a1 = list(range(10))
    a2 = list(range(15,25))
    # Now, imagine these had been created in the remote engines by some long
    # computation.  In this simple example, we just send them over into the
    # remote engines.  They will all be called 'a' in each engine.
    ipc.push(dict(a=a0), targets=0)
    ipc.push(dict(a=a1), targets=1)
    ipc.push(dict(a=a2), targets=2)
    # And we now make a local object which represents the remote iterator
    aa0 = remote_iterator(ipc,0,'a')
    aa1 = remote_iterator(ipc,1,'a')
    aa2 = remote_iterator(ipc,2,'a')
    # Let's merge them, both locally and remotely:
    print('Merge the local datasets:')
    print(list(mergesort([a0,a1,a2])))
    print('Locally merge the remote sets:')
    print(list(mergesort([aa0,aa1,aa2])))