@@ -1,205 +1,205 @@
 #!/usr/bin/env python
 """
 A simple python program of solving a 2D wave equation in parallel.
 Domain partitioning and inter-processor communication
 are done by an object of class MPIRectPartitioner2D
 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)

 An example of running the program is (8 processors, 4x2 partition,
 400x100 grid cells)::

-$ ipcluster
-$
+$ ipcluster start --engines=MPIExec -n 8 # start 8 engines with mpiexec
+$ python parallelwave-mpi.py --grid 400 100 --partition 4 2

 See also parallelwave-mpi, which runs the same program, but uses MPI
 (via mpi4py) for the inter-engine communication.

 Authors
 -------

 * Xing Cai
 * Min Ragan-Kelley

 """

 import sys
 import time

 from numpy import exp, zeros, newaxis, sqrt

 from IPython.external import argparse
 from IPython.parallel import Client, Reference

 def setup_partitioner(index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""
     global partitioner
     p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
     p.redim(global_num_cells=gnum_cells, num_parts=parts)
     p.prepare_communication()
     # put the partitioner into the global namespace:
     partitioner=p

 def setup_solver(*args, **kwargs):
     """create a WaveSolver in the engine namespace"""
     global solver
     solver = WaveSolver(*args, **kwargs)

 def wave_saver(u, x, y, t):
     """save the wave log"""
     global u_hist
     global t_hist
     t_hist.append(t)
     u_hist.append(1.0*u)


 # main program:
 if __name__ == '__main__':

     parser = argparse.ArgumentParser()
     paa = parser.add_argument
     paa('--grid', '-g',
         type=int, nargs=2, default=[100,100], dest='grid',
         help="Cells in the grid, e.g. --grid 100 200")
     paa('--partition', '-p',
         type=int, nargs=2, default=None,
         help="Process partition grid, e.g. --partition 4 2 for 4x2")
     paa('-c',
         type=float, default=1.,
         help="Wave speed (I think)")
     paa('-Ly',
         type=float, default=1.,
         help="system size (in y)")
     paa('-Lx',
         type=float, default=1.,
         help="system size (in x)")
     paa('-t', '--tstop',
         type=float, default=1.,
         help="Time units to run")
     paa('--profile',
         type=unicode, default=u'default',
         help="Specify the ipcluster profile for the client to connect to.")
     paa('--save',
         action='store_true',
         help="Add this flag to save the time/wave history during the run.")
     paa('--scalar',
         action='store_true',
         help="Also run with scalar interior implementation, to see vector speedup.")

     ns = parser.parse_args()
     # set up arguments
     grid = ns.grid
     partition = ns.partition
     Lx = ns.Lx
     Ly = ns.Ly
     c = ns.c
     tstop = ns.tstop
     if ns.save:
         user_action = wave_saver
     else:
         user_action = None

     num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
     final_test = True

     # create the Client
     rc = Client(profile=ns.profile)
     num_procs = len(rc.ids)

     if partition is None:
         partition = [1,num_procs]

     assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

     view = rc[:]
     print "Running %s system on %s processes until %f"%(grid, partition, tstop)

     # functions defining initial/boundary/source conditions
     def I(x,y):
         from numpy import exp
         return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
     def f(x,y,t):
         return 0.0
         # from numpy import exp,sin
         # return 10*exp(-(x - sin(100*t))**2)
     def bc(x,y,t):
         return 0.0

     # initial imports, setup rank
     view.execute('\n'.join([
         "from mpi4py import MPI",
         "import numpy",
         "mpi = MPI.COMM_WORLD",
         "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)

     # initialize t_hist/u_hist for saving the state at each step (optional)
     view['t_hist'] = []
     view['u_hist'] = []

     # set vector/scalar implementation details
     impl = {}
     impl['ic'] = 'vectorized'
     impl['inner'] = 'scalar'
     impl['bc'] = 'vectorized'

     # execute some files so that the classes we need will be defined on the engines:
     view.run('RectPartitioner.py')
     view.run('wavesolver.py')

     # setup remote partitioner
     # note that Reference means that the argument passed to setup_partitioner will be the
     # object named 'my_id' in the engine's namespace
     view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
     # wait for initial communication to complete
     view.execute('mpi.barrier()')
     # setup remote solvers
     view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

     # lambda for calling solver.solve:
     _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

     if ns.scalar:
         impl['inner'] = 'scalar'
         # run first with element-wise Python operations for each cell
         t0 = time.time()
         ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
         if final_test:
             # this sum is performed element-wise as results finish
             s = sum(ar)
             # the L2 norm (RMS) of the result:
             norm = sqrt(s/num_cells)
         else:
             norm = -1
         t1 = time.time()
         print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

     impl['inner'] = 'vectorized'
     # setup new solvers
     view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
     view.execute('mpi.barrier()')

     # run again with numpy vectorized inner-implementation
     t0 = time.time()
-    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)
+    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
     if final_test:
         # this sum is performed element-wise as results finish
         s = sum(ar)
         # the L2 norm (RMS) of the result:
         norm = sqrt(s/num_cells)
     else:
         norm = -1
     t1 = time.time()
     print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

     # if ns.save is True, then u_hist stores the history of u as a list
     # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
     if ns.save and partition[-1] == 1:
         import pylab
         view.execute('u_last=u_hist[-1]')
         # map mpi IDs to IPython IDs, which may not match
         ranks = view['my_id']
         targets = range(len(ranks))
         for idx in range(len(ranks)):
             targets[idx] = ranks.index(idx)
         u_last = rc[targets].gather('u_last', block=True)
         pylab.pcolor(u_last)
         pylab.show()
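The pattern worth calling out in the first file (parallelwave-mpi.py) is the `Reference` argument: `Reference('my_id')` is not a value captured on the client, but a name that is looked up in each engine's namespace when `setup_partitioner` actually runs there, so every engine builds its partitioner with its own MPI rank. Below is a minimal, standalone sketch of that pattern; it is not part of the changeset and assumes a cluster is already running (e.g. `ipcluster start -n 4`) with the same IPython.parallel API used above.

from IPython.parallel import Client, Reference

def report(rank, total):
    """runs on each engine; 'rank' arrives already resolved there"""
    return "engine %i of %i" % (rank, total)

if __name__ == '__main__':
    rc = Client()
    view = rc[:]
    # give each engine a distinct 'rank' value in its own namespace
    view.scatter('rank', range(len(rc.ids)), flatten=True)
    # Reference('rank') is resolved per-engine at call time,
    # just like Reference('my_id') in the scripts under review
    print view.apply_sync(report, Reference('rank'), len(rc.ids))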
@@ -1,209 +1,209 @@
 #!/usr/bin/env python
 """
 A simple python program of solving a 2D wave equation in parallel.
 Domain partitioning and inter-processor communication
 are done by an object of class ZMQRectPartitioner2D
 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)

 An example of running the program is (8 processors, 4x2 partition,
 200x200 grid cells)::

-$ ipcluster
-$
+$ ipcluster start -n 8 # start 8 engines
+$ python parallelwave.py --grid 200 200 --partition 4 2

 See also parallelwave-mpi, which runs the same program, but uses MPI
 (via mpi4py) for the inter-engine communication.

 Authors
 -------

 * Xing Cai
 * Min Ragan-Kelley

 """
 #
 import sys
 import time

 from numpy import exp, zeros, newaxis, sqrt

 from IPython.external import argparse
 from IPython.parallel import Client, Reference

 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""
     global partitioner
     p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
     p.redim(global_num_cells=gnum_cells, num_parts=parts)
     p.prepare_communication()
     # put the partitioner into the global namespace:
     partitioner=p

 def setup_solver(*args, **kwargs):
     """create a WaveSolver in the engine namespace."""
     global solver
     solver = WaveSolver(*args, **kwargs)

 def wave_saver(u, x, y, t):
     """save the wave state for each timestep."""
     global u_hist
     global t_hist
     t_hist.append(t)
     u_hist.append(1.0*u)


 # main program:
 if __name__ == '__main__':

     parser = argparse.ArgumentParser()
     paa = parser.add_argument
     paa('--grid', '-g',
         type=int, nargs=2, default=[100,100], dest='grid',
         help="Cells in the grid, e.g. --grid 100 200")
     paa('--partition', '-p',
         type=int, nargs=2, default=None,
         help="Process partition grid, e.g. --partition 4 2 for 4x2")
     paa('-c',
         type=float, default=1.,
         help="Wave speed (I think)")
     paa('-Ly',
         type=float, default=1.,
         help="system size (in y)")
     paa('-Lx',
         type=float, default=1.,
         help="system size (in x)")
     paa('-t', '--tstop',
         type=float, default=1.,
         help="Time units to run")
     paa('--profile',
         type=unicode, default=u'default',
         help="Specify the ipcluster profile for the client to connect to.")
     paa('--save',
         action='store_true',
         help="Add this flag to save the time/wave history during the run.")
     paa('--scalar',
         action='store_true',
         help="Also run with scalar interior implementation, to see vector speedup.")

     ns = parser.parse_args()
     # set up arguments
     grid = ns.grid
     partition = ns.partition
     Lx = ns.Lx
     Ly = ns.Ly
     c = ns.c
     tstop = ns.tstop
     if ns.save:
         user_action = wave_saver
     else:
         user_action = None

     num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
     final_test = True

     # create the Client
     rc = Client(profile=ns.profile)
     num_procs = len(rc.ids)

     if partition is None:
         partition = [num_procs,1]
     else:
         num_procs = min(num_procs, partition[0]*partition[1])

     assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

     # construct the View:
     view = rc[:num_procs]
     print "Running %s system on %s processes until %f"%(grid, partition, tstop)

     # functions defining initial/boundary/source conditions
     def I(x,y):
         from numpy import exp
         return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
     def f(x,y,t):
         return 0.0
         # from numpy import exp,sin
         # return 10*exp(-(x - sin(100*t))**2)
     def bc(x,y,t):
         return 0.0

     # initialize t_hist/u_hist for saving the state at each step (optional)
     view['t_hist'] = []
     view['u_hist'] = []

     # set vector/scalar implementation details
     impl = {}
     impl['ic'] = 'vectorized'
     impl['inner'] = 'scalar'
     impl['bc'] = 'vectorized'

     # execute some files so that the classes we need will be defined on the engines:
     view.execute('import numpy')
     view.run('communicator.py')
     view.run('RectPartitioner.py')
     view.run('wavesolver.py')

     # scatter engine IDs
     view.scatter('my_id', range(num_procs), flatten=True)

     # create the engine connectors
     view.execute('com = EngineCommunicator()')

     # gather the connection information into a single dict
     ar = view.apply_async(lambda : com.info)
     peers = ar.get_dict()
     # print peers
     # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators

     # setup remote partitioner
     # note that Reference means that the argument passed to setup_partitioner will be the
     # object named 'com' in the engine's namespace
     view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
     time.sleep(1)
     # convenience lambda to call solver.solve:
     _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

     if ns.scalar:
         impl['inner'] = 'scalar'
         # setup remote solvers
         view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)

         # run first with element-wise Python operations for each cell
         t0 = time.time()
         ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
         if final_test:
             # this sum is performed element-wise as results finish
             s = sum(ar)
             # the L2 norm (RMS) of the result:
             norm = sqrt(s/num_cells)
         else:
             norm = -1
         t1 = time.time()
         print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

     # run again with faster numpy-vectorized inner implementation:
     impl['inner'] = 'vectorized'
     # setup remote solvers
     view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

     t0 = time.time()

-    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)
+    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
     if final_test:
         # this sum is performed element-wise as results finish
         s = sum(ar)
         # the L2 norm (RMS) of the result:
         norm = sqrt(s/num_cells)
     else:
         norm = -1
     t1 = time.time()
     print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

     # if ns.save is True, then u_hist stores the history of u as a list
     # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
     if ns.save and partition[-1] == 1:
         import pylab
         view.execute('u_last=u_hist[-1]')
         u_last = view.gather('u_last', block=True)
         pylab.pcolor(u_last)
         pylab.show()
\ No newline at end of file
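One difference between the two reconstruction blocks at the end is worth noting: the 0MQ version (parallelwave.py) scatters `my_id` in engine-ID order, so a plain `view.gather('u_last')` reassembles the strips directly, while the MPI version first remaps MPI ranks to engine IDs because the two orderings need not agree. A minimal sketch of that gather-based reassembly is below; it is illustrative only (not part of the changeset), assumes a running ipcluster, and uses made-up strip shapes.

import numpy
from IPython.parallel import Client

if __name__ == '__main__':
    rc = Client()
    view = rc[:]
    view.execute('import numpy')
    # give each engine an ID and a fake local strip of the solution
    view.scatter('my_id', range(len(rc.ids)), flatten=True)
    view.execute('strip = my_id * numpy.ones((5, 8))')
    # gather concatenates the per-engine arrays along the first axis,
    # in engine order -- the reason parallelwave-mpi.py remaps MPI ranks
    # to engine IDs before gathering
    full = view.gather('strip', block=True)
    print full.shape    # (5 * number_of_engines, 8)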