##// END OF EJS Templates
Partial fixes for 2.4 compatibility. Unfinished....
Partial fixes for 2.4 compatibility. Unfinished. We just discovered that 0.9 breaks with python 2.4. This is an attempt at fixing the problem, but I'm not finished yet. Pushing so others can work off my branch.

File last commit:

r1706:a953d91d
r1706:a953d91d
Show More
nwmerge.py
123 lines | 3.9 KiB | text/x-python | PythonLexer
"""Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
import heapq
from IPython.kernel.error import CompositeError
def mergesort(list_of_lists, key=None):
""" Perform an N-way merge operation on sorted lists.
@param list_of_lists: (really iterable of iterable) of sorted elements
(either by naturally or by C{key})
@param key: specify sort key function (like C{sort()}, C{sorted()})
Yields tuples of the form C{(item, iterator)}, where the iterator is the
built-in list iterator or something you pass in, if you pre-generate the
iterators.
This is a stable merge; complexity O(N lg N)
Examples::
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]]))
[1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
# note stability
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]],
... key=int))
[1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
>>> print list(mergesort([[4, 3, 2, 1],
... [7, 6, 4.5, 3.75, 3.25, 2],
... [9, 6.625, 3.625, 2.625]],
... key=lambda x: -x))
[9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
"""
heap = []
for i, itr in enumerate(iter(pl) for pl in list_of_lists):
try:
item = itr.next()
if key:
toadd = (key(item), i, item, itr)
else:
toadd = (item, i, itr)
heap.append(toadd)
except StopIteration:
pass
heapq.heapify(heap)
if key:
while heap:
_, idx, item, itr = heap[0]
yield item
try:
item = itr.next()
heapq.heapreplace(heap, (key(item), idx, item, itr) )
except StopIteration:
heapq.heappop(heap)
else:
while heap:
item, idx, itr = heap[0]
yield item
try:
heapq.heapreplace(heap, (itr.next(), idx, itr))
except StopIteration:
heapq.heappop(heap)
def remote_iterator(rc,engine,name):
"""Return an iterator on an object living on a remote engine.
"""
# Check that the object exists on the engine and pin a reference to it
iter_name = '_%s_rmt_iter_' % name
rc.execute('%s = iter(%s)' % (iter_name,name), targets=engine)
tpl = '_tmp = %s.next()' % iter_name
while True:
try:
rc.execute(tpl, targets=engine)
result = rc.pull('_tmp', targets=engine)[0]
# This causes the StopIteration exception to be raised.
except CompositeError, e:
e.raise_exception()
else:
yield result
# Main, interactive testing
if __name__ == '__main__':
from IPython.kernel import client
ipc = client.MultiEngineClient()
print 'Engine IDs:',ipc.get_ids()
# Make a set of 'sorted datasets'
a0 = range(5,20)
a1 = range(10)
a2 = range(15,25)
# Now, imagine these had been created in the remote engines by some long
# computation. In this simple example, we just send them over into the
# remote engines. They will all be called 'a' in each engine.
ipc.push(dict(a=a0), targets=0)
ipc.push(dict(a=a1), targets=1)
ipc.push(dict(a=a2), targets=2)
# And we now make a local object which represents the remote iterator
aa0 = remote_iterator(ipc,0,'a')
aa1 = remote_iterator(ipc,1,'a')
aa2 = remote_iterator(ipc,2,'a')
# Let's merge them, both locally and remotely:
print 'Merge the local datasets:'
print list(mergesort([a0,a1,a2]))
print 'Locally merge the remote sets:'
print list(mergesort([aa0,aa1,aa2]))