#!/usr/bin/env python
"""
A simple Python program that solves a 2D wave equation in parallel.
Domain partitioning and inter-processor communication
are handled by an object of class ZMQRectPartitioner2D
(a subclass of RectPartitioner2D that uses 0MQ via pyzmq).

An example of running the program (8 processors, 4x2 partition,
200x200 grid cells)::

    $ ipclusterz start -n 8    # start 8 engines
    $ ./parallelwave.py --grid 200 200 --partition 4 2

See also parallelwave-mpi, which runs the same program but uses MPI
(via mpi4py) for the inter-engine communication.

Authors
-------

* Xing Cai
* Min Ragan-Kelley

"""

import sys
import time

from numpy import exp, zeros, newaxis, sqrt

from IPython.external import argparse
from IPython.parallel.client import Client, Reference

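# Note: this example targets the old IPython.parallel API (IPython 0.11-3.x).
# In later releases the same interface lives in the separate ipyparallel
# package, where these imports would become: from ipyparallel import Client, Reference
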
def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
    """create a partitioner in the engine namespace"""
    global partitioner
    p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
    p.redim(global_num_cells=gnum_cells, num_parts=parts)
    p.prepare_communication()
    # put the partitioner into the global namespace:
    partitioner = p

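# Note: ZMQRectPartitioner2D is not imported in this file because this function
# only ever runs on the engines, where the class is defined by
# view.run('RectPartitioner.py') below.
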
def setup_solver(*args, **kwargs):
    """create a WaveSolver in the engine namespace."""
    global solver
    solver = WaveSolver(*args, **kwargs)

def wave_saver(u, x, y, t):
    """save the wave state for each timestep."""
    global u_hist
    global t_hist
    t_hist.append(t)
    u_hist.append(1.0*u)  # 1.0*u forces a copy of the current state array

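# Note: wave_saver executes on the engines (as the solver's user_action), so
# u_hist and t_hist refer to engine-side globals; they are seeded from the
# client below via view['t_hist'] = [] and view['u_hist'] = [].
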

# main program:
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    paa = parser.add_argument
    paa('--grid', '-g',
        type=int, nargs=2, default=[100, 100], dest='grid',
        help="Cells in the grid, e.g. --grid 100 200")
    paa('--partition', '-p',
        type=int, nargs=2, default=None,
        help="Process partition grid, e.g. --partition 4 2 for 4x2")
    paa('-c',
        type=float, default=1.,
        help="Wave speed")
    paa('-Ly',
        type=float, default=1.,
        help="System size (in y)")
    paa('-Lx',
        type=float, default=1.,
        help="System size (in x)")
    paa('-t', '--tstop',
        type=float, default=1.,
        help="Time units to run")
    paa('--profile',
        type=unicode, default=u'default',
        help="Specify the ipcluster profile for the client to connect to.")
    paa('--save',
        action='store_true',
        help="Add this flag to save the time/wave history during the run.")
    paa('--scalar',
        action='store_true',
        help="Also run with the scalar interior implementation, to see the vector speedup.")

    ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    # the 1.0* keeps num_cells a float, so s/num_cells below is never
    # Python 2 integer division
    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

    if partition is None:
        partition = [num_procs, 1]
    else:
        # use only as many engines as the requested partition needs
        num_procs = min(num_procs, partition[0]*partition[1])

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines" % (partition, num_procs)

    # construct the View:
    view = rc[:num_procs]
    print "Running %s system on %s processes until %f" % (grid, partition, tstop)

    # functions defining initial/boundary/source conditions
    def I(x, y):
        # local import: I is shipped to the engines, so it cannot rely on
        # this module's global namespace
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))

    def f(x, y, t):
        return 0.0
        # from numpy import exp,sin
        # return 10*exp(-(x - sin(100*t))**2)

    def bc(x, y, t):
        return 0.0

    # initialize t_hist/u_hist for saving the state at each step (optional)
    view['t_hist'] = []
    view['u_hist'] = []

    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'
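
    # 'scalar' steps the interior with explicit Python loops over each cell;
    # 'vectorized' uses numpy array slicing instead. The two timed runs below
    # compare them.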

    # execute some files so that the classes we need will be defined on the engines:
    view.execute('import numpy')
    view.run('communicator.py')
    view.run('RectPartitioner.py')
    view.run('wavesolver.py')

    # scatter engine IDs
    view.scatter('my_id', range(num_procs), flatten=True)
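    # (flatten=True unwraps each engine's single-element partition, so my_id
    # arrives as an int rather than a one-element list)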

    # create the engine connectors
    view.execute('com = EngineCommunicator()')

    # gather the connection information into a single dict
    ar = view.apply_async(lambda: com.info)
    peers = ar.get_dict()
    # print peers
    # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'com' in the engine's namespace
    view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
    # brief pause, presumably to let the engines finish wiring up their 0MQ peer connections
    time.sleep(1)
    # convenience lambda to call solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
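    # (when _solve executes on an engine, the name 'solver' resolves in that
    # engine's namespace, i.e. to the WaveSolver created by setup_solver)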

    if ns.scalar:
        impl['inner'] = 'scalar'
        # setup remote solvers
        view.apply_sync(setup_solver, I, f, c, bc, Lx, Ly, partitioner=Reference('partitioner'), dt=0, implementation=impl)

        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g' % (t1-t0, norm)
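
    # the scalar and vectorized runs should report (nearly) the same norm;
    # only the wall time should differ
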
    # run again with the faster numpy-vectorized inner implementation:
    impl['inner'] = 'vectorized'
    # setup remote solvers
    view.apply_sync(setup_solver, I, f, c, bc, Lx, Ly, partitioner=Reference('partitioner'), dt=0, implementation=impl)

    t0 = time.time()

    # pass user_action here too, so that --save works even without --scalar
    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g' % (t1-t0, norm)

    # if ns.save is True, then u_hist stores the history of u as a list
    # if the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        view.execute('u_last = u_hist[-1]')
        u_last = view.gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()
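        # gather concatenates the engine arrays along their first axis, which
        # only reassembles the global grid when the domain is split along one
        # dimension; hence the Nx1 restriction above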