##// END OF EJS Templates
payload.write_payload: use `single` keyword instead of `update`...
payload.write_payload: use `single` keyword instead of `update` * change default behavior when adding payloads to single=True * document write_payload

File last commit:

r10567:9dd1ef95
r12933:b8499e39
Show More
map.py
167 lines | 5.1 KiB | text/x-python | PythonLexer
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 # encoding: utf-8
"""Classes used in scattering and gathering sequences.
Scattering consists of partitioning a sequence and sending the various
pieces to individual nodes in a cluster.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Brian Granger
* MinRK
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 """
#-------------------------------------------------------------------------------
MinRK
update recently changed modules with Authors in docstring
r4018 # Copyright (C) 2008-2011 The IPython Development Team
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587 import types
MinRK
support iterators in view.map...
r5560 from itertools import islice
MinRK
add map/scatter/gather/ParallelFunction from kernel
r3587
from IPython.utils.data import flatten as utils_flatten
#-------------------------------------------------------------------------------
# Figure out which array packages are present and their array types
#-------------------------------------------------------------------------------
# Registry of the array packages that could be imported, in probe order.
# Every entry is a dict holding the imported package under 'module' and that
# package's array class under 'type'.
arrayModules = []

# Probe for Numeric.
try:
    import Numeric
except ImportError:
    pass
else:
    arrayModules.append(dict(module=Numeric, type=Numeric.arraytype))

# Probe for numpy.
try:
    import numpy
except ImportError:
    pass
else:
    arrayModules.append(dict(module=numpy, type=numpy.ndarray))

# Probe for numarray.
try:
    import numarray
except ImportError:
    pass
else:
    arrayModules.append(
        dict(module=numarray, type=numarray.numarraycore.NumArray)
    )
MinRK
don't create lists...
class Map(object):
    """A class for partitioning a sequence using a map.

    A sequence of length ``n`` is cut into ``q`` contiguous partitions.
    The first ``n % q`` partitions hold ``n // q + 1`` elements, the
    remaining ones hold ``n // q`` elements.
    """

    def getPartition(self, seq, p, q, n=None):
        """Returns the pth partition of q partitions of seq.

        The length can be specified as `n`,
        otherwise it is the value of `len(seq)`.

        Parameters
        ----------
        seq : the object to partition; sliced with ``seq[low:high]`` when
            possible, otherwise consumed through ``itertools.islice``.
        p : index of the requested partition, ``0 <= p < q``.
        q : total number of partitions.
        n : optional length to use instead of ``len(seq)``.

        Returns
        -------
        The slice ``seq[low:high]``, or a list of the corresponding items
        when `seq` cannot be sliced.

        Raises
        ------
        ValueError
            If `p` is outside the range ``0 <= p < q``.
        """
        n = len(seq) if n is None else n

        # Test for error conditions here
        if p < 0 or p >= q:
            raise ValueError("must have 0 <= p < q, but have p=%s,q=%s" % (p, q))

        remainder = n % q
        basesize = n // q

        # The first `remainder` partitions each take one extra element.
        if p < remainder:
            low = p * (basesize + 1)
            high = low + basesize + 1
        else:
            low = p * basesize + remainder
            high = low + basesize

        try:
            result = seq[low:high]
        except TypeError:
            # some objects (iterators) can't be sliced,
            # use islice:
            result = list(islice(seq, low, high))

        return result

    def joinPartitions(self, listOfPartitions):
        """Join partitions back together; delegates to `concatenate`."""
        return self.concatenate(listOfPartitions)

    def concatenate(self, listOfPartitions):
        """Concatenate the partitions according to the type of the first one.

        * a known array type (see `arrayModules`) is joined with that
          module's ``concatenate`` function;
        * lists and tuples are passed to `utils_flatten`;
        * anything else (e.g. scalars) is returned unchanged, as is an
          empty `listOfPartitions`.
        """
        # Nothing to inspect: hand the empty input back instead of failing
        # on listOfPartitions[0].
        if len(listOfPartitions) == 0:
            return listOfPartitions

        testObject = listOfPartitions[0]
        # First see if we have a known array type
        for m in arrayModules:
            if isinstance(testObject, m['type']):
                return m['module'].concatenate(listOfPartitions)
        # Next try for Python sequence types.  The builtin types are used
        # because types.ListType / types.TupleType do not exist on Python 3.
        if isinstance(testObject, (list, tuple)):
            return utils_flatten(listOfPartitions)
        # If we have scalars, just return listOfPartitions
        return listOfPartitions
class RoundRobinMap(Map):
    """Partitions a sequence in a round robin fashion.

    Partition ``p`` of ``q`` holds the elements at indices
    ``p, p + q, p + 2*q, ...``.

    This currently does not work!
    """

    def getPartition(self, seq, p, q, n=None):
        """Return every qth element of `seq`, starting at index `p`.

        The length can be specified as `n`,
        otherwise it is the value of `len(seq)`.
        """
        n = len(seq) if n is None else n
        try:
            return seq[p:n:q]
        except TypeError:
            # some objects (iterators) can't be sliced; fall back to islice,
            # as Map.getPartition does.
            return list(islice(seq, p, n, q))

    def joinPartitions(self, listOfPartitions):
        """Interleave the partitions according to the type of the first one.

        Known array types (see `arrayModules`) go through `flatten_array`,
        lists and tuples through `flatten_list`; anything else, including
        an empty `listOfPartitions`, is returned unchanged.
        """
        # Nothing to inspect: hand the empty input back instead of failing
        # on listOfPartitions[0].
        if len(listOfPartitions) == 0:
            return listOfPartitions

        testObject = listOfPartitions[0]
        # First see if we have a known array type
        for m in arrayModules:
            if isinstance(testObject, m['type']):
                return self.flatten_array(m['type'], listOfPartitions)
        # The builtin types are used because types.ListType /
        # types.TupleType do not exist on Python 3.
        if isinstance(testObject, (list, tuple)):
            return self.flatten_list(listOfPartitions)
        return listOfPartitions

    def flatten_array(self, klass, listOfPartitions):
        """Interleave array partitions along the first axis.

        A new `klass` instance is created whose first dimension is the sum
        of the partitions' first dimensions; partition ``p`` is written to
        the rows ``p, p + q, ...`` where ``q`` is the number of partitions.
        """
        test = listOfPartitions[0]
        shape = list(test.shape)
        shape[0] = sum(part.shape[0] for part in listOfPartitions)
        # NOTE(review): only the shape is passed to klass, so the element
        # type of the partitions is not forwarded to the result -- confirm
        # whether the dtype should be preserved here.
        A = klass(shape)
        N = shape[0]
        q = len(listOfPartitions)
        for p, part in enumerate(listOfPartitions):
            A[p:N:q] = part
        return A

    def flatten_list(self, listOfPartitions):
        """Interleave list/tuple partitions into one flat list.

        Elements are taken position by position across the partitions;
        partitions that are too short for a given position are skipped.
        The number of positions is the length of the first partition.
        """
        flat = []
        for i in range(len(listOfPartitions[0])):
            flat.extend(part[i] for part in listOfPartitions if len(part) > i)
        return flat
def mappable(obj):
    """Return whether an object is mappable or not.

    Tuples and lists are always mappable; any other object is mappable
    only when it is an instance of one of the array types registered in
    `arrayModules`.
    """
    if isinstance(obj, (tuple, list)):
        return True
    return any(isinstance(obj, entry['type']) for entry in arrayModules)
# Lookup table from a one-letter distribution key to the map class that
# implements it: 'b' selects Map, 'r' selects RoundRobinMap.
dists = {
    'b': Map,
    'r': RoundRobinMap,
}