##// END OF EJS Templates
fix typo in wave2d examples
MinRK -
Show More
@@ -1,205 +1,205 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A simple python program of solving a 2D wave equation in parallel.
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
4 Domain partitioning and inter-processor communication
5 are done by an object of class MPIRectPartitioner2D
5 are done by an object of class MPIRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
7
7
8 An example of running the program is (8 processors, 4x2 partition,
8 An example of running the program is (8 processors, 4x2 partition,
9 400x100 grid cells)::
9 400x100 grid cells)::
10
10
11 $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
11 $ ipcluster start --engines=MPIExec -n 8 # start 8 engines with mpiexec
12 $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi
12 $ python parallelwave-mpi.py --grid 400 100 --partition 4 2
13
13
14 See also parallelwave-mpi, which runs the same program, but uses MPI
14 See also parallelwave-mpi, which runs the same program, but uses MPI
15 (via mpi4py) for the inter-engine communication.
15 (via mpi4py) for the inter-engine communication.
16
16
17 Authors
17 Authors
18 -------
18 -------
19
19
20 * Xing Cai
20 * Xing Cai
21 * Min Ragan-Kelley
21 * Min Ragan-Kelley
22
22
23 """
23 """
24
24
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from numpy import exp, zeros, newaxis, sqrt
28 from numpy import exp, zeros, newaxis, sqrt
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.parallel import Client, Reference
31 from IPython.parallel import Client, Reference
32
32
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 global partitioner
36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 p.prepare_communication()
38 p.prepare_communication()
39 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
40 partitioner=p
40 partitioner=p
41
41
42 def setup_solver(*args, **kwargs):
42 def setup_solver(*args, **kwargs):
43 """create a WaveSolver in the engine namespace"""
43 """create a WaveSolver in the engine namespace"""
44 global solver
44 global solver
45 solver = WaveSolver(*args, **kwargs)
45 solver = WaveSolver(*args, **kwargs)
46
46
47 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
48 """save the wave log"""
48 """save the wave log"""
49 global u_hist
49 global u_hist
50 global t_hist
50 global t_hist
51 t_hist.append(t)
51 t_hist.append(t)
52 u_hist.append(1.0*u)
52 u_hist.append(1.0*u)
53
53
54
54
55 # main program:
55 # main program:
56 if __name__ == '__main__':
56 if __name__ == '__main__':
57
57
58 parser = argparse.ArgumentParser()
58 parser = argparse.ArgumentParser()
59 paa = parser.add_argument
59 paa = parser.add_argument
60 paa('--grid', '-g',
60 paa('--grid', '-g',
61 type=int, nargs=2, default=[100,100], dest='grid',
61 type=int, nargs=2, default=[100,100], dest='grid',
62 help="Cells in the grid, e.g. --grid 100 200")
62 help="Cells in the grid, e.g. --grid 100 200")
63 paa('--partition', '-p',
63 paa('--partition', '-p',
64 type=int, nargs=2, default=None,
64 type=int, nargs=2, default=None,
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 paa('-c',
66 paa('-c',
67 type=float, default=1.,
67 type=float, default=1.,
68 help="Wave speed (I think)")
68 help="Wave speed (I think)")
69 paa('-Ly',
69 paa('-Ly',
70 type=float, default=1.,
70 type=float, default=1.,
71 help="system size (in y)")
71 help="system size (in y)")
72 paa('-Lx',
72 paa('-Lx',
73 type=float, default=1.,
73 type=float, default=1.,
74 help="system size (in x)")
74 help="system size (in x)")
75 paa('-t', '--tstop',
75 paa('-t', '--tstop',
76 type=float, default=1.,
76 type=float, default=1.,
77 help="Time units to run")
77 help="Time units to run")
78 paa('--profile',
78 paa('--profile',
79 type=unicode, default=u'default',
79 type=unicode, default=u'default',
80 help="Specify the ipcluster profile for the client to connect to.")
80 help="Specify the ipcluster profile for the client to connect to.")
81 paa('--save',
81 paa('--save',
82 action='store_true',
82 action='store_true',
83 help="Add this flag to save the time/wave history during the run.")
83 help="Add this flag to save the time/wave history during the run.")
84 paa('--scalar',
84 paa('--scalar',
85 action='store_true',
85 action='store_true',
86 help="Also run with scalar interior implementation, to see vector speedup.")
86 help="Also run with scalar interior implementation, to see vector speedup.")
87
87
88 ns = parser.parse_args()
88 ns = parser.parse_args()
89 # set up arguments
89 # set up arguments
90 grid = ns.grid
90 grid = ns.grid
91 partition = ns.partition
91 partition = ns.partition
92 Lx = ns.Lx
92 Lx = ns.Lx
93 Ly = ns.Ly
93 Ly = ns.Ly
94 c = ns.c
94 c = ns.c
95 tstop = ns.tstop
95 tstop = ns.tstop
96 if ns.save:
96 if ns.save:
97 user_action = wave_saver
97 user_action = wave_saver
98 else:
98 else:
99 user_action = None
99 user_action = None
100
100
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
102 final_test = True
102 final_test = True
103
103
104 # create the Client
104 # create the Client
105 rc = Client(profile=ns.profile)
105 rc = Client(profile=ns.profile)
106 num_procs = len(rc.ids)
106 num_procs = len(rc.ids)
107
107
108 if partition is None:
108 if partition is None:
109 partition = [1,num_procs]
109 partition = [1,num_procs]
110
110
111 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
111 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
112
112
113 view = rc[:]
113 view = rc[:]
114 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
114 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
115
115
116 # functions defining initial/boundary/source conditions
116 # functions defining initial/boundary/source conditions
117 def I(x,y):
117 def I(x,y):
118 from numpy import exp
118 from numpy import exp
119 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
119 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
120 def f(x,y,t):
120 def f(x,y,t):
121 return 0.0
121 return 0.0
122 # from numpy import exp,sin
122 # from numpy import exp,sin
123 # return 10*exp(-(x - sin(100*t))**2)
123 # return 10*exp(-(x - sin(100*t))**2)
124 def bc(x,y,t):
124 def bc(x,y,t):
125 return 0.0
125 return 0.0
126
126
127 # initial imports, setup rank
127 # initial imports, setup rank
128 view.execute('\n'.join([
128 view.execute('\n'.join([
129 "from mpi4py import MPI",
129 "from mpi4py import MPI",
130 "import numpy",
130 "import numpy",
131 "mpi = MPI.COMM_WORLD",
131 "mpi = MPI.COMM_WORLD",
132 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
132 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
133
133
134 # initialize t_hist/u_hist for saving the state at each step (optional)
134 # initialize t_hist/u_hist for saving the state at each step (optional)
135 view['t_hist'] = []
135 view['t_hist'] = []
136 view['u_hist'] = []
136 view['u_hist'] = []
137
137
138 # set vector/scalar implementation details
138 # set vector/scalar implementation details
139 impl = {}
139 impl = {}
140 impl['ic'] = 'vectorized'
140 impl['ic'] = 'vectorized'
141 impl['inner'] = 'scalar'
141 impl['inner'] = 'scalar'
142 impl['bc'] = 'vectorized'
142 impl['bc'] = 'vectorized'
143
143
144 # execute some files so that the classes we need will be defined on the engines:
144 # execute some files so that the classes we need will be defined on the engines:
145 view.run('RectPartitioner.py')
145 view.run('RectPartitioner.py')
146 view.run('wavesolver.py')
146 view.run('wavesolver.py')
147
147
148 # setup remote partitioner
148 # setup remote partitioner
149 # note that Reference means that the argument passed to setup_partitioner will be the
149 # note that Reference means that the argument passed to setup_partitioner will be the
150 # object named 'my_id' in the engine's namespace
150 # object named 'my_id' in the engine's namespace
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
151 view.apply_sync(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
152 # wait for initial communication to complete
152 # wait for initial communication to complete
153 view.execute('mpi.barrier()')
153 view.execute('mpi.barrier()')
154 # setup remote solvers
154 # setup remote solvers
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
155 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
156
156
157 # lambda for calling solver.solve:
157 # lambda for calling solver.solve:
158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
158 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
159
159
160 if ns.scalar:
160 if ns.scalar:
161 impl['inner'] = 'scalar'
161 impl['inner'] = 'scalar'
162 # run first with element-wise Python operations for each cell
162 # run first with element-wise Python operations for each cell
163 t0 = time.time()
163 t0 = time.time()
164 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
164 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
165 if final_test:
165 if final_test:
166 # this sum is performed element-wise as results finish
166 # this sum is performed element-wise as results finish
167 s = sum(ar)
167 s = sum(ar)
168 # the L2 norm (RMS) of the result:
168 # the L2 norm (RMS) of the result:
169 norm = sqrt(s/num_cells)
169 norm = sqrt(s/num_cells)
170 else:
170 else:
171 norm = -1
171 norm = -1
172 t1 = time.time()
172 t1 = time.time()
173 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
173 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
174
174
175 impl['inner'] = 'vectorized'
175 impl['inner'] = 'vectorized'
176 # setup new solvers
176 # setup new solvers
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
177 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
178 view.execute('mpi.barrier()')
178 view.execute('mpi.barrier()')
179
179
180 # run again with numpy vectorized inner-implementation
180 # run again with numpy vectorized inner-implementation
181 t0 = time.time()
181 t0 = time.time()
182 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
182 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
183 if final_test:
183 if final_test:
184 # this sum is performed element-wise as results finish
184 # this sum is performed element-wise as results finish
185 s = sum(ar)
185 s = sum(ar)
186 # the L2 norm (RMS) of the result:
186 # the L2 norm (RMS) of the result:
187 norm = sqrt(s/num_cells)
187 norm = sqrt(s/num_cells)
188 else:
188 else:
189 norm = -1
189 norm = -1
190 t1 = time.time()
190 t1 = time.time()
191 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
191 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
192
192
193 # if ns.save is True, then u_hist stores the history of u as a list
193 # if ns.save is True, then u_hist stores the history of u as a list
194 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
194 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
195 if ns.save and partition[-1] == 1:
195 if ns.save and partition[-1] == 1:
196 import pylab
196 import pylab
197 view.execute('u_last=u_hist[-1]')
197 view.execute('u_last=u_hist[-1]')
198 # map mpi IDs to IPython IDs, which may not match
198 # map mpi IDs to IPython IDs, which may not match
199 ranks = view['my_id']
199 ranks = view['my_id']
200 targets = range(len(ranks))
200 targets = range(len(ranks))
201 for idx in range(len(ranks)):
201 for idx in range(len(ranks)):
202 targets[idx] = ranks.index(idx)
202 targets[idx] = ranks.index(idx)
203 u_last = rc[targets].gather('u_last', block=True)
203 u_last = rc[targets].gather('u_last', block=True)
204 pylab.pcolor(u_last)
204 pylab.pcolor(u_last)
205 pylab.show()
205 pylab.show()
@@ -1,209 +1,209 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A simple python program of solving a 2D wave equation in parallel.
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
4 Domain partitioning and inter-processor communication
5 are done by an object of class ZMQRectPartitioner2D
5 are done by an object of class ZMQRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
7
7
8 An example of running the program is (8 processors, 4x2 partition,
8 An example of running the program is (8 processors, 4x2 partition,
9 200x200 grid cells)::
9 200x200 grid cells)::
10
10
11 $ ipclusterz start -n 8 # start 8 engines
11 $ ipcluster start -n 8 # start 8 engines
12 $ ./parallelwave.py --grid 200 200 --partition 4 2
12 $ python parallelwave.py --grid 200 200 --partition 4 2
13
13
14 See also parallelwave-mpi, which runs the same program, but uses MPI
14 See also parallelwave-mpi, which runs the same program, but uses MPI
15 (via mpi4py) for the inter-engine communication.
15 (via mpi4py) for the inter-engine communication.
16
16
17 Authors
17 Authors
18 -------
18 -------
19
19
20 * Xing Cai
20 * Xing Cai
21 * Min Ragan-Kelley
21 * Min Ragan-Kelley
22
22
23 """
23 """
24 #
24 #
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from numpy import exp, zeros, newaxis, sqrt
28 from numpy import exp, zeros, newaxis, sqrt
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.parallel import Client, Reference
31 from IPython.parallel import Client, Reference
32
32
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
34 """create a partitioner in the engine namespace"""
35 global partitioner
35 global partitioner
36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.redim(global_num_cells=gnum_cells, num_parts=parts)
38 p.prepare_communication()
38 p.prepare_communication()
39 # put the partitioner into the global namespace:
39 # put the partitioner into the global namespace:
40 partitioner=p
40 partitioner=p
41
41
42 def setup_solver(*args, **kwargs):
42 def setup_solver(*args, **kwargs):
43 """create a WaveSolver in the engine namespace."""
43 """create a WaveSolver in the engine namespace."""
44 global solver
44 global solver
45 solver = WaveSolver(*args, **kwargs)
45 solver = WaveSolver(*args, **kwargs)
46
46
47 def wave_saver(u, x, y, t):
47 def wave_saver(u, x, y, t):
48 """save the wave state for each timestep."""
48 """save the wave state for each timestep."""
49 global u_hist
49 global u_hist
50 global t_hist
50 global t_hist
51 t_hist.append(t)
51 t_hist.append(t)
52 u_hist.append(1.0*u)
52 u_hist.append(1.0*u)
53
53
54
54
55 # main program:
55 # main program:
56 if __name__ == '__main__':
56 if __name__ == '__main__':
57
57
58 parser = argparse.ArgumentParser()
58 parser = argparse.ArgumentParser()
59 paa = parser.add_argument
59 paa = parser.add_argument
60 paa('--grid', '-g',
60 paa('--grid', '-g',
61 type=int, nargs=2, default=[100,100], dest='grid',
61 type=int, nargs=2, default=[100,100], dest='grid',
62 help="Cells in the grid, e.g. --grid 100 200")
62 help="Cells in the grid, e.g. --grid 100 200")
63 paa('--partition', '-p',
63 paa('--partition', '-p',
64 type=int, nargs=2, default=None,
64 type=int, nargs=2, default=None,
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
65 help="Process partition grid, e.g. --partition 4 2 for 4x2")
66 paa('-c',
66 paa('-c',
67 type=float, default=1.,
67 type=float, default=1.,
68 help="Wave speed (I think)")
68 help="Wave speed (I think)")
69 paa('-Ly',
69 paa('-Ly',
70 type=float, default=1.,
70 type=float, default=1.,
71 help="system size (in y)")
71 help="system size (in y)")
72 paa('-Lx',
72 paa('-Lx',
73 type=float, default=1.,
73 type=float, default=1.,
74 help="system size (in x)")
74 help="system size (in x)")
75 paa('-t', '--tstop',
75 paa('-t', '--tstop',
76 type=float, default=1.,
76 type=float, default=1.,
77 help="Time units to run")
77 help="Time units to run")
78 paa('--profile',
78 paa('--profile',
79 type=unicode, default=u'default',
79 type=unicode, default=u'default',
80 help="Specify the ipcluster profile for the client to connect to.")
80 help="Specify the ipcluster profile for the client to connect to.")
81 paa('--save',
81 paa('--save',
82 action='store_true',
82 action='store_true',
83 help="Add this flag to save the time/wave history during the run.")
83 help="Add this flag to save the time/wave history during the run.")
84 paa('--scalar',
84 paa('--scalar',
85 action='store_true',
85 action='store_true',
86 help="Also run with scalar interior implementation, to see vector speedup.")
86 help="Also run with scalar interior implementation, to see vector speedup.")
87
87
88 ns = parser.parse_args()
88 ns = parser.parse_args()
89 # set up arguments
89 # set up arguments
90 grid = ns.grid
90 grid = ns.grid
91 partition = ns.partition
91 partition = ns.partition
92 Lx = ns.Lx
92 Lx = ns.Lx
93 Ly = ns.Ly
93 Ly = ns.Ly
94 c = ns.c
94 c = ns.c
95 tstop = ns.tstop
95 tstop = ns.tstop
96 if ns.save:
96 if ns.save:
97 user_action = wave_saver
97 user_action = wave_saver
98 else:
98 else:
99 user_action = None
99 user_action = None
100
100
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
101 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
102 final_test = True
102 final_test = True
103
103
104 # create the Client
104 # create the Client
105 rc = Client(profile=ns.profile)
105 rc = Client(profile=ns.profile)
106 num_procs = len(rc.ids)
106 num_procs = len(rc.ids)
107
107
108 if partition is None:
108 if partition is None:
109 partition = [num_procs,1]
109 partition = [num_procs,1]
110 else:
110 else:
111 num_procs = min(num_procs, partition[0]*partition[1])
111 num_procs = min(num_procs, partition[0]*partition[1])
112
112
113 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
113 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
114
114
115 # construct the View:
115 # construct the View:
116 view = rc[:num_procs]
116 view = rc[:num_procs]
117 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
117 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
118
118
119 # functions defining initial/boundary/source conditions
119 # functions defining initial/boundary/source conditions
120 def I(x,y):
120 def I(x,y):
121 from numpy import exp
121 from numpy import exp
122 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
122 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
123 def f(x,y,t):
123 def f(x,y,t):
124 return 0.0
124 return 0.0
125 # from numpy import exp,sin
125 # from numpy import exp,sin
126 # return 10*exp(-(x - sin(100*t))**2)
126 # return 10*exp(-(x - sin(100*t))**2)
127 def bc(x,y,t):
127 def bc(x,y,t):
128 return 0.0
128 return 0.0
129
129
130 # initialize t_hist/u_hist for saving the state at each step (optional)
130 # initialize t_hist/u_hist for saving the state at each step (optional)
131 view['t_hist'] = []
131 view['t_hist'] = []
132 view['u_hist'] = []
132 view['u_hist'] = []
133
133
134 # set vector/scalar implementation details
134 # set vector/scalar implementation details
135 impl = {}
135 impl = {}
136 impl['ic'] = 'vectorized'
136 impl['ic'] = 'vectorized'
137 impl['inner'] = 'scalar'
137 impl['inner'] = 'scalar'
138 impl['bc'] = 'vectorized'
138 impl['bc'] = 'vectorized'
139
139
140 # execute some files so that the classes we need will be defined on the engines:
140 # execute some files so that the classes we need will be defined on the engines:
141 view.execute('import numpy')
141 view.execute('import numpy')
142 view.run('communicator.py')
142 view.run('communicator.py')
143 view.run('RectPartitioner.py')
143 view.run('RectPartitioner.py')
144 view.run('wavesolver.py')
144 view.run('wavesolver.py')
145
145
146 # scatter engine IDs
146 # scatter engine IDs
147 view.scatter('my_id', range(num_procs), flatten=True)
147 view.scatter('my_id', range(num_procs), flatten=True)
148
148
149 # create the engine connectors
149 # create the engine connectors
150 view.execute('com = EngineCommunicator()')
150 view.execute('com = EngineCommunicator()')
151
151
152 # gather the connection information into a single dict
152 # gather the connection information into a single dict
153 ar = view.apply_async(lambda : com.info)
153 ar = view.apply_async(lambda : com.info)
154 peers = ar.get_dict()
154 peers = ar.get_dict()
155 # print peers
155 # print peers
156 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
156 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
157
157
158 # setup remote partitioner
158 # setup remote partitioner
159 # note that Reference means that the argument passed to setup_partitioner will be the
159 # note that Reference means that the argument passed to setup_partitioner will be the
160 # object named 'com' in the engine's namespace
160 # object named 'com' in the engine's namespace
161 view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
161 view.apply_sync(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
162 time.sleep(1)
162 time.sleep(1)
163 # convenience lambda to call solver.solve:
163 # convenience lambda to call solver.solve:
164 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
164 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
165
165
166 if ns.scalar:
166 if ns.scalar:
167 impl['inner'] = 'scalar'
167 impl['inner'] = 'scalar'
168 # setup remote solvers
168 # setup remote solvers
169 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
169 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
170
170
171 # run first with element-wise Python operations for each cell
171 # run first with element-wise Python operations for each cell
172 t0 = time.time()
172 t0 = time.time()
173 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
173 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
174 if final_test:
174 if final_test:
175 # this sum is performed element-wise as results finish
175 # this sum is performed element-wise as results finish
176 s = sum(ar)
176 s = sum(ar)
177 # the L2 norm (RMS) of the result:
177 # the L2 norm (RMS) of the result:
178 norm = sqrt(s/num_cells)
178 norm = sqrt(s/num_cells)
179 else:
179 else:
180 norm = -1
180 norm = -1
181 t1 = time.time()
181 t1 = time.time()
182 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
182 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
183
183
184 # run again with faster numpy-vectorized inner implementation:
184 # run again with faster numpy-vectorized inner implementation:
185 impl['inner'] = 'vectorized'
185 impl['inner'] = 'vectorized'
186 # setup remote solvers
186 # setup remote solvers
187 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
187 view.apply_sync(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
188
188
189 t0 = time.time()
189 t0 = time.time()
190
190
191 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
191 ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
192 if final_test:
192 if final_test:
193 # this sum is performed element-wise as results finish
193 # this sum is performed element-wise as results finish
194 s = sum(ar)
194 s = sum(ar)
195 # the L2 norm (RMS) of the result:
195 # the L2 norm (RMS) of the result:
196 norm = sqrt(s/num_cells)
196 norm = sqrt(s/num_cells)
197 else:
197 else:
198 norm = -1
198 norm = -1
199 t1 = time.time()
199 t1 = time.time()
200 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
200 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
201
201
202 # if ns.save is True, then u_hist stores the history of u as a list
202 # if ns.save is True, then u_hist stores the history of u as a list
203 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
203 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
204 if ns.save and partition[-1] == 1:
204 if ns.save and partition[-1] == 1:
205 import pylab
205 import pylab
206 view.execute('u_last=u_hist[-1]')
206 view.execute('u_last=u_hist[-1]')
207 u_last = view.gather('u_last', block=True)
207 u_last = view.gather('u_last', block=True)
208 pylab.pcolor(u_last)
208 pylab.pcolor(u_last)
209 pylab.show() No newline at end of file
209 pylab.show()
General Comments 0
You need to be logged in to leave comments. Login now