##// END OF EJS Templates
bash completion: ipython <tab> shows only subcmds...
bash completion: ipython <tab> shows only subcmds. If a partial filename, or a flag or option argument, is invoked, those completions will still take place; but by default, to aid the user in the discovery of subcommands, pressing tab immediately after ipython will list *only* the subcommands as possible completions.

File last commit:

r9190:20a102a5
r13569:53c89a60
Show More
nwmerge.py
124 lines | 3.8 KiB | text/x-python | PythonLexer
"""Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
from __future__ import print_function

import heapq

from IPython.parallel import Reference
from IPython.parallel.error import RemoteError
def mergesort(list_of_lists, key=None):
    """Perform an N-way merge operation on sorted lists.

    @param list_of_lists: (really iterable of iterable) of sorted elements
        (either naturally or by C{key})
    @param key: specify sort key function (like C{sort()}, C{sorted()})

    Yields the merged items one at a time. This is a generator: each input
    iterator is advanced only after the item previously taken from it has
    been yielded, so the inputs are consumed lazily.

    This is a stable merge: items that compare equal (by value, or by
    C{key}) come out in the order of the position of their source in
    C{list_of_lists}. A heap with one entry per non-exhausted input is used,
    so each yielded item costs one heap operation.

    Examples::

        >>> list(mergesort([[1,2,3,4],
        ...                 [2,3.25,3.75,4.5,6,7],
        ...                 [2.625,3.625,6.625,9]]))
        [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]

        # note stability
        >>> list(mergesort([[1,2,3,4],
        ...                 [2,3.25,3.75,4.5,6,7],
        ...                 [2.625,3.625,6.625,9]],
        ...                key=int))
        [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]

        >>> list(mergesort([[4, 3, 2, 1],
        ...                 [7, 6, 4.5, 3.75, 3.25, 2],
        ...                 [9, 6.625, 3.625, 2.625]],
        ...                key=lambda x: -x))
        [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
    """
    if key is None:
        # Without a key the item itself is what gets compared.
        def sort_key(item):
            return item
    else:
        sort_key = key

    # Heap entries are (sort value, source index, item, iterator). The source
    # index is unique per input, so ties on the sort value are broken by
    # input position (stability) and the item/iterator are never compared.
    heap = []
    for idx, iterable in enumerate(list_of_lists):
        itr = iter(iterable)
        try:
            # next() builtin works on both Python 2.6+ and Python 3,
            # unlike the Python-2-only itr.next() method.
            item = next(itr)
        except StopIteration:
            # Empty inputs simply contribute nothing.
            continue
        heap.append((sort_key(item), idx, item, itr))
    heapq.heapify(heap)

    while heap:
        _, idx, item, itr = heap[0]
        yield item
        try:
            item = next(itr)
        except StopIteration:
            # This input is exhausted: drop its entry from the heap.
            heapq.heappop(heap)
        else:
            # Replace the entry just yielded with the next item from the
            # same input in a single heap operation.
            heapq.heapreplace(heap, (sort_key(item), idx, item, itr))
def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine.

    @param view: object used to reach the engine; this function calls its
        C{execute(code, block=True)} and C{apply_sync(func, ref)} methods
    @param name: name (a string) of the remote variable to iterate over

    An iterator named C{'it' + name} is first created remotely by executing
    C{it<name>=iter(<name>)} through the view. Each item requested locally
    then triggers one synchronous call that advances that remote iterator
    and yields the returned value.

    Iteration ends when the remote call fails with a C{RemoteError} whose
    C{ename} is C{'StopIteration'}; any other C{RemoteError} is re-raised
    to the caller.
    """
    view.execute('it%s=iter(%s)' % (name, name), block=True)
    remote_it = Reference('it' + name)
    while True:
        try:
            # next() builtin instead of the Python-2-only .next() method.
            result = view.apply_sync(lambda it: next(it), remote_it)
        except RemoteError as e:
            if e.ename == 'StopIteration':
                # The remote iterator is exhausted. End this generator with
                # a plain return: raising StopIteration inside a generator
                # body is turned into RuntimeError by PEP 479.
                return
            # Bare raise keeps the original traceback.
            raise
        yield result
# Main, interactive testing
if __name__ == '__main__':
    from IPython.parallel import Client, Reference

    rc = Client()
    view = rc[:]
    print('Engine IDs:', rc.ids)

    # Three already-sorted example datasets.
    a0, a1, a2 = range(5, 20), range(10), range(15, 25)

    # Pretend each dataset was produced on its engine by a long computation:
    # here they are simply pushed over, one per engine, and every engine
    # stores its dataset under the same name 'a'.
    for engine_index, dataset in enumerate((a0, a1, a2)):
        rc[engine_index]['a'] = dataset

    # Local generator objects standing in for the three remote iterators.
    aa0, aa1, aa2 = (remote_iterator(rc[engine_index], 'a')
                     for engine_index in range(3))

    # Merge the data twice: from the local copies, then through the
    # remote iterators.
    print('Merge the local datasets:')
    locally_merged = list(mergesort([a0, a1, a2]))
    print(locally_merged)

    print('Locally merge the remote sets:')
    remotely_merged = list(mergesort([aa0, aa1, aa2]))
    print(remotely_merged)