diff --git a/docs/examples/newparallel/wave2D/RectPartitioner.py b/docs/examples/newparallel/wave2D/RectPartitioner.py new file mode 100755 index 0000000..985659d --- /dev/null +++ b/docs/examples/newparallel/wave2D/RectPartitioner.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python +"""A rectangular domain partitioner and associated communication +functionality for solving PDEs in (1D,2D) using FDM +written in the python language + +The global solution domain is assumed to be of rectangular shape, +where the number of cells in each direction is stored in nx, ny, nz + +The numerical scheme is fully explicit + +Authors +------- + + * Xing Cai + * Min Ragan-Kelley + +""" +import time + +from numpy import zeros, ascontiguousarray, frombuffer +try: + from mpi4py import MPI +except ImportError: + pass +else: + mpi = MPI.COMM_WORLD + +class RectPartitioner: + """ + Responsible for a rectangular partitioning of a global domain, + which is expressed as the numbers of cells in the different + spatial directions. The partitioning info is expressed as an + array of integers, each indicating the number of subdomains in + one spatial direction. + """ + + def __init__(self, my_id=-1, num_procs=-1, \ + global_num_cells=[], num_parts=[]): + self.nsd = 0 + self.my_id = my_id + self.num_procs = num_procs + self.redim (global_num_cells, num_parts) + + def redim (self, global_num_cells, num_parts): + nsd_ = len(global_num_cells) +# print "Inside the redim function, nsd=%d" %nsd_ + + if nsd_<1 | nsd_>3 | nsd_!=len(num_parts): + print 'The input global_num_cells is not ok!' + return + + self.nsd = nsd_ + self.global_num_cells = global_num_cells + self.num_parts = num_parts + + def prepare_communication (self): + """ + Find the subdomain rank (tuple) for each processor and + determine the neighbor info. 
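+
+        For example, with num_parts=[4,2] (a 4x2 partition on eight
+        processors), processor 5 is assigned subd_rank=(1,1), since
+        5%4=1 in the x-direction and 5/4=1 in the y-direction; its
+        lower neighbor in x is processor 4 and its lower neighbor in
+        y is processor 1.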
+ """ + + nsd_ = self.nsd + if nsd_<1: + print 'Number of space dimensions is %d, nothing to do' %nsd_ + return + + self.subd_rank = [-1,-1,-1] + self.subd_lo_ix = [-1,-1,-1] + self.subd_hi_ix = [-1,-1,-1] + self.lower_neighbors = [-1,-1,-1] + self.upper_neighbors = [-1,-1,-1] + + num_procs = self.num_procs + my_id = self.my_id + + num_subds = 1 + for i in range(nsd_): + num_subds = num_subds*self.num_parts[i] + if my_id==0: + print "# subds=", num_subds + # should check num_subds againt num_procs + + offsets = [1, 0, 0] + + # find the subdomain rank + self.subd_rank[0] = my_id%self.num_parts[0] + if nsd_>=2: + offsets[1] = self.num_parts[0] + self.subd_rank[1] = my_id/offsets[1] + if nsd_==3: + offsets[1] = self.num_parts[0] + offsets[2] = self.num_parts[0]*self.num_parts[1] + self.subd_rank[1] = (my_id%offsets[2])/self.num_parts[0] + self.subd_rank[2] = my_id/offsets[2] + + print "my_id=%d, subd_rank: "%my_id, self.subd_rank + if my_id==0: + print "offsets=", offsets + + # find the neighbor ids + for i in range(nsd_): + rank = self.subd_rank[i] + if rank>0: + self.lower_neighbors[i] = my_id-offsets[i] + if rank=(self.num_parts[i]-m): + ix = ix+1 # load balancing + if rank=0: + self.in_lower_buffers = [zeros(1, float)] + self.out_lower_buffers = [zeros(1, float)] + if self.upper_neighbors[0]>=0: + self.in_upper_buffers = [zeros(1, float)] + self.out_upper_buffers = [zeros(1, float)] + + def get_num_loc_cells(self): + return [self.subd_hi_ix[0]-self.subd_lo_ix[0]] + + +class RectPartitioner2D(RectPartitioner): + """ + Subclass of RectPartitioner, for 2D problems + """ + def prepare_communication (self): + """ + Prepare the buffers to be used for later communications + """ + + RectPartitioner.prepare_communication (self) + + self.in_lower_buffers = [[], []] + self.out_lower_buffers = [[], []] + self.in_upper_buffers = [[], []] + self.out_upper_buffers = [[], []] + + size1 = self.subd_hi_ix[1]-self.subd_lo_ix[1]+1 + if self.lower_neighbors[0]>=0: + self.in_lower_buffers[0] = zeros(size1, float) + self.out_lower_buffers[0] = zeros(size1, float) + if self.upper_neighbors[0]>=0: + self.in_upper_buffers[0] = zeros(size1, float) + self.out_upper_buffers[0] = zeros(size1, float) + + size0 = self.subd_hi_ix[0]-self.subd_lo_ix[0]+1 + if self.lower_neighbors[1]>=0: + self.in_lower_buffers[1] = zeros(size0, float) + self.out_lower_buffers[1] = zeros(size0, float) + if self.upper_neighbors[1]>=0: + self.in_upper_buffers[1] = zeros(size0, float) + self.out_upper_buffers[1] = zeros(size0, float) + + def get_num_loc_cells(self): + return [self.subd_hi_ix[0]-self.subd_lo_ix[0],\ + self.subd_hi_ix[1]-self.subd_lo_ix[1]] + + +class MPIRectPartitioner2D(RectPartitioner2D): + """ + Subclass of RectPartitioner2D, which uses MPI via mpi4py for communication + """ + + def __init__(self, my_id=-1, num_procs=-1, + global_num_cells=[], num_parts=[], + slice_copy=True): + RectPartitioner.__init__(self, my_id, num_procs, + global_num_cells, num_parts) + self.slice_copy = slice_copy + + def update_internal_boundary (self, solution_array): + nsd_ = self.nsd + if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers): + print "Buffers for communicating with lower neighbors not ready" + return + if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers): + print "Buffers for communicating with upper neighbors not ready" + return + + loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0] + loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1] + + lower_x_neigh = self.lower_neighbors[0] + upper_x_neigh = 
self.upper_neighbors[0] + lower_y_neigh = self.lower_neighbors[1] + upper_y_neigh = self.upper_neighbors[1] + + # communicate in the x-direction first + if lower_x_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:]) + else: + for i in xrange(0,loc_ny+1): + self.out_lower_buffers[0][i] = solution_array[1,i] + mpi.Isend(self.out_lower_buffers[0], lower_x_neigh) + + if upper_x_neigh>-1: + mpi.Recv(self.in_upper_buffers[0], upper_x_neigh) + if self.slice_copy: + solution_array[loc_nx,:] = self.in_upper_buffers[0] + self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:]) + else: + for i in xrange(0,loc_ny+1): + solution_array[loc_nx,i] = self.in_upper_buffers[0][i] + self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i] + mpi.Isend(self.out_upper_buffers[0], upper_x_neigh) + + if lower_x_neigh>-1: + mpi.Recv(self.in_lower_buffers[0], lower_x_neigh) + if self.slice_copy: + solution_array[0,:] = self.in_lower_buffers[0] + else: + for i in xrange(0,loc_ny+1): + solution_array[0,i] = self.in_lower_buffers[0][i] + + # communicate in the y-direction afterwards + if lower_y_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1]) + else: + for i in xrange(0,loc_nx+1): + self.out_lower_buffers[1][i] = solution_array[i,1] + mpi.Isend(self.out_lower_buffers[1], lower_y_neigh) + + if upper_y_neigh>-1: + mpi.Recv(self.in_upper_buffers[1], upper_y_neigh) + if self.slice_copy: + solution_array[:,loc_ny] = self.in_upper_buffers[1] + self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1]) + else: + for i in xrange(0,loc_nx+1): + solution_array[i,loc_ny] = self.in_upper_buffers[1][i] + self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1] + mpi.Isend(self.out_upper_buffers[1], upper_y_neigh) + + if lower_y_neigh>-1: + mpi.Recv(self.in_lower_buffers[1], lower_y_neigh) + if self.slice_copy: + solution_array[:,0] = self.in_lower_buffers[1] + else: + for i in xrange(0,loc_nx+1): + solution_array[i,0] = self.in_lower_buffers[1][i] + +class ZMQRectPartitioner2D(RectPartitioner2D): + """ + Subclass of RectPartitioner2D, which uses 0MQ via pyzmq for communication + The first two arguments must be `comm`, an EngineCommunicator object, + and `addrs`, a dict of connection information for other EngineCommunicator + objects. 
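+
+    During prepare_communication(), each engine connects its own south
+    and west sockets to the north and east sockets of its lower
+    neighbors (looked up by id in `addrs`), so every internal boundary
+    is served by a single connected pair of sockets.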
+ """ + + def __init__(self, comm, addrs, my_id=-1, num_procs=-1, + global_num_cells=[], num_parts=[], + slice_copy=True): + RectPartitioner.__init__(self, my_id, num_procs, + global_num_cells, num_parts) + self.slice_copy = slice_copy + self.comm = comm # an Engine + self.addrs = addrs + + def prepare_communication(self): + RectPartitioner2D.prepare_communication(self) + # connect west/south to east/north + west_id,south_id = self.lower_neighbors[:2] + west = self.addrs.get(west_id, None) + south = self.addrs.get(south_id, None) + self.comm.connect(south, west) + + def update_internal_boundary_x_y (self, solution_array): + """update the inner boundary with the same send/recv pattern as the MPIPartitioner""" + nsd_ = self.nsd + dtype = solution_array.dtype + if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers): + print "Buffers for communicating with lower neighbors not ready" + return + if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers): + print "Buffers for communicating with upper neighbors not ready" + return + + loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0] + loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1] + + lower_x_neigh = self.lower_neighbors[0] + upper_x_neigh = self.upper_neighbors[0] + lower_y_neigh = self.lower_neighbors[1] + upper_y_neigh = self.upper_neighbors[1] + trackers = [] + flags = dict(copy=False, track=False) + # communicate in the x-direction first + if lower_x_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:]) + else: + for i in xrange(0,loc_ny+1): + self.out_lower_buffers[0][i] = solution_array[1,i] + t = self.comm.west.send(self.out_lower_buffers[0], **flags) + trackers.append(t) + + if upper_x_neigh>-1: + msg = self.comm.east.recv(copy=False) + self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[loc_nx,:] = self.in_upper_buffers[0] + self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:]) + else: + for i in xrange(0,loc_ny+1): + solution_array[loc_nx,i] = self.in_upper_buffers[0][i] + self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i] + t = self.comm.east.send(self.out_upper_buffers[0], **flags) + trackers.append(t) + + + if lower_x_neigh>-1: + msg = self.comm.west.recv(copy=False) + self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[0,:] = self.in_lower_buffers[0] + else: + for i in xrange(0,loc_ny+1): + solution_array[0,i] = self.in_lower_buffers[0][i] + + # communicate in the y-direction afterwards + if lower_y_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1]) + else: + for i in xrange(0,loc_nx+1): + self.out_lower_buffers[1][i] = solution_array[i,1] + t = self.comm.south.send(self.out_lower_buffers[1], **flags) + trackers.append(t) + + + if upper_y_neigh>-1: + msg = self.comm.north.recv(copy=False) + self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[:,loc_ny] = self.in_upper_buffers[1] + self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1]) + else: + for i in xrange(0,loc_nx+1): + solution_array[i,loc_ny] = self.in_upper_buffers[1][i] + self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1] + t = self.comm.north.send(self.out_upper_buffers[1], **flags) + trackers.append(t) + + if lower_y_neigh>-1: + msg = self.comm.south.recv(copy=False) + self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[:,0] = 
self.in_lower_buffers[1] + else: + for i in xrange(0,loc_nx+1): + solution_array[i,0] = self.in_lower_buffers[1][i] + + # wait for sends to complete: + if flags['track']: + for t in trackers: + t.wait() + + def update_internal_boundary_send_recv (self, solution_array): + """update the inner boundary, sending first, then recving""" + nsd_ = self.nsd + dtype = solution_array.dtype + if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers): + print "Buffers for communicating with lower neighbors not ready" + return + if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers): + print "Buffers for communicating with upper neighbors not ready" + return + + loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0] + loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1] + + lower_x_neigh = self.lower_neighbors[0] + upper_x_neigh = self.upper_neighbors[0] + lower_y_neigh = self.lower_neighbors[1] + upper_y_neigh = self.upper_neighbors[1] + trackers = [] + flags = dict(copy=False, track=False) + + # send in all directions first + if lower_x_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:]) + else: + for i in xrange(0,loc_ny+1): + self.out_lower_buffers[0][i] = solution_array[1,i] + t = self.comm.west.send(self.out_lower_buffers[0], **flags) + trackers.append(t) + + if lower_y_neigh>-1: + if self.slice_copy: + self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1]) + else: + for i in xrange(0,loc_nx+1): + self.out_lower_buffers[1][i] = solution_array[i,1] + t = self.comm.south.send(self.out_lower_buffers[1], **flags) + trackers.append(t) + + if upper_x_neigh>-1: + if self.slice_copy: + self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:]) + else: + for i in xrange(0,loc_ny+1): + self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i] + t = self.comm.east.send(self.out_upper_buffers[0], **flags) + trackers.append(t) + + if upper_y_neigh>-1: + if self.slice_copy: + self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1]) + else: + for i in xrange(0,loc_nx+1): + self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1] + t = self.comm.north.send(self.out_upper_buffers[1], **flags) + trackers.append(t) + + + # now start receiving + if upper_x_neigh>-1: + msg = self.comm.east.recv(copy=False) + self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[loc_nx,:] = self.in_upper_buffers[0] + else: + for i in xrange(0,loc_ny+1): + solution_array[loc_nx,i] = self.in_upper_buffers[0][i] + + if lower_x_neigh>-1: + msg = self.comm.west.recv(copy=False) + self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[0,:] = self.in_lower_buffers[0] + else: + for i in xrange(0,loc_ny+1): + solution_array[0,i] = self.in_lower_buffers[0][i] + + if upper_y_neigh>-1: + msg = self.comm.north.recv(copy=False) + self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[:,loc_ny] = self.in_upper_buffers[1] + else: + for i in xrange(0,loc_nx+1): + solution_array[i,loc_ny] = self.in_upper_buffers[1][i] + + if lower_y_neigh>-1: + msg = self.comm.south.recv(copy=False) + self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype) + if self.slice_copy: + solution_array[:,0] = self.in_lower_buffers[1] + else: + for i in xrange(0,loc_nx+1): + solution_array[i,0] = self.in_lower_buffers[1][i] + + # wait for sends to complete: + if flags['track']: + for t in trackers: + t.wait() + + # use send/recv pattern instead of x/y 
sweeps + update_internal_boundary = update_internal_boundary_send_recv + diff --git a/docs/examples/newparallel/wave2D/communicator.py b/docs/examples/newparallel/wave2D/communicator.py new file mode 100644 index 0000000..9634fcb --- /dev/null +++ b/docs/examples/newparallel/wave2D/communicator.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +"""A simple Communicator class that has N,E,S,W neighbors connected via 0MQ PEER sockets""" + +import socket + +import zmq + +from IPython.zmq.parallel.util import disambiguate_url + +class EngineCommunicator(object): + """An object that connects Engines to each other. + north and west sockets listen, while south and east sockets connect. + + This class is useful in cases where there is a set of nodes that + must communicate only with their nearest neighbors. + """ + + def __init__(self, interface='tcp://*', identity=None): + self._ctx = zmq.Context() + self.north = self._ctx.socket(zmq.PAIR) + self.west = self._ctx.socket(zmq.PAIR) + self.south = self._ctx.socket(zmq.PAIR) + self.east = self._ctx.socket(zmq.PAIR) + + # bind to ports + northport = self.north.bind_to_random_port(interface) + eastport = self.east.bind_to_random_port(interface) + + self.north_url = interface+":%i"%northport + self.east_url = interface+":%i"%eastport + + # guess first public IP from socket + self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0] + + def __del__(self): + self.north.close() + self.south.close() + self.east.close() + self.west.close() + self._ctx.term() + + @property + def info(self): + """return the connection info for this object's sockets.""" + return (self.location, self.north_url, self.east_url) + + def connect(self, south_peer=None, west_peer=None): + """connect to peers. `peers` will be a 3-tuples, of the form: + (location, north_addr, east_addr) + as produced by + """ + if south_peer is not None: + location, url, _ = south_peer + self.south.connect(disambiguate_url(url, location)) + if west_peer is not None: + location, _, url = west_peer + self.west.connect(disambiguate_url(url, location)) + + diff --git a/docs/examples/newparallel/wave2D/parallelwave-mpi.py b/docs/examples/newparallel/wave2D/parallelwave-mpi.py new file mode 100755 index 0000000..f952b1f --- /dev/null +++ b/docs/examples/newparallel/wave2D/parallelwave-mpi.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +""" +A simple python program of solving a 2D wave equation in parallel. +Domain partitioning and inter-processor communication +are done by an object of class MPIRectPartitioner2D +(which is a subclass of RectPartitioner2D and uses MPI via mpi4py) + +An example of running the program is (8 processors, 4x2 partition, +400x100 grid cells):: + + $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured) + $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi + +See also parallelwave-mpi, which runs the same program, but uses MPI +(via mpi4py) for the inter-engine communication. 
+ +Authors +------- + + * Xing Cai + * Min Ragan-Kelley + +""" + +import sys +import time + +from numpy import exp, zeros, newaxis, sqrt + +from IPython.external import argparse +from IPython.zmq.parallel.client import Client, Reference + +def setup_partitioner(ns, index, num_procs, gnum_cells, parts): + """create a partitioner in the engine namespace""" + p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs) + p.redim(global_num_cells=gnum_cells, num_parts=parts) + p.prepare_communication() + # put the partitioner into the global namespace: + ns.partitioner=p + +def setup_solver(ns, *args, **kwargs): + """create a WaveSolver in the engine namespace""" + ns.solver = WaveSolver(*args, **kwargs) + +def wave_saver(u, x, y, t): + """save the wave log""" + global u_hist + global t_hist + t_hist.append(t) + u_hist.append(1.0*u) + + +# main program: +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + paa = parser.add_argument + paa('--grid', '-g', + type=int, nargs=2, default=[100,100], dest='grid', + help="Cells in the grid, e.g. --grid 100 200") + paa('--partition', '-p', + type=int, nargs=2, default=None, + help="Process partition grid, e.g. --partition 4 2 for 4x2") + paa('-c', + type=float, default=1., + help="Wave speed (I think)") + paa('-Ly', + type=float, default=1., + help="system size (in y)") + paa('-Lx', + type=float, default=1., + help="system size (in x)") + paa('-t', '--tstop', + type=float, default=1., + help="Time units to run") + paa('--profile', + type=unicode, default=u'default', + help="Specify the ipcluster profile for the client to connect to.") + paa('--save', + action='store_true', + help="Add this flag to save the time/wave history during the run.") + paa('--scalar', + action='store_true', + help="Also run with scalar interior implementation, to see vector speedup.") + + ns = parser.parse_args() + # set up arguments + grid = ns.grid + partition = ns.partition + Lx = ns.Lx + Ly = ns.Ly + c = ns.c + tstop = ns.tstop + if ns.save: + user_action = wave_saver + else: + user_action = None + + num_cells = 1.0*(grid[0]-1)*(grid[1]-1) + final_test = True + + # create the Client + rc = Client(profile=ns.profile) + num_procs = len(rc.ids) + + if partition is None: + partition = [1,num_procs] + + assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) + + # functions defining initial/boundary/source conditions + def I(x,y): + from numpy import exp + return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2)) + def f(x,y,t): + return 0.0 + # from numpy import exp,sin + # return 10*exp(-(x - sin(100*t))**2) + def bc(x,y,t): + return 0.0 + + # initial imports, setup rank + rc[:].execute('\n'.join([ + "from mpi4py import MPI", + "import numpy", + "mpi = MPI.COMM_WORLD", + "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True) + + # initialize t_hist/u_hist for saving the state at each step (optional) + rc[:]['t_hist'] = [] + rc[:]['u_hist'] = [] + + # set vector/scalar implementation details + impl = {} + impl['ic'] = 'vectorized' + impl['inner'] = 'scalar' + impl['bc'] = 'vectorized' + + # execute some files so that the classes we need will be defined on the engines: + rc[:].run('RectPartitioner.py') + rc[:].run('wavesolver.py') + + # setup remote partitioner + # note that Reference means that the argument passed to setup_partitioner will be the + # object named 'my_id' in the engine's namespace + rc[:].apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition) + # wait for initial communication to 
complete + rc[:].execute('mpi.barrier()') + # setup remote solvers + rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + + # lambda for calling solver.solve: + _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) + + if ns.scalar: + impl['inner'] = 'scalar' + # run first with element-wise Python operations for each cell + t0 = time.time() + ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action) + if final_test: + # this sum is performed element-wise as results finish + s = sum(ar) + # the L2 norm (RMS) of the result: + norm = sqrt(s/num_cells) + else: + norm = -1 + t1 = time.time() + print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) + + impl['inner'] = 'vectorized' + # setup new solvers + rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + rc[:].execute('mpi.barrier()') + + # run again with numpy vectorized inner-implementation + t0 = time.time() + ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver) + if final_test: + # this sum is performed element-wise as results finish + s = sum(ar) + # the L2 norm (RMS) of the result: + norm = sqrt(s/num_cells) + else: + norm = -1 + t1 = time.time() + print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) + + # if ns.save is True, then u_hist stores the history of u as a list + # If the partion scheme is Nx1, then u can be reconstructed via 'gather': + if ns.save and partition[-1] == 1: + import pylab + rc[:].execute('u_last=u_hist[-1]') + # map mpi IDs to IPython IDs, which may not match + ranks = rc[:]['my_id'] + targets = range(len(ranks)) + for idx in range(len(ranks)): + targets[idx] = ranks.index(idx) + u_last = rc[targets].gather('u_last', block=True) + pylab.pcolor(u_last) + pylab.show() diff --git a/docs/examples/newparallel/wave2D/parallelwave.py b/docs/examples/newparallel/wave2D/parallelwave.py new file mode 100755 index 0000000..ae30155 --- /dev/null +++ b/docs/examples/newparallel/wave2D/parallelwave.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python +""" +A simple python program of solving a 2D wave equation in parallel. +Domain partitioning and inter-processor communication +are done by an object of class ZMQRectPartitioner2D +(which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq) + +An example of running the program is (8 processors, 4x2 partition, +200x200 grid cells):: + + $ ipclusterz start -n 8 # start 8 engines + $ ./parallelwave.py --grid 200 200 --partition 4 2 + +See also parallelwave-mpi, which runs the same program, but uses MPI +(via mpi4py) for the inter-engine communication. 
+ +Authors +------- + + * Xing Cai + * Min Ragan-Kelley + +""" +# +import sys +import time + +from numpy import exp, zeros, newaxis, sqrt + +from IPython.external import argparse +from IPython.zmq.parallel.client import Client, Reference + +def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts): + """create a partitioner in the engine namespace""" + p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs) + p.redim(global_num_cells=gnum_cells, num_parts=parts) + p.prepare_communication() + # put the partitioner into the global namespace: + ns.partitioner=p + +def setup_solver(ns, *args, **kwargs): + """create a WaveSolver in the engine namespace.""" + ns.solver = WaveSolver(*args, **kwargs) + +def wave_saver(u, x, y, t): + """save the wave state for each timestep.""" + global u_hist + global t_hist + t_hist.append(t) + u_hist.append(1.0*u) + + +# main program: +if __name__ == '__main__': + + parser = argparse.ArgumentParser() + paa = parser.add_argument + paa('--grid', '-g', + type=int, nargs=2, default=[100,100], dest='grid', + help="Cells in the grid, e.g. --grid 100 200") + paa('--partition', '-p', + type=int, nargs=2, default=None, + help="Process partition grid, e.g. --partition 4 2 for 4x2") + paa('-c', + type=float, default=1., + help="Wave speed (I think)") + paa('-Ly', + type=float, default=1., + help="system size (in y)") + paa('-Lx', + type=float, default=1., + help="system size (in x)") + paa('-t', '--tstop', + type=float, default=1., + help="Time units to run") + paa('--profile', + type=unicode, default=u'default', + help="Specify the ipcluster profile for the client to connect to.") + paa('--save', + action='store_true', + help="Add this flag to save the time/wave history during the run.") + paa('--scalar', + action='store_true', + help="Also run with scalar interior implementation, to see vector speedup.") + + ns = parser.parse_args() + # set up arguments + grid = ns.grid + partition = ns.partition + Lx = ns.Lx + Ly = ns.Ly + c = ns.c + tstop = ns.tstop + if ns.save: + user_action = wave_saver + else: + user_action = None + + num_cells = 1.0*(grid[0]-1)*(grid[1]-1) + final_test = True + + # create the Client + rc = Client(profile=ns.profile) + num_procs = len(rc.ids) + + if partition is None: + partition = [num_procs,1] + + assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) + + # functions defining initial/boundary/source conditions + def I(x,y): + from numpy import exp + return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2)) + def f(x,y,t): + return 0.0 + # from numpy import exp,sin + # return 10*exp(-(x - sin(100*t))**2) + def bc(x,y,t): + return 0.0 + + # initialize t_hist/u_hist for saving the state at each step (optional) + rc[:]['t_hist'] = [] + rc[:]['u_hist'] = [] + + # set vector/scalar implementation details + impl = {} + impl['ic'] = 'vectorized' + impl['inner'] = 'scalar' + impl['bc'] = 'vectorized' + + # execute some files so that the classes we need will be defined on the engines: + rc[:].execute('import numpy') + rc[:].run('communicator.py') + rc[:].run('RectPartitioner.py') + rc[:].run('wavesolver.py') + + # scatter engine IDs + rc[:].scatter('my_id', rc.ids, flatten=True) + + # create the engine connectors + rc[:].execute('com = EngineCommunicator()') + + # gather the connection information into a single dict + ar = rc[:].apply_async(lambda : com.info) + peers = ar.get_dict() + # print peers + # this is a dict, keyed by engine ID, of the connection info for the 
EngineCommunicators + + # setup remote partitioner + # note that Reference means that the argument passed to setup_partitioner will be the + # object named 'com' in the engine's namespace + rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) + time.sleep(1) + # convenience lambda to call solver.solve: + _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) + + if ns.scalar: + impl['inner'] = 'scalar' + # setup remote solvers + rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) + + # run first with element-wise Python operations for each cell + t0 = time.time() + ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action) + if final_test: + # this sum is performed element-wise as results finish + s = sum(ar) + # the L2 norm (RMS) of the result: + norm = sqrt(s/num_cells) + else: + norm = -1 + t1 = time.time() + print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) + + # run again with faster numpy-vectorized inner implementation: + impl['inner'] = 'vectorized' + # setup remote solvers + rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) + + t0 = time.time() + + ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver) + if final_test: + # this sum is performed element-wise as results finish + s = sum(ar) + # the L2 norm (RMS) of the result: + norm = sqrt(s/num_cells) + else: + norm = -1 + t1 = time.time() + print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm) + + # if ns.save is True, then u_hist stores the history of u as a list + # If the partion scheme is Nx1, then u can be reconstructed via 'gather': + if ns.save and partition[-1] == 1: + import pylab + rc[:].execute('u_last=u_hist[-1]') + u_last = rc[:].gather('u_last', block=True) + pylab.pcolor(u_last) + pylab.show() \ No newline at end of file diff --git a/docs/examples/newparallel/wave2D/wavesolver.py b/docs/examples/newparallel/wave2D/wavesolver.py new file mode 100755 index 0000000..6c806d5 --- /dev/null +++ b/docs/examples/newparallel/wave2D/wavesolver.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python +""" +A simple WaveSolver class for evolving the wave equation in 2D. +This works in parallel by using a RectPartitioner object. + +Authors +------- + + * Xing Cai + * Min Ragan-Kelley + +""" +import time + +from mpi4py import MPI +mpi = MPI.COMM_WORLD +from numpy import exp, zeros, newaxis, sqrt, arange + +def iseq(start=0, stop=None, inc=1): + """ + Generate integers from start to (and including!) stop, + with increment of inc. Alternative to range/xrange. + """ + if stop is None: # allow isequence(3) to be 0, 1, 2, 3 + # take 1st arg as stop, start as 0, and inc=1 + stop = start; start = 0; inc = 1 + return arange(start, stop+inc, inc) + +class WaveSolver(object): + """ + Solve the 2D wave equation u_tt = u_xx + u_yy + f(x,y,t) with + u = bc(x,y,t) on the boundary and initial condition du/dt = 0. + + Parallelization by using a RectPartitioner object 'partitioner' + + nx and ny are the total number of global grid cells in the x and y + directions. The global grid points are numbered as (0,0), (1,0), (2,0), + ..., (nx,0), (0,1), (1,1), ..., (nx, ny). + + dt is the time step. If dt<=0, an optimal time step is used. + + tstop is the stop time for the simulation. 
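+
+    The partitioner object must already be fully set up, i.e. its
+    redim() and prepare_communication() methods must have been called,
+    because the local grid size and the subdomain index bounds are
+    read from it here.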
+ + I, f are functions: I(x,y), f(x,y,t) + + user_action: function of (u, x, y, t) called at each time + level (x and y are one-dimensional coordinate vectors). + This function allows the calling code to plot the solution, + compute errors, etc. + + implementation: a dictionary specifying how the initial + condition ('ic'), the scheme over inner points ('inner'), + and the boundary conditions ('bc') are to be implemented. + Normally, values are legal: 'scalar' or 'vectorized'. + 'scalar' means straight loops over grid points, while + 'vectorized' means special NumPy vectorized operations. + + If a key in the implementation dictionary is missing, it + defaults in this function to 'scalar' (the safest strategy). + Note that if 'vectorized' is specified, the functions I, f, + and bc must work in vectorized mode. It is always recommended + to first run the 'scalar' mode and then compare 'vectorized' + results with the 'scalar' results to check that I, f, and bc + work. + + verbose: true if a message at each time step is written, + false implies no output during the simulation. + + final_test: true means the discrete L2-norm of the final solution is + to be computed. + """ + + def __init__(self, I, f, c, bc, Lx, Ly, partitioner=None, dt=-1, + user_action=None, + implementation={'ic': 'vectorized', # or 'scalar' + 'inner': 'vectorized', + 'bc': 'vectorized'}): + + nx = partitioner.global_num_cells[0] # number of global cells in x dir + ny = partitioner.global_num_cells[1] # number of global cells in y dir + dx = Lx/float(nx) + dy = Ly/float(ny) + loc_nx, loc_ny = partitioner.get_num_loc_cells() + nx = loc_nx; ny = loc_ny # now use loc_nx and loc_ny instead + lo_ix0 = partitioner.subd_lo_ix[0] + lo_ix1 = partitioner.subd_lo_ix[1] + hi_ix0 = partitioner.subd_hi_ix[0] + hi_ix1 = partitioner.subd_hi_ix[1] + x = iseq(dx*lo_ix0, dx*hi_ix0, dx) # local grid points in x dir + y = iseq(dy*lo_ix1, dy*hi_ix1, dy) # local grid points in y dir + self.x = x + self.y = y + xv = x[:,newaxis] # for vectorized expressions with f(xv,yv) + yv = y[newaxis,:] # -- " -- + if dt <= 0: + dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2)) # max time step + Cx2 = (c*dt/dx)**2; Cy2 = (c*dt/dy)**2; dt2 = dt**2 # help variables + + u = zeros((nx+1,ny+1)) # solution array + u_1 = u.copy() # solution at t-dt + u_2 = u.copy() # solution at t-2*dt + + # preserve for self.solve + implementation=dict(implementation) # copy + + if 'ic' not in implementation: + implementation['ic'] = 'scalar' + if 'bc' not in implementation: + implementation['bc'] = 'scalar' + if 'inner' not in implementation: + implementation['inner'] = 'scalar' + + self.implementation = implementation + self.Lx = Lx + self.Ly = Ly + self.I=I + self.f=f + self.c=c + self.bc=bc + self.user_action = user_action + self.partitioner=partitioner + + # set initial condition (pointwise - allows straight if-tests in I(x,y)): + t=0.0 + if implementation['ic'] == 'scalar': + for i in xrange(0,nx+1): + for j in xrange(0,ny+1): + u_1[i,j] = I(x[i], y[j]) + + for i in xrange(1,nx): + for j in xrange(1,ny): + u_2[i,j] = u_1[i,j] + \ + 0.5*Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \ + 0.5*Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \ + dt2*f(x[i], y[j], 0.0) + + # boundary values of u_2 (equals u(t=dt) due to du/dt=0) + i = 0 + for j in xrange(0,ny+1): + u_2[i,j] = bc(x[i], y[j], t+dt) + j = 0 + for i in xrange(0,nx+1): + u_2[i,j] = bc(x[i], y[j], t+dt) + i = nx + for j in xrange(0,ny+1): + u_2[i,j] = bc(x[i], y[j], t+dt) + j = ny + for i in xrange(0,nx+1): + u_2[i,j] = bc(x[i], 
y[j], t+dt) + + elif implementation['ic'] == 'vectorized': + u_1 = I(xv,yv) + u_2[1:nx,1:ny] = u_1[1:nx,1:ny] + \ + 0.5*Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \ + 0.5*Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \ + dt2*(f(xv[1:nx,1:ny], yv[1:nx,1:ny], 0.0)) + # boundary values (t=dt): + i = 0; u_2[i,:] = bc(x[i], y, t+dt) + j = 0; u_2[:,j] = bc(x, y[j], t+dt) + i = nx; u_2[i,:] = bc(x[i], y, t+dt) + j = ny; u_2[:,j] = bc(x, y[j], t+dt) + + if user_action is not None: + user_action(u_1, x, y, t) # allow user to plot etc. + # print list(self.us[2][2]) + self.us = (u,u_1,u_2) + + + def solve(self, tstop, dt=-1, user_action=None, verbose=False, final_test=False): + t0=time.time() + f=self.f + c=self.c + bc=self.bc + partitioner = self.partitioner + implementation = self.implementation + nx = partitioner.global_num_cells[0] # number of global cells in x dir + ny = partitioner.global_num_cells[1] # number of global cells in y dir + dx = self.Lx/float(nx) + dy = self.Ly/float(ny) + loc_nx, loc_ny = partitioner.get_num_loc_cells() + nx = loc_nx; ny = loc_ny # now use loc_nx and loc_ny instead + x = self.x + y = self.y + xv = x[:,newaxis] # for vectorized expressions with f(xv,yv) + yv = y[newaxis,:] # -- " -- + if dt <= 0: + dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2)) # max time step + Cx2 = (c*dt/dx)**2; Cy2 = (c*dt/dy)**2; dt2 = dt**2 # help variables + # id for the four possible neighbor subdomains + lower_x_neigh = partitioner.lower_neighbors[0] + upper_x_neigh = partitioner.upper_neighbors[0] + lower_y_neigh = partitioner.lower_neighbors[1] + upper_y_neigh = partitioner.upper_neighbors[1] + u,u_1,u_2 = self.us + # u_1 = self.u_1 + + t = 0.0 + while t <= tstop: + t_old = t; t += dt + if verbose: + print 'solving (%s version) at t=%g' % \ + (implementation['inner'], t) + # update all inner points: + if implementation['inner'] == 'scalar': + for i in xrange(1, nx): + for j in xrange(1, ny): + u[i,j] = - u_2[i,j] + 2*u_1[i,j] + \ + Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \ + Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \ + dt2*f(x[i], y[j], t_old) + elif implementation['inner'] == 'vectorized': + u[1:nx,1:ny] = - u_2[1:nx,1:ny] + 2*u_1[1:nx,1:ny] + \ + Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \ + Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \ + dt2*f(xv[1:nx,1:ny], yv[1:nx,1:ny], t_old) + + # insert boundary conditions (if there's no neighbor): + if lower_x_neigh < 0: + if implementation['bc'] == 'scalar': + i = 0 + for j in xrange(0, ny+1): + u[i,j] = bc(x[i], y[j], t) + elif implementation['bc'] == 'vectorized': + u[0,:] = bc(x[0], y, t) + if upper_x_neigh < 0: + if implementation['bc'] == 'scalar': + i = nx + for j in xrange(0, ny+1): + u[i,j] = bc(x[i], y[j], t) + elif implementation['bc'] == 'vectorized': + u[nx,:] = bc(x[nx], y, t) + if lower_y_neigh < 0: + if implementation['bc'] == 'scalar': + j = 0 + for i in xrange(0, nx+1): + u[i,j] = bc(x[i], y[j], t) + elif implementation['bc'] == 'vectorized': + u[:,0] = bc(x, y[0], t) + if upper_y_neigh < 0: + if implementation['bc'] == 'scalar': + j = ny + for i in xrange(0, nx+1): + u[i,j] = bc(x[i], y[j], t) + elif implementation['bc'] == 'vectorized': + u[:,ny] = bc(x, y[ny], t) + + # communication + partitioner.update_internal_boundary (u) + + if user_action is not None: + user_action(u, x, y, t) + # update data structures for next step + u_2, u_1, u = u_1, u, u_2 + + t1 = time.time() + print 'my_id=%2d, dt=%g, %s version, slice_copy=%s, net Wtime=%g'\ + 
%(partitioner.my_id,dt,implementation['inner'],\ + partitioner.slice_copy,t1-t0) + # save the us + self.us = u,u_1,u_2 + # check final results; compute discrete L2-norm of the solution + if final_test: + loc_res = 0.0 + for i in iseq(start=1, stop=nx-1): + for j in iseq(start=1, stop=ny-1): + loc_res += u_1[i,j]**2 + return loc_res + return dt +
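For reference, the pieces above can also be exercised on a single process,
without starting an IPython cluster at all, which is a convenient sanity
check of the numerics before going parallel. The following sketch is not
part of the files added in this diff; it assumes mpi4py is installed
(wavesolver.py imports it unconditionally) and that RectPartitioner.py and
wavesolver.py are importable from the working directory. A 1x1 partition
has no neighbors, so update_internal_boundary() never actually
communicates::

    from numpy import exp, sqrt

    from RectPartitioner import MPIRectPartitioner2D
    from wavesolver import WaveSolver

    grid = [40, 40]                               # global cells in x and y
    p = MPIRectPartitioner2D(my_id=0, num_procs=1)
    p.redim(global_num_cells=grid, num_parts=[1, 1])
    p.prepare_communication()                     # 1x1 partition: no neighbors

    I = lambda x, y: 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))   # initial bump
    f = lambda x, y, t: 0.0                                  # no source term
    bc = lambda x, y, t: 0.0                                 # fixed (zero) boundary

    solver = WaveSolver(I, f, 1.0, bc, 1.0, 1.0, partitioner=p, dt=0,
                        implementation={'ic': 'vectorized',
                                        'inner': 'vectorized',
                                        'bc': 'vectorized'})
    # advance to t=1.0 and get the local sum of squares of the final solution
    loc_res = solver.solve(1.0, dt=0, verbose=False, final_test=True)
    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    print 'RMS of final solution:', sqrt(loc_res/num_cells)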