##// END OF EJS Templates
Merge pull request #1779 from takluyver/i1778...
Merge pull request #1779 from takluyver/i1778 Tidy up error raising in magic decorators, change a few exceptions for more appropriate kinds in new magics code. Closes gh-1778.

File last commit:

r6455:15863dc1
r7048:5e21b9ee merge
Show More
nwmerge.py
124 lines | 3.8 KiB | text/x-python | PythonLexer
MinRK
updates to docs and examples
r3670 """Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
Thomas Kluyver
Update print syntax in parallel examples.
r6455 from __future__ import print_function
MinRK
updates to docs and examples
r3670
import heapq
MinRK
remove kernel examples already ported to newparallel
r3675 from IPython.parallel.error import RemoteError
MinRK
updates to docs and examples
r3670
def mergesort(list_of_lists, key=None):
""" Perform an N-way merge operation on sorted lists.
@param list_of_lists: (really iterable of iterable) of sorted elements
(either by naturally or by C{key})
@param key: specify sort key function (like C{sort()}, C{sorted()})
Yields tuples of the form C{(item, iterator)}, where the iterator is the
built-in list iterator or something you pass in, if you pre-generate the
iterators.
This is a stable merge; complexity O(N lg N)
Examples::
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]]))
[1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
# note stability
>>> print list(mergesort([[1,2,3,4],
... [2,3.25,3.75,4.5,6,7],
... [2.625,3.625,6.625,9]],
... key=int))
[1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
>>> print list(mergesort([[4, 3, 2, 1],
... [7, 6, 4.5, 3.75, 3.25, 2],
... [9, 6.625, 3.625, 2.625]],
... key=lambda x: -x))
[9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
"""
heap = []
for i, itr in enumerate(iter(pl) for pl in list_of_lists):
try:
item = itr.next()
if key:
toadd = (key(item), i, item, itr)
else:
toadd = (item, i, itr)
heap.append(toadd)
except StopIteration:
pass
heapq.heapify(heap)
if key:
while heap:
_, idx, item, itr = heap[0]
yield item
try:
item = itr.next()
heapq.heapreplace(heap, (key(item), idx, item, itr) )
except StopIteration:
heapq.heappop(heap)
else:
while heap:
item, idx, itr = heap[0]
yield item
try:
heapq.heapreplace(heap, (itr.next(), idx, itr))
except StopIteration:
heapq.heappop(heap)
MinRK
remove kernel examples already ported to newparallel
r3675 def remote_iterator(view,name):
MinRK
updates to docs and examples
r3670 """Return an iterator on an object living on a remote engine.
"""
MinRK
remove kernel examples already ported to newparallel
r3675 view.execute('it%s=iter(%s)'%(name,name), block=True)
MinRK
updates to docs and examples
r3670 while True:
try:
MinRK
remove kernel examples already ported to newparallel
r3675 result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
MinRK
updates to docs and examples
r3670 # This causes the StopIteration exception to be raised.
Thomas Kluyver
Update print syntax in parallel examples.
r6455 except RemoteError as e:
MinRK
remove kernel examples already ported to newparallel
r3675 if e.ename == 'StopIteration':
raise StopIteration
else:
raise e
MinRK
updates to docs and examples
r3670 else:
yield result
# Main, interactive testing
if __name__ == '__main__':
MinRK
remove kernel examples already ported to newparallel
r3675 from IPython.parallel import Client, Reference
rc = Client()
view = rc[:]
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print('Engine IDs:', rc.ids)
MinRK
updates to docs and examples
r3670
# Make a set of 'sorted datasets'
a0 = range(5,20)
a1 = range(10)
a2 = range(15,25)
# Now, imagine these had been created in the remote engines by some long
# computation. In this simple example, we just send them over into the
# remote engines. They will all be called 'a' in each engine.
MinRK
remove kernel examples already ported to newparallel
r3675 rc[0]['a'] = a0
rc[1]['a'] = a1
rc[2]['a'] = a2
MinRK
updates to docs and examples
r3670
# And we now make a local object which represents the remote iterator
MinRK
remove kernel examples already ported to newparallel
r3675 aa0 = remote_iterator(rc[0],'a')
aa1 = remote_iterator(rc[1],'a')
aa2 = remote_iterator(rc[2],'a')
MinRK
updates to docs and examples
r3670
# Let's merge them, both locally and remotely:
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print('Merge the local datasets:')
print(list(mergesort([a0,a1,a2])))
MinRK
updates to docs and examples
r3670
Thomas Kluyver
Update print syntax in parallel examples.
r6455 print('Locally merge the remote sets:')
print(list(mergesort([aa0,aa1,aa2])))