parallelwave-mpi.py
@@ -1,205 +1,205 @@
#!/usr/bin/env python
"""
A simple Python program that solves a 2D wave equation in parallel.
Domain partitioning and inter-processor communication
are done by an object of class MPIRectPartitioner2D
(which is a subclass of RectPartitioner2D and uses MPI via mpi4py)

An example of running the program is (8 processors, 4x2 partition,
400x100 grid cells)::

    $ ipcluster start --engines=MPIExec -n 8 # start 8 engines with mpiexec
    $ python parallelwave-mpi.py --grid 400 100 --partition 4 2

See also parallelwave.py, which runs the same program, but uses ZMQ
sockets (via pyzmq) instead of MPI for the inter-engine communication.

Authors
-------

 * Xing Cai
 * Min Ragan-Kelley

"""

import sys
import time

from numpy import exp, zeros, newaxis, sqrt

from IPython.external import argparse
from IPython.parallel import Client, Reference

def setup_partitioner(index, num_procs, gnum_cells, parts):
    """create a partitioner in the engine namespace"""
    global partitioner
    p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
    p.redim(global_num_cells=gnum_cells, num_parts=parts)
    p.prepare_communication()
    # put the partitioner into the global namespace:
    partitioner=p

def setup_solver(*args, **kwargs):
    """create a WaveSolver in the engine namespace"""
    global solver
    solver = WaveSolver(*args, **kwargs)

def wave_saver(u, x, y, t):
    """save the wave log"""
    global u_hist
    global t_hist
    t_hist.append(t)
    u_hist.append(1.0*u)

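# Note on wave_saver: multiplying by 1.0 forces a copy of the numpy array, so
# the saved history is not aliased to the solver's working buffer.  A minimal
# sketch of the difference (plain numpy, independent of this script)::
#
#     import numpy
#     a = numpy.zeros(3)
#     b = 1.0*a     # new array: in-place writes to a leave b unchanged
#     c = a         # alias: c sees every change made to a
#     a[0] = 5.0    # now b[0] == 0.0 while c[0] == 5.0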
# main program:
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    paa = parser.add_argument
    paa('--grid', '-g',
        type=int, nargs=2, default=[100,100], dest='grid',
        help="Cells in the grid, e.g. --grid 100 200")
    paa('--partition', '-p',
        type=int, nargs=2, default=None,
        help="Process partition grid, e.g. --partition 4 2 for 4x2")
    paa('-c',
        type=float, default=1.,
        help="Wave speed (I think)")
    paa('-Ly',
        type=float, default=1.,
        help="system size (in y)")
    paa('-Lx',
        type=float, default=1.,
        help="system size (in x)")
    paa('-t', '--tstop',
        type=float, default=1.,
        help="Time units to run")
    paa('--profile',
        type=unicode, default=u'default',
        help="Specify the ipcluster profile for the client to connect to.")
    paa('--save',
        action='store_true',
        help="Add this flag to save the time/wave history during the run.")
    paa('--scalar',
        action='store_true',
        help="Also run with scalar interior implementation, to see vector speedup.")

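    # For reference, a hypothetical invocation exercising the flags above::
    #
    #     $ python parallelwave-mpi.py --grid 400 100 --partition 4 2 --scalar --save
    #
    # --scalar additionally times the pure-Python inner loop; --save records
    # the t/u history on each engine as the run progresses.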
    ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

    if partition is None:
        partition = [1,num_procs]

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

    view = rc[:]
    print "Running %s system on %s processes until %f"%(grid, partition, tstop)

    # functions defining initial/boundary/source conditions
    def I(x,y):
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
    def f(x,y,t):
        return 0.0
        # from numpy import exp,sin
        # return 10*exp(-(x - sin(100*t))**2)
    def bc(x,y,t):
        return 0.0

    # initial imports, setup rank
    view.execute('\n'.join([
        "from mpi4py import MPI",
        "import numpy",
        "mpi = MPI.COMM_WORLD",
        "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)

    # initialize t_hist/u_hist for saving the state at each step (optional)
    view['t_hist'] = []
    view['u_hist'] = []

    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'

    # execute some files so that the classes we need will be defined on the engines:
    view.run('RectPartitioner.py')
    view.run('wavesolver.py')

    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'my_id' in the engine's namespace
    view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
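    # A minimal sketch of how Reference resolves, with hypothetical values
    # (the named object just has to exist in each engine's namespace)::
    #
    #     view['x'] = 3                                     # push x=3 to each engine
    #     view.apply_sync(lambda y: y+1, Reference('x'))    # -> [4, 4, ..., 4]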
    # wait for initial communication to complete
    view.execute('mpi.barrier()')
    # setup remote solvers
    view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

    # lambda for calling solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

    if ns.scalar:
        impl['inner'] = 'scalar'
        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
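    # In both runs the reported norm is the discrete L2 (RMS) norm of the
    # final field: assuming solve() returns each engine's sum of squared
    # u-values over its local cells, the reduction computes
    #
    #     norm = sqrt( (1/num_cells) * sum_ij u_ij**2 )
    #
    # so the scalar and vectorized implementations should agree on it.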

    impl['inner'] = 'vectorized'
    # setup new solvers
    view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
    view.execute('mpi.barrier()')

    # run again with numpy vectorized inner-implementation
    t0 = time.time()
    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # if ns.save is True, then u_hist stores the history of u as a list
    # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        view.execute('u_last=u_hist[-1]')
        # map mpi IDs to IPython IDs, which may not match
        # (e.g. ranks == [2,0,1] gives targets == [1,2,0])
        ranks = view['my_id']
        targets = range(len(ranks))
        for idx in range(len(ranks)):
            targets[idx] = ranks.index(idx)
        u_last = rc[targets].gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()
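        # Why gather() wants an Nx1 partition: with partition[-1] == 1 the
        # domain is split along a single axis, so each engine owns one
        # contiguous slab and concatenating 'u_last' in rank order
        # reconstructs the full field; a PxQ partition with Q > 1 would
        # interleave blocks in both directions and scramble the result.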
parallelwave.py
@@ -1,209 +1,209 @@
#!/usr/bin/env python
"""
A simple Python program that solves a 2D wave equation in parallel.
Domain partitioning and inter-processor communication
are done by an object of class ZMQRectPartitioner2D
(which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)

An example of running the program is (8 processors, 4x2 partition,
200x200 grid cells)::

    $ ipcluster start -n 8 # start 8 engines
    $ python parallelwave.py --grid 200 200 --partition 4 2

See also parallelwave-mpi, which runs the same program, but uses MPI
(via mpi4py) for the inter-engine communication.

Authors
-------

 * Xing Cai
 * Min Ragan-Kelley

"""
#
import sys
import time

from numpy import exp, zeros, newaxis, sqrt

from IPython.external import argparse
from IPython.parallel import Client, Reference

def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
    """create a partitioner in the engine namespace"""
    global partitioner
    p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
    p.redim(global_num_cells=gnum_cells, num_parts=parts)
    p.prepare_communication()
    # put the partitioner into the global namespace:
    partitioner=p

def setup_solver(*args, **kwargs):
    """create a WaveSolver in the engine namespace."""
    global solver
    solver = WaveSolver(*args, **kwargs)

def wave_saver(u, x, y, t):
    """save the wave state for each timestep."""
    global u_hist
    global t_hist
    t_hist.append(t)
    u_hist.append(1.0*u)

# main program:
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    paa = parser.add_argument
    paa('--grid', '-g',
        type=int, nargs=2, default=[100,100], dest='grid',
        help="Cells in the grid, e.g. --grid 100 200")
    paa('--partition', '-p',
        type=int, nargs=2, default=None,
        help="Process partition grid, e.g. --partition 4 2 for 4x2")
    paa('-c',
        type=float, default=1.,
        help="Wave speed (I think)")
    paa('-Ly',
        type=float, default=1.,
        help="system size (in y)")
    paa('-Lx',
        type=float, default=1.,
        help="system size (in x)")
    paa('-t', '--tstop',
        type=float, default=1.,
        help="Time units to run")
    paa('--profile',
        type=unicode, default=u'default',
        help="Specify the ipcluster profile for the client to connect to.")
    paa('--save',
        action='store_true',
        help="Add this flag to save the time/wave history during the run.")
    paa('--scalar',
        action='store_true',
        help="Also run with scalar interior implementation, to see vector speedup.")

    ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

    if partition is None:
        partition = [num_procs,1]
    else:
        num_procs = min(num_procs, partition[0]*partition[1])

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

    # construct the View:
    view = rc[:num_procs]
    print "Running %s system on %s processes until %f"%(grid, partition, tstop)

    # functions defining initial/boundary/source conditions
    def I(x,y):
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
    def f(x,y,t):
        return 0.0
        # from numpy import exp,sin
        # return 10*exp(-(x - sin(100*t))**2)
    def bc(x,y,t):
        return 0.0

    # initialize t_hist/u_hist for saving the state at each step (optional)
    view['t_hist'] = []
    view['u_hist'] = []

    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'

    # execute some files so that the classes we need will be defined on the engines:
    view.execute('import numpy')
    view.run('communicator.py')
    view.run('RectPartitioner.py')
    view.run('wavesolver.py')

    # scatter engine IDs
    view.scatter('my_id', range(num_procs), flatten=True)

    # create the engine connectors
    view.execute('com = EngineCommunicator()')

    # gather the connection information into a single dict
    ar = view.apply_async(lambda : com.info)
    peers = ar.get_dict()
    # print peers
    # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
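    # The exact contents of com.info are defined by EngineCommunicator in
    # communicator.py; conceptually, peers maps engine ID -> that engine's
    # 0MQ connection info, and handing the whole dict to setup_partitioner
    # below lets every engine connect directly to its grid neighbours
    # without routing traffic through the client.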

    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'com' in the engine's namespace
    view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
    time.sleep(1)   # give the 0MQ connections a moment to establish
    # convenience lambda to call solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

    if ns.scalar:
        impl['inner'] = 'scalar'
        # setup remote solvers
        view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)

        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # run again with faster numpy-vectorized inner implementation:
    impl['inner'] = 'vectorized'
    # setup remote solvers
    view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

    t0 = time.time()

    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # if ns.save is True, then u_hist stores the history of u as a list
    # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        view.execute('u_last=u_hist[-1]')
        u_last = view.gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()