wave2d example using single view, instead of repeated 'rc[:]'
MinRK
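
The change below replaces each throwaway 'rc[:]' slice with a single DirectView that is built once and reused for every call. A minimal sketch of the pattern, assuming 'rc' is an already-connected parallel Client (only methods that appear in the diff are used):

# before: every call re-slices the Client into a fresh view
rc[:].execute('import numpy')
rc[:].run('wavesolver.py')

# after: construct one DirectView up front and reuse it
view = rc[:]
view.execute('import numpy')    # runs on every engine in the view
view.run('wavesolver.py')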
@@ -9,7 +9,7 @@ from IPython.zmq.parallel.util import disambiguate_url
 
 class EngineCommunicator(object):
     """An object that connects Engines to each other.
-    north and west sockets listen, while south and east sockets connect.
+    north and east sockets listen, while south and west sockets connect.
 
     This class is useful in cases where there is a set of nodes that
     must communicate only with their nearest neighbors.
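
The corrected docstring pins down the topology: each engine binds ("listens" on) its north and east sockets and connects its south and west sockets to the ports its neighbors bound. A rough pyzmq sketch of that pairing, not taken from communicator.py itself (the socket type, ports, and neighbor addresses are placeholders):

import zmq

ctx = zmq.Context.instance()
north, east = ctx.socket(zmq.PAIR), ctx.socket(zmq.PAIR)
south, west = ctx.socket(zmq.PAIR), ctx.socket(zmq.PAIR)

# the listening pair binds to random ports; their URLs are advertised to peers
north_port = north.bind_to_random_port('tcp://*')
east_port = east.bind_to_random_port('tcp://*')

# the connecting pair attaches to whatever URLs the neighbors advertised
south.connect('tcp://127.0.0.1:5555')    # placeholder neighbor address
west.connect('tcp://127.0.0.1:5556')     # placeholder neighbor address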
@@ -108,6 +108,9 @@ if __name__ == '__main__':
 
     assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
 
+    view = rc[:]
+    print "Running %s system on %s processes until %f"%(grid, partition, tstop)
+
     # functions defining initial/boundary/source conditions
     def I(x,y):
         from numpy import exp
@@ -120,15 +123,15 @@ if __name__ == '__main__':
         return 0.0
 
     # initial imports, setup rank
-    rc[:].execute('\n'.join([
+    view.execute('\n'.join([
         "from mpi4py import MPI",
         "import numpy",
         "mpi = MPI.COMM_WORLD",
         "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
 
     # initialize t_hist/u_hist for saving the state at each step (optional)
-    rc[:]['t_hist'] = []
-    rc[:]['u_hist'] = []
+    view['t_hist'] = []
+    view['u_hist'] = []
 
     # set vector/scalar implementation details
     impl = {}
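
A note on the dictionary-style assignment above: setting a key on the DirectView pushes the value to every engine, and reading the key pulls one copy back per engine. A small illustration (the printed result assumes four engines):

view['t_hist'] = []     # each engine gets its own, independent t_hist
print view['t_hist']    # -> [[], [], [], []], one entry per engine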
@@ -137,17 +140,17 @@ if __name__ == '__main__':
     impl['bc'] = 'vectorized'
 
     # execute some files so that the classes we need will be defined on the engines:
-    rc[:].run('RectPartitioner.py')
-    rc[:].run('wavesolver.py')
+    view.run('RectPartitioner.py')
+    view.run('wavesolver.py')
 
     # setup remote partitioner
     # note that Reference means that the argument passed to setup_partitioner will be the
     # object named 'my_id' in the engine's namespace
-    rc[:].apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
+    view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
     # wait for initial communication to complete
-    rc[:].execute('mpi.barrier()')
+    view.execute('mpi.barrier()')
     # setup remote solvers
-    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+    view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
 
     # lambda for calling solver.solve:
     _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
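
The comment about Reference above is how the engine-side rank reaches setup_partitioner: a Reference is resolved in each engine's namespace at call time, rather than a value being serialized on the client. A hedged sketch using only calls that appear in this diff (the function and printed result are illustrative, assuming four engines with my_id 0-3):

def double(x):
    return 2*x

# Reference('my_id') is looked up on each engine, so every engine calls
# double() with its own my_id rather than a value shipped from the client.
ar = view.apply_async(double, Reference('my_id'))
print sum(ar)    # adds up each engine's 2*my_id as results arrive -> 12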
@@ -156,7 +159,7 @@ if __name__ == '__main__':
         impl['inner'] = 'scalar'
         # run first with element-wise Python operations for each cell
         t0 = time.time()
-        ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
+        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
         if final_test:
             # this sum is performed element-wise as results finish
             s = sum(ar)
@@ -169,12 +172,12 @@ if __name__ == '__main__':
 
     impl['inner'] = 'vectorized'
     # setup new solvers
-    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
-    rc[:].execute('mpi.barrier()')
+    view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+    view.execute('mpi.barrier()')
 
     # run again with numpy vectorized inner-implementation
     t0 = time.time()
-    ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
+    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
     if final_test:
         # this sum is performed element-wise as results finish
         s = sum(ar)
@@ -189,9 +192,9 @@ if __name__ == '__main__':
     # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
     if ns.save and partition[-1] == 1:
         import pylab
-        rc[:].execute('u_last=u_hist[-1]')
+        view.execute('u_last=u_hist[-1]')
         # map mpi IDs to IPython IDs, which may not match
-        ranks = rc[:]['my_id']
+        ranks = view['my_id']
         targets = range(len(ranks))
         for idx in range(len(ranks)):
             targets[idx] = ranks.index(idx)
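
The loop above exists because the view's ordering of engines need not match the MPI rank order, so the gather targets have to be permuted. A worked illustration with made-up ranks:

ranks = [2, 0, 1]    # my_id reported by each engine, in view order
targets = range(len(ranks))
for idx in range(len(ranks)):
    targets[idx] = ranks.index(idx)
# targets == [1, 2, 0]: targets[r] is the view position holding MPI rank r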
@@ -105,9 +105,15 @@ if __name__ == '__main__':
 
     if partition is None:
         partition = [num_procs,1]
+    else:
+        num_procs = min(num_procs, partition[0]*partition[1])
 
     assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
 
+    # construct the View:
+    view = rc[:num_procs]
+    print "Running %s system on %s processes until %f"%(grid, partition, tstop)
+
     # functions defining initial/boundary/source conditions
     def I(x,y):
         from numpy import exp
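
Unlike the MPI variant, this script slices the Client so that exactly partition[0]*partition[1] engines are used even when more are registered. A short sketch of the slicing behaviour (the engine count and partition are illustrative):

# with, say, 8 engines registered but a 2x2 partition requested:
num_procs = min(len(rc.ids), 2*2)    # -> 4
view = rc[:num_procs]                # DirectView over the first 4 engines only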
@@ -120,8 +126,8 @@ if __name__ == '__main__':
         return 0.0
 
     # initialize t_hist/u_hist for saving the state at each step (optional)
-    rc[:]['t_hist'] = []
-    rc[:]['u_hist'] = []
+    view['t_hist'] = []
+    view['u_hist'] = []
 
     # set vector/scalar implementation details
     impl = {}
@@ -130,19 +136,19 @@ if __name__ == '__main__':
     impl['bc'] = 'vectorized'
 
     # execute some files so that the classes we need will be defined on the engines:
-    rc[:].execute('import numpy')
-    rc[:].run('communicator.py')
-    rc[:].run('RectPartitioner.py')
-    rc[:].run('wavesolver.py')
+    view.execute('import numpy')
+    view.run('communicator.py')
+    view.run('RectPartitioner.py')
+    view.run('wavesolver.py')
 
     # scatter engine IDs
-    rc[:].scatter('my_id', rc.ids, flatten=True)
+    view.scatter('my_id', range(num_procs), flatten=True)
 
     # create the engine connectors
-    rc[:].execute('com = EngineCommunicator()')
+    view.execute('com = EngineCommunicator()')
 
     # gather the connection information into a single dict
-    ar = rc[:].apply_async(lambda : com.info)
+    ar = view.apply_async(lambda : com.info)
     peers = ar.get_dict()
     # print peers
     # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
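
The scatter call now distributes range(num_procs) rather than rc.ids, so the engine-side my_id values are guaranteed to be 0..num_procs-1 no matter which IPython IDs the engines were assigned. A brief example of scatter with flatten=True (four engines assumed):

view.scatter('my_id', range(4), flatten=True)
print view['my_id']    # -> [0, 1, 2, 3]; without flatten=True each engine
                       #    would receive a one-element list like [0]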
@@ -150,7 +156,7 @@ if __name__ == '__main__':
     # setup remote partitioner
     # note that Reference means that the argument passed to setup_partitioner will be the
     # object named 'com' in the engine's namespace
-    rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
+    view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
     time.sleep(1)
     # convenience lambda to call solver.solve:
     _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
@@ -158,11 +164,11 @@ if __name__ == '__main__':
     if ns.scalar:
         impl['inner'] = 'scalar'
         # setup remote solvers
-        rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
+        view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
 
         # run first with element-wise Python operations for each cell
         t0 = time.time()
-        ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
+        ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
         if final_test:
             # this sum is performed element-wise as results finish
             s = sum(ar)
@@ -176,11 +182,11 @@ if __name__ == '__main__':
     # run again with faster numpy-vectorized inner implementation:
     impl['inner'] = 'vectorized'
     # setup remote solvers
-    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
+    view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
 
     t0 = time.time()
 
-    ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
+    ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
     if final_test:
         # this sum is performed element-wise as results finish
         s = sum(ar)
@@ -195,7 +201,7 @@ if __name__ == '__main__':
     # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
     if ns.save and partition[-1] == 1:
         import pylab
-        rc[:].execute('u_last=u_hist[-1]')
-        u_last = rc[:].gather('u_last', block=True)
+        view.execute('u_last=u_hist[-1]')
+        u_last = view.gather('u_last', block=True)
         pylab.pcolor(u_last)
-        pylab.show()
\ No newline at end of file
+        pylab.show()