fix typo in wave2d examples
MinRK
parallelwave-mpi.py
@@ -1,205 +1,205 @@
1 1 #!/usr/bin/env python
2 2 """
3 3 A simple python program for solving a 2D wave equation in parallel.
4 4 Domain partitioning and inter-processor communication
5 5 are done by an object of class MPIRectPartitioner2D
6 6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
7 7
8 8 An example of running the program is (8 processors, 4x2 partition,
9 9 400x100 grid cells)::
10 10
11    -  $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
12    -  $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi
   11 +  $ ipcluster start --engines=MPIExec -n 8 # start 8 engines with mpiexec
   12 +  $ python parallelwave-mpi.py --grid 400 100 --partition 4 2
13 13
14 14 See also parallelwave.py, which runs the same program, but uses ZMQ
15 15 (via pyzmq) for the inter-engine communication.
16 16
17 17 Authors
18 18 -------
19 19
20 20 * Xing Cai
21 21 * Min Ragan-Kelley
22 22
23 23 """
24 24
25 25 import sys
26 26 import time
27 27
28 28 from numpy import exp, zeros, newaxis, sqrt
29 29
30 30 from IPython.external import argparse
31 31 from IPython.parallel import Client, Reference
32 32
33 33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
35 35 global partitioner
36 36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
37 37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 38 p.prepare_communication()
39 39 # put the partitioner into the global namespace:
40 40 partitioner=p
41 41
42 42 def setup_solver(*args, **kwargs):
43 43 """create a WaveSolver in the engine namespace"""
44 44 global solver
45 45 solver = WaveSolver(*args, **kwargs)
46 46
47 47 def wave_saver(u, x, y, t):
48 48 """save the wave log"""
49 49 global u_hist
50 50 global t_hist
51 51 t_hist.append(t)
52 52 u_hist.append(1.0*u)
53 53
54 54
55 55 # main program:
56 56 if __name__ == '__main__':
57 57
58 58 parser = argparse.ArgumentParser()
59 59 paa = parser.add_argument
60 60 paa('--grid', '-g',
61 61 type=int, nargs=2, default=[100,100], dest='grid',
62 62 help="Cells in the grid, e.g. --grid 100 200")
63 63 paa('--partition', '-p',
64 64 type=int, nargs=2, default=None,
65 65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 66 paa('-c',
67 67 type=float, default=1.,
68 68 help="Wave speed (I think)")
69 69 paa('-Ly',
70 70 type=float, default=1.,
71 71 help="system size (in y)")
72 72 paa('-Lx',
73 73 type=float, default=1.,
74 74 help="system size (in x)")
75 75 paa('-t', '--tstop',
76 76 type=float, default=1.,
77 77 help="Time units to run")
78 78 paa('--profile',
79 79 type=unicode, default=u'default',
80 80 help="Specify the ipcluster profile for the client to connect to.")
81 81 paa('--save',
82 82 action='store_true',
83 83 help="Add this flag to save the time/wave history during the run.")
84 84 paa('--scalar',
85 85 action='store_true',
86 86 help="Also run with scalar interior implementation, to see vector speedup.")
87 87
88 88 ns = parser.parse_args()
89 89 # set up arguments
90 90 grid = ns.grid
91 91 partition = ns.partition
92 92 Lx = ns.Lx
93 93 Ly = ns.Ly
94 94 c = ns.c
95 95 tstop = ns.tstop
96 96 if ns.save:
97 97 user_action = wave_saver
98 98 else:
99 99 user_action = None
100 100
101 101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
102 102 final_test = True
103 103
104 104 # create the Client
105 105 rc = Client(profile=ns.profile)
106 106 num_procs = len(rc.ids)
107 107
108 108 if partition is None:
109 109 partition = [1,num_procs]
110 110
111 111 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
112 112
113 113 view = rc[:]
114 114 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
115 115
116 116 # functions defining initial/boundary/source conditions
117 117 def I(x,y):
118 118 from numpy import exp
119 119 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
120 120 def f(x,y,t):
121 121 return 0.0
122 122 # from numpy import exp,sin
123 123 # return 10*exp(-(x - sin(100*t))**2)
124 124 def bc(x,y,t):
125 125 return 0.0
126 126
127 127 # initial imports, setup rank
128 128 view.execute('\n'.join([
129 129 "from mpi4py import MPI",
130 130 "import numpy",
131 131 "mpi = MPI.COMM_WORLD",
132 132 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
133 133
134 134 # initialize t_hist/u_hist for saving the state at each step (optional)
135 135 view['t_hist'] = []
136 136 view['u_hist'] = []
137 137
138 138 # set vector/scalar implementation details
139 139 impl = {}
140 140 impl['ic'] = 'vectorized'
141 141 impl['inner'] = 'scalar'
142 142 impl['bc'] = 'vectorized'
143 143
144 144 # execute some files so that the classes we need will be defined on the engines:
145 145 view.run('RectPartitioner.py')
146 146 view.run('wavesolver.py')
147 147
148 148 # setup remote partitioner
149 149 # note that Reference means that the argument passed to setup_partitioner will be the
150 150 # object named 'my_id' in the engine's namespace
151 151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
152 152 # wait for initial communication to complete
153 153 view.execute('mpi.barrier()')
154 154 # setup remote solvers
155 155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
156 156
157 157 # lambda for calling solver.solve:
158 158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
159 159
160 160 if ns.scalar:
161 161 impl['inner'] = 'scalar'
162 162 # run first with element-wise Python operations for each cell
163 163 t0 = time.time()
164 164 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
165 165 if final_test:
166 166 # this sum is performed element-wise as results finish
167 167 s = sum(ar)
168 168 # the L2 norm (RMS) of the result:
169 169 norm = sqrt(s/num_cells)
170 170 else:
171 171 norm = -1
172 172 t1 = time.time()
173 173 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
174 174
175 175 impl['inner'] = 'vectorized'
176 176 # setup new solvers
177 177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
178 178 view.execute('mpi.barrier()')
179 179
180 180 # run again with numpy vectorized inner-implementation
181 181 t0 = time.time()
182    -  ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
   182 +  ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
183 183 if final_test:
184 184 # this sum is performed element-wise as results finish
185 185 s = sum(ar)
186 186 # the L2 norm (RMS) of the result:
187 187 norm = sqrt(s/num_cells)
188 188 else:
189 189 norm = -1
190 190 t1 = time.time()
191 191 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
192 192
193 193 # if ns.save is True, then u_hist stores the history of u as a list
194 194 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
195 195 if ns.save and partition[-1] == 1:
196 196 import pylab
197 197 view.execute('u_last=u_hist[-1]')
198 198 # map mpi IDs to IPython IDs, which may not match
199 199 ranks = view['my_id']
200 200 targets = range(len(ranks))
201 201 for idx in range(len(ranks)):
202 202 targets[idx] = ranks.index(idx)
203 203 u_last = rc[targets].gather('u_last', block=True)
204 204 pylab.pcolor(u_last)
205 205 pylab.show()
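
The substantive fix in this first file (parallelwave-mpi.py) is the change at line 182: the second, vectorized timing run previously had the history callback commented out, so running with --save recorded nothing for the final gather/pcolor step. Below is a minimal sketch of the callback pattern this restores; the fake_solve loop is a hypothetical stand-in for WaveSolver.solve (whose full implementation is not shown in this diff), while wave_saver and the u_hist/t_hist lists mirror the example itself::

    import numpy

    # history lists, as in the example; they only grow when the solver
    # actually invokes the callback on each timestep
    u_hist = []
    t_hist = []

    def wave_saver(u, x, y, t):
        """Append a copy of the field and the current time."""
        t_hist.append(t)
        u_hist.append(1.0*u)

    def fake_solve(nsteps, user_action=None):
        """Hypothetical stand-in for WaveSolver.solve: a bare time loop."""
        u = numpy.zeros((4, 4))
        for step in range(nsteps):
            u += 1.0                        # pretend time-stepping
            if user_action is not None:     # the hook the fix re-enables
                user_action(u, None, None, float(step))

    fake_solve(3)                           # callback dropped: nothing recorded
    fake_solve(3, user_action=wave_saver)   # callback passed: 3 snapshots saved
    assert len(u_hist) == 3 and len(t_hist) == 3
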
parallelwave.py
@@ -1,209 +1,209 @@
1 1 #!/usr/bin/env python
2 2 """
3 3 A simple python program for solving a 2D wave equation in parallel.
4 4 Domain partitioning and inter-processor communication
5 5 are done by an object of class ZMQRectPartitioner2D
6 6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
7 7
8 8 An example of running the program is (8 processors, 4x2 partition,
9 9 200x200 grid cells)::
10 10
11    -  $ ipclusterz start -n 8 # start 8 engines
12    -  $ ./parallelwave.py --grid 200 200 --partition 4 2
   11 +  $ ipcluster start -n 8 # start 8 engines
   12 +  $ python parallelwave.py --grid 200 200 --partition 4 2
13 13
14 14 See also parallelwave-mpi, which runs the same program, but uses MPI
15 15 (via mpi4py) for the inter-engine communication.
16 16
17 17 Authors
18 18 -------
19 19
20 20 * Xing Cai
21 21 * Min Ragan-Kelley
22 22
23 23 """
24 24 #
25 25 import sys
26 26 import time
27 27
28 28 from numpy import exp, zeros, newaxis, sqrt
29 29
30 30 from IPython.external import argparse
31 31 from IPython.parallel import Client, Reference
32 32
33 33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
35 35 global partitioner
36 36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
37 37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 38 p.prepare_communication()
39 39 # put the partitioner into the global namespace:
40 40 partitioner=p
41 41
42 42 def setup_solver(*args, **kwargs):
43 43 """create a WaveSolver in the engine namespace."""
44 44 global solver
45 45 solver = WaveSolver(*args, **kwargs)
46 46
47 47 def wave_saver(u, x, y, t):
48 48 """save the wave state for each timestep."""
49 49 global u_hist
50 50 global t_hist
51 51 t_hist.append(t)
52 52 u_hist.append(1.0*u)
53 53
54 54
55 55 # main program:
56 56 if __name__ == '__main__':
57 57
58 58 parser = argparse.ArgumentParser()
59 59 paa = parser.add_argument
60 60 paa('--grid', '-g',
61 61 type=int, nargs=2, default=[100,100], dest='grid',
62 62 help="Cells in the grid, e.g. --grid 100 200")
63 63 paa('--partition', '-p',
64 64 type=int, nargs=2, default=None,
65 65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 66 paa('-c',
67 67 type=float, default=1.,
68 68 help="Wave speed (I think)")
69 69 paa('-Ly',
70 70 type=float, default=1.,
71 71 help="system size (in y)")
72 72 paa('-Lx',
73 73 type=float, default=1.,
74 74 help="system size (in x)")
75 75 paa('-t', '--tstop',
76 76 type=float, default=1.,
77 77 help="Time units to run")
78 78 paa('--profile',
79 79 type=unicode, default=u'default',
80 80 help="Specify the ipcluster profile for the client to connect to.")
81 81 paa('--save',
82 82 action='store_true',
83 83 help="Add this flag to save the time/wave history during the run.")
84 84 paa('--scalar',
85 85 action='store_true',
86 86 help="Also run with scalar interior implementation, to see vector speedup.")
87 87
88 88 ns = parser.parse_args()
89 89 # set up arguments
90 90 grid = ns.grid
91 91 partition = ns.partition
92 92 Lx = ns.Lx
93 93 Ly = ns.Ly
94 94 c = ns.c
95 95 tstop = ns.tstop
96 96 if ns.save:
97 97 user_action = wave_saver
98 98 else:
99 99 user_action = None
100 100
101 101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
102 102 final_test = True
103 103
104 104 # create the Client
105 105 rc = Client(profile=ns.profile)
106 106 num_procs = len(rc.ids)
107 107
108 108 if partition is None:
109 109 partition = [num_procs,1]
110 110 else:
111 111 num_procs = min(num_procs, partition[0]*partition[1])
112 112
113 113 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
114 114
115 115 # construct the View:
116 116 view = rc[:num_procs]
117 117 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
118 118
119 119 # functions defining initial/boundary/source conditions
120 120 def I(x,y):
121 121 from numpy import exp
122 122 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
123 123 def f(x,y,t):
124 124 return 0.0
125 125 # from numpy import exp,sin
126 126 # return 10*exp(-(x - sin(100*t))**2)
127 127 def bc(x,y,t):
128 128 return 0.0
129 129
130 130 # initialize t_hist/u_hist for saving the state at each step (optional)
131 131 view['t_hist'] = []
132 132 view['u_hist'] = []
133 133
134 134 # set vector/scalar implementation details
135 135 impl = {}
136 136 impl['ic'] = 'vectorized'
137 137 impl['inner'] = 'scalar'
138 138 impl['bc'] = 'vectorized'
139 139
140 140 # execute some files so that the classes we need will be defined on the engines:
141 141 view.execute('import numpy')
142 142 view.run('communicator.py')
143 143 view.run('RectPartitioner.py')
144 144 view.run('wavesolver.py')
145 145
146 146 # scatter engine IDs
147 147 view.scatter('my_id', range(num_procs), flatten=True)
148 148
149 149 # create the engine connectors
150 150 view.execute('com = EngineCommunicator()')
151 151
152 152 # gather the connection information into a single dict
153 153 ar = view.apply_async(lambda : com.info)
154 154 peers = ar.get_dict()
155 155 # print peers
156 156 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
157 157
158 158 # setup remote partitioner
159 159 # note that Reference means that the argument passed to setup_partitioner will be the
160 160 # object named 'com' in the engine's namespace
161 161 view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
162 162 time.sleep(1)
163 163 # convenience lambda to call solver.solve:
164 164 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
165 165
166 166 if ns.scalar:
167 167 impl['inner'] = 'scalar'
168 168 # setup remote solvers
169 169 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
170 170
171 171 # run first with element-wise Python operations for each cell
172 172 t0 = time.time()
173 173 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
174 174 if final_test:
175 175 # this sum is performed element-wise as results finish
176 176 s = sum(ar)
177 177 # the L2 norm (RMS) of the result:
178 178 norm = sqrt(s/num_cells)
179 179 else:
180 180 norm = -1
181 181 t1 = time.time()
182 182 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
183 183
184 184 # run again with faster numpy-vectorized inner implementation:
185 185 impl['inner'] = 'vectorized'
186 186 # setup remote solvers
187 187 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
188 188
189 189 t0 = time.time()
190 190
191    -  ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
   191 +  ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
192 192 if final_test:
193 193 # this sum is performed element-wise as results finish
194 194 s = sum(ar)
195 195 # the L2 norm (RMS) of the result:
196 196 norm = sqrt(s/num_cells)
197 197 else:
198 198 norm = -1
199 199 t1 = time.time()
200 200 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
201 201
202 202 # if ns.save is True, then u_hist stores the history of u as a list
203 203 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
204 204 if ns.save and partition[-1] == 1:
205 205 import pylab
206 206 view.execute('u_last=u_hist[-1]')
207 207 u_last = view.gather('u_last', block=True)
208 208 pylab.pcolor(u_last)
209 209 pylab.show()
\ No newline at end of file
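
Both scripts end by pulling the distributed field back to the client with gather when the partition is Nx1. For reference, here is a minimal sketch of that scatter/gather round trip using only the IPython.parallel calls already shown above; it assumes an ipcluster with at least one engine is running, and the name 'chunk' is purely illustrative::

    from IPython.parallel import Client
    import numpy

    rc = Client()        # connect to the running cluster (default profile)
    view = rc[:]         # a DirectView over all engines

    # scatter splits the array along its first axis, one block per engine
    view.scatter('chunk', numpy.arange(12.).reshape(6, 2), block=True)

    # ... each engine would normally time-step its local block here ...

    # gather concatenates the per-engine pieces back in engine order,
    # which is how the examples rebuild 'u_last' for pylab.pcolor
    full = view.gather('chunk', block=True)
    print(full.shape)    # (6, 2) once every engine has replied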