parallelwave.py
200 lines
| 6.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3656 | #!/usr/bin/env python | |
""" | |||
A simple python program of solving a 2D wave equation in parallel. | |||
Domain partitioning and inter-processor communication | |||
are done by an object of class ZMQRectPartitioner2D | |||
(which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq) | |||
An example of running the program is (8 processors, 4x2 partition, | |||
200x200 grid cells):: | |||
$ ipclusterz start -n 8 # start 8 engines | |||
$ ./parallelwave.py --grid 200 200 --partition 4 2 | |||
See also parallelwave-mpi, which runs the same program, but uses MPI | |||
(via mpi4py) for the inter-engine communication. | |||
Authors | |||
------- | |||
* Xing Cai | |||
* Min Ragan-Kelley | |||
""" | |||
# | |||
import sys | |||
import time | |||
from numpy import exp, zeros, newaxis, sqrt | |||
from IPython.external import argparse | |||
from IPython.zmq.parallel.client import Client, Reference | |||
def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts): | |||
"""create a partitioner in the engine namespace""" | |||
p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs) | |||
p.redim(global_num_cells=gnum_cells, num_parts=parts) | |||
p.prepare_communication() | |||
# put the partitioner into the global namespace: | |||
ns.partitioner=p | |||
def setup_solver(ns, *args, **kwargs): | |||
"""create a WaveSolver in the engine namespace.""" | |||
ns.solver = WaveSolver(*args, **kwargs) | |||
def wave_saver(u, x, y, t): | |||
"""save the wave state for each timestep.""" | |||
global u_hist | |||
global t_hist | |||
t_hist.append(t) | |||
u_hist.append(1.0*u) | |||
# main program: | |||
if __name__ == '__main__': | |||
parser = argparse.ArgumentParser() | |||
paa = parser.add_argument | |||
paa('--grid', '-g', | |||
type=int, nargs=2, default=[100,100], dest='grid', | |||
help="Cells in the grid, e.g. --grid 100 200") | |||
paa('--partition', '-p', | |||
type=int, nargs=2, default=None, | |||
help="Process partition grid, e.g. --partition 4 2 for 4x2") | |||
paa('-c', | |||
type=float, default=1., | |||
help="Wave speed (I think)") | |||
paa('-Ly', | |||
type=float, default=1., | |||
help="system size (in y)") | |||
paa('-Lx', | |||
type=float, default=1., | |||
help="system size (in x)") | |||
paa('-t', '--tstop', | |||
type=float, default=1., | |||
help="Time units to run") | |||
paa('--profile', | |||
type=unicode, default=u'default', | |||
help="Specify the ipcluster profile for the client to connect to.") | |||
paa('--save', | |||
action='store_true', | |||
help="Add this flag to save the time/wave history during the run.") | |||
paa('--scalar', | |||
action='store_true', | |||
help="Also run with scalar interior implementation, to see vector speedup.") | |||
ns = parser.parse_args() | |||
# set up arguments | |||
grid = ns.grid | |||
partition = ns.partition | |||
Lx = ns.Lx | |||
Ly = ns.Ly | |||
c = ns.c | |||
tstop = ns.tstop | |||
if ns.save: | |||
user_action = wave_saver | |||
else: | |||
user_action = None | |||
num_cells = 1.0*(grid[0]-1)*(grid[1]-1) | |||
final_test = True | |||
# create the Client | |||
rc = Client(profile=ns.profile) | |||
num_procs = len(rc.ids) | |||
if partition is None: | |||
partition = [num_procs,1] | |||
assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) | |||
# functions defining initial/boundary/source conditions | |||
def I(x,y): | |||
from numpy import exp | |||
return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2)) | |||
def f(x,y,t): | |||
return 0.0 | |||
# from numpy import exp,sin | |||
# return 10*exp(-(x - sin(100*t))**2) | |||
def bc(x,y,t): | |||
return 0.0 | |||
# initialize t_hist/u_hist for saving the state at each step (optional) | |||
rc[:]['t_hist'] = [] | |||
rc[:]['u_hist'] = [] | |||
# set vector/scalar implementation details | |||
impl = {} | |||
impl['ic'] = 'vectorized' | |||
impl['inner'] = 'scalar' | |||
impl['bc'] = 'vectorized' | |||
# execute some files so that the classes we need will be defined on the engines: | |||
rc[:].execute('import numpy') | |||
rc[:].run('communicator.py') | |||
rc[:].run('RectPartitioner.py') | |||
rc[:].run('wavesolver.py') | |||
# scatter engine IDs | |||
rc[:].scatter('my_id', rc.ids, flatten=True) | |||
# create the engine connectors | |||
rc[:].execute('com = EngineCommunicator()') | |||
# gather the connection information into a single dict | |||
ar = rc[:].apply_async(lambda : com.info) | |||
peers = ar.get_dict() | |||
# print peers | |||
# this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |||
# setup remote partitioner | |||
# note that Reference means that the argument passed to setup_partitioner will be the | |||
# object named 'com' in the engine's namespace | |||
rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) | |||
time.sleep(1) | |||
# convenience lambda to call solver.solve: | |||
_solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) | |||
if ns.scalar: | |||
impl['inner'] = 'scalar' | |||
# setup remote solvers | |||
rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) | |||
# run first with element-wise Python operations for each cell | |||
t0 = time.time() | |||
ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action) | |||
if final_test: | |||
# this sum is performed element-wise as results finish | |||
s = sum(ar) | |||
# the L2 norm (RMS) of the result: | |||
norm = sqrt(s/num_cells) | |||
else: | |||
norm = -1 | |||
t1 = time.time() | |||
print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) | |||
# run again with faster numpy-vectorized inner implementation: | |||
impl['inner'] = 'vectorized' | |||
# setup remote solvers | |||
rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |||
t0 = time.time() | |||
ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver) | |||
if final_test: | |||
# this sum is performed element-wise as results finish | |||
s = sum(ar) | |||
# the L2 norm (RMS) of the result: | |||
norm = sqrt(s/num_cells) | |||
else: | |||
norm = -1 | |||
t1 = time.time() | |||
print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) | |||
# if ns.save is True, then u_hist stores the history of u as a list | |||
# If the partion scheme is Nx1, then u can be reconstructed via 'gather': | |||
if ns.save and partition[-1] == 1: | |||
import pylab | |||
rc[:].execute('u_last=u_hist[-1]') | |||
u_last = rc[:].gather('u_last', block=True) | |||
pylab.pcolor(u_last) | |||
pylab.show() |