##// END OF EJS Templates
Merge pull request #3184 from juliantaylor/cython-cache...
Merge pull request #3184 from juliantaylor/cython-cache Cython cache

File last commit:

r10072:a20ff9cf
r10232:81ae2752 merge
Show More
map.py
170 lines | 5.2 KiB | text/x-python | PythonLexer
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # encoding: utf-8
"""Classes used in scattering and gathering sequences.
Scattering consists of partitioning a sequence and sending the various
pieces to individual nodes in a cluster.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """
#-------------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 import types
MinRK
support iterators in view.map...
r5560 from itertools import islice
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
from IPython.utils.data import flatten as utils_flatten
#-------------------------------------------------------------------------------
# Figure out which array packages are present and their array types
#-------------------------------------------------------------------------------
arrayModules = []
try:
import Numeric
except ImportError:
pass
else:
arrayModules.append({'module':Numeric, 'type':Numeric.arraytype})
try:
import numpy
except ImportError:
pass
else:
arrayModules.append({'module':numpy, 'type':numpy.ndarray})
try:
import numarray
except ImportError:
pass
else:
arrayModules.append({'module':numarray,
'type':numarray.numarraycore.NumArray})
MinRK
don't create lists...
r10072 class Map(object):
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """A class for partitioning a sequence using a map."""
MinRK
don't create lists...
r10072
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 def getPartition(self, seq, p, q):
"""Returns the pth partition of q partitions of seq."""
# Test for error conditions here
if p<0 or p>=q:
print "No partition exists."
return
MinRK
don't create lists...
r10072
N = len(seq)
remainder = N % q
basesize = N // q
if p < remainder:
low = p * (basesize + 1)
high = low + basesize + 1
else:
low = p * basesize + remainder
high = low + basesize
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
MinRK
support iterators in view.map...
r5560 try:
MinRK
don't create lists...
r10072 result = seq[low:high]
MinRK
support iterators in view.map...
r5560 except TypeError:
# some objects (iterators) can't be sliced,
# use islice:
MinRK
don't create lists...
r10072 result = list(islice(seq, low, high))
MinRK
support iterators in view.map...
r5560
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 return result
def joinPartitions(self, listOfPartitions):
return self.concatenate(listOfPartitions)
def concatenate(self, listOfPartitions):
testObject = listOfPartitions[0]
# First see if we have a known array type
for m in arrayModules:
#print m
if isinstance(testObject, m['type']):
return m['module'].concatenate(listOfPartitions)
# Next try for Python sequence types
if isinstance(testObject, (types.ListType, types.TupleType)):
return utils_flatten(listOfPartitions)
# If we have scalars, just return listOfPartitions
return listOfPartitions
class RoundRobinMap(Map):
"""Partitions a sequence in a roun robin fashion.
This currently does not work!
"""
def getPartition(self, seq, p, q):
# if not isinstance(seq,(list,tuple)):
# raise NotImplementedError("cannot RR partition type %s"%type(seq))
return seq[p:len(seq):q]
#result = []
#for i in range(p,len(seq),q):
# result.append(seq[i])
#return result
def joinPartitions(self, listOfPartitions):
testObject = listOfPartitions[0]
# First see if we have a known array type
for m in arrayModules:
#print m
if isinstance(testObject, m['type']):
return self.flatten_array(m['type'], listOfPartitions)
if isinstance(testObject, (types.ListType, types.TupleType)):
return self.flatten_list(listOfPartitions)
return listOfPartitions
def flatten_array(self, klass, listOfPartitions):
test = listOfPartitions[0]
shape = list(test.shape)
shape[0] = sum([ p.shape[0] for p in listOfPartitions])
A = klass(shape)
N = shape[0]
q = len(listOfPartitions)
for p,part in enumerate(listOfPartitions):
A[p:N:q] = part
return A
def flatten_list(self, listOfPartitions):
flat = []
for i in range(len(listOfPartitions[0])):
flat.extend([ part[i] for part in listOfPartitions if len(part) > i ])
return flat
#lengths = [len(x) for x in listOfPartitions]
#maxPartitionLength = len(listOfPartitions[0])
#numberOfPartitions = len(listOfPartitions)
#concat = self.concatenate(listOfPartitions)
#totalLength = len(concat)
#result = []
#for i in range(maxPartitionLength):
# result.append(concat[i:totalLength:maxPartitionLength])
# return self.concatenate(listOfPartitions)
def mappable(obj):
"""return whether an object is mappable or not."""
if isinstance(obj, (tuple,list)):
return True
for m in arrayModules:
if isinstance(obj,m['type']):
return True
return False
dists = {'b':Map,'r':RoundRobinMap}