##// END OF EJS Templates
Merge pull request #8296 from minrk/rm-kernel...
Merge pull request #8296 from minrk/rm-kernel remove ipython_kernel

File last commit:

r21171:40f50cd7
r21224:f9254a22 merge
Show More
map.py
124 lines | 3.6 KiB | text/x-python | PythonLexer
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # encoding: utf-8
"""Classes used in scattering and gathering sequences.
Scattering consists of partitioning a sequence and sending the various
pieces to individual nodes in a cluster.
"""
MinRK
avoid unnecessary imports of numpy...
r17053 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
avoid unnecessary imports of numpy...
r17053 import sys
Min RK
remove use of utils.flatten...
r21171 from itertools import islice, chain
MinRK
avoid unnecessary imports of numpy...
r17053
numpy = None
def is_array(obj):
"""Is an object a numpy array?
Avoids importing numpy until it is requested
"""
global numpy
if 'numpy' not in sys.modules:
return False
if numpy is None:
import numpy
return isinstance(obj, numpy.ndarray)
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
don't create lists...
r10072 class Map(object):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """A class for partitioning a sequence using a map."""
MinRK
don't create lists...
r10072
MinRK
allow map objects to partition specified lengths
r10567 def getPartition(self, seq, p, q, n=None):
"""Returns the pth partition of q partitions of seq.
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
allow map objects to partition specified lengths
r10567 The length can be specified as `n`,
otherwise it is the value of `len(seq)`
"""
n = len(seq) if n is None else n
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # Test for error conditions here
if p<0 or p>=q:
MinRK
allow map objects to partition specified lengths
r10567 raise ValueError("must have 0 <= p <= q, but have p=%s,q=%s" % (p, q))
MinRK
don't create lists...
r10072
MinRK
allow map objects to partition specified lengths
r10567 remainder = n % q
basesize = n // q
MinRK
don't create lists...
r10072
if p < remainder:
low = p * (basesize + 1)
high = low + basesize + 1
else:
low = p * basesize + remainder
high = low + basesize
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
support iterators in view.map...
r5560 try:
MinRK
don't create lists...
r10072 result = seq[low:high]
MinRK
support iterators in view.map...
r5560 except TypeError:
# some objects (iterators) can't be sliced,
# use islice:
MinRK
don't create lists...
r10072 result = list(islice(seq, low, high))
MinRK
support iterators in view.map...
r5560
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 return result
def joinPartitions(self, listOfPartitions):
return self.concatenate(listOfPartitions)
MinRK
avoid unnecessary imports of numpy...
r17053
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 def concatenate(self, listOfPartitions):
testObject = listOfPartitions[0]
# First see if we have a known array type
MinRK
avoid unnecessary imports of numpy...
r17053 if is_array(testObject):
return numpy.concatenate(listOfPartitions)
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # Next try for Python sequence types
Thomas Kluyver
Fix type checks in IPython.parallel
r13399 if isinstance(testObject, (list, tuple)):
Min RK
remove use of utils.flatten...
r21171 return list(chain.from_iterable(listOfPartitions))
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # If we have scalars, just return listOfPartitions
return listOfPartitions
class RoundRobinMap(Map):
MinRK
allow map objects to partition specified lengths
r10567 """Partitions a sequence in a round robin fashion.
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
This currently does not work!
"""
MinRK
allow map objects to partition specified lengths
r10567 def getPartition(self, seq, p, q, n=None):
n = len(seq) if n is None else n
return seq[p:n:q]
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
def joinPartitions(self, listOfPartitions):
testObject = listOfPartitions[0]
# First see if we have a known array type
MinRK
avoid unnecessary imports of numpy...
r17053 if is_array(testObject):
return self.flatten_array(listOfPartitions)
Thomas Kluyver
Fix type checks in IPython.parallel
r13399 if isinstance(testObject, (list, tuple)):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 return self.flatten_list(listOfPartitions)
return listOfPartitions
MinRK
avoid unnecessary imports of numpy...
r17053 def flatten_array(self, listOfPartitions):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 test = listOfPartitions[0]
shape = list(test.shape)
shape[0] = sum([ p.shape[0] for p in listOfPartitions])
MinRK
avoid unnecessary imports of numpy...
r17053 A = numpy.ndarray(shape)
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 N = shape[0]
q = len(listOfPartitions)
for p,part in enumerate(listOfPartitions):
A[p:N:q] = part
return A
def flatten_list(self, listOfPartitions):
flat = []
for i in range(len(listOfPartitions[0])):
flat.extend([ part[i] for part in listOfPartitions if len(part) > i ])
return flat
def mappable(obj):
"""return whether an object is mappable or not."""
if isinstance(obj, (tuple,list)):
return True
MinRK
avoid unnecessary imports of numpy...
r17053 if is_array(obj):
return True
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 return False
dists = {'b':Map,'r':RoundRobinMap}