Show More
@@ -9,7 +9,7 b' from IPython.zmq.parallel.util import disambiguate_url' | |||||
9 |
|
9 | |||
10 | class EngineCommunicator(object): |
|
10 | class EngineCommunicator(object): | |
11 | """An object that connects Engines to each other. |
|
11 | """An object that connects Engines to each other. | |
12 |
north and |
|
12 | north and east sockets listen, while south and west sockets connect. | |
13 |
|
13 | |||
14 | This class is useful in cases where there is a set of nodes that |
|
14 | This class is useful in cases where there is a set of nodes that | |
15 | must communicate only with their nearest neighbors. |
|
15 | must communicate only with their nearest neighbors. |
@@ -108,6 +108,9 b" if __name__ == '__main__':" | |||||
108 |
|
108 | |||
109 | assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) |
|
109 | assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) | |
110 |
|
110 | |||
|
111 | view = rc[:] | |||
|
112 | print "Running %s system on %s processes until %f"%(grid, partition, tstop) | |||
|
113 | ||||
111 | # functions defining initial/boundary/source conditions |
|
114 | # functions defining initial/boundary/source conditions | |
112 | def I(x,y): |
|
115 | def I(x,y): | |
113 | from numpy import exp |
|
116 | from numpy import exp | |
@@ -120,15 +123,15 b" if __name__ == '__main__':" | |||||
120 | return 0.0 |
|
123 | return 0.0 | |
121 |
|
124 | |||
122 | # initial imports, setup rank |
|
125 | # initial imports, setup rank | |
123 |
|
|
126 | view.execute('\n'.join([ | |
124 | "from mpi4py import MPI", |
|
127 | "from mpi4py import MPI", | |
125 | "import numpy", |
|
128 | "import numpy", | |
126 | "mpi = MPI.COMM_WORLD", |
|
129 | "mpi = MPI.COMM_WORLD", | |
127 | "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True) |
|
130 | "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True) | |
128 |
|
131 | |||
129 | # initialize t_hist/u_hist for saving the state at each step (optional) |
|
132 | # initialize t_hist/u_hist for saving the state at each step (optional) | |
130 |
|
|
133 | view['t_hist'] = [] | |
131 |
|
|
134 | view['u_hist'] = [] | |
132 |
|
135 | |||
133 | # set vector/scalar implementation details |
|
136 | # set vector/scalar implementation details | |
134 | impl = {} |
|
137 | impl = {} | |
@@ -137,17 +140,17 b" if __name__ == '__main__':" | |||||
137 | impl['bc'] = 'vectorized' |
|
140 | impl['bc'] = 'vectorized' | |
138 |
|
141 | |||
139 | # execute some files so that the classes we need will be defined on the engines: |
|
142 | # execute some files so that the classes we need will be defined on the engines: | |
140 |
|
|
143 | view.run('RectPartitioner.py') | |
141 |
|
|
144 | view.run('wavesolver.py') | |
142 |
|
145 | |||
143 | # setup remote partitioner |
|
146 | # setup remote partitioner | |
144 | # note that Reference means that the argument passed to setup_partitioner will be the |
|
147 | # note that Reference means that the argument passed to setup_partitioner will be the | |
145 | # object named 'my_id' in the engine's namespace |
|
148 | # object named 'my_id' in the engine's namespace | |
146 |
|
|
149 | view.apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition) | |
147 | # wait for initial communication to complete |
|
150 | # wait for initial communication to complete | |
148 |
|
|
151 | view.execute('mpi.barrier()') | |
149 | # setup remote solvers |
|
152 | # setup remote solvers | |
150 |
|
|
153 | view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
151 |
|
154 | |||
152 | # lambda for calling solver.solve: |
|
155 | # lambda for calling solver.solve: | |
153 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) |
|
156 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) | |
@@ -156,7 +159,7 b" if __name__ == '__main__':" | |||||
156 | impl['inner'] = 'scalar' |
|
159 | impl['inner'] = 'scalar' | |
157 | # run first with element-wise Python operations for each cell |
|
160 | # run first with element-wise Python operations for each cell | |
158 | t0 = time.time() |
|
161 | t0 = time.time() | |
159 |
ar = |
|
162 | ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action) | |
160 | if final_test: |
|
163 | if final_test: | |
161 | # this sum is performed element-wise as results finish |
|
164 | # this sum is performed element-wise as results finish | |
162 | s = sum(ar) |
|
165 | s = sum(ar) | |
@@ -169,12 +172,12 b" if __name__ == '__main__':" | |||||
169 |
|
172 | |||
170 | impl['inner'] = 'vectorized' |
|
173 | impl['inner'] = 'vectorized' | |
171 | # setup new solvers |
|
174 | # setup new solvers | |
172 |
|
|
175 | view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
173 |
|
|
176 | view.execute('mpi.barrier()') | |
174 |
|
177 | |||
175 | # run again with numpy vectorized inner-implementation |
|
178 | # run again with numpy vectorized inner-implementation | |
176 | t0 = time.time() |
|
179 | t0 = time.time() | |
177 |
ar = |
|
180 | ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver) | |
178 | if final_test: |
|
181 | if final_test: | |
179 | # this sum is performed element-wise as results finish |
|
182 | # this sum is performed element-wise as results finish | |
180 | s = sum(ar) |
|
183 | s = sum(ar) | |
@@ -189,9 +192,9 b" if __name__ == '__main__':" | |||||
189 | # If the partion scheme is Nx1, then u can be reconstructed via 'gather': |
|
192 | # If the partion scheme is Nx1, then u can be reconstructed via 'gather': | |
190 | if ns.save and partition[-1] == 1: |
|
193 | if ns.save and partition[-1] == 1: | |
191 | import pylab |
|
194 | import pylab | |
192 |
|
|
195 | view.execute('u_last=u_hist[-1]') | |
193 | # map mpi IDs to IPython IDs, which may not match |
|
196 | # map mpi IDs to IPython IDs, which may not match | |
194 |
ranks = |
|
197 | ranks = view['my_id'] | |
195 | targets = range(len(ranks)) |
|
198 | targets = range(len(ranks)) | |
196 | for idx in range(len(ranks)): |
|
199 | for idx in range(len(ranks)): | |
197 | targets[idx] = ranks.index(idx) |
|
200 | targets[idx] = ranks.index(idx) |
@@ -105,9 +105,15 b" if __name__ == '__main__':" | |||||
105 |
|
105 | |||
106 | if partition is None: |
|
106 | if partition is None: | |
107 | partition = [num_procs,1] |
|
107 | partition = [num_procs,1] | |
|
108 | else: | |||
|
109 | num_procs = min(num_procs, partition[0]*partition[1]) | |||
108 |
|
110 | |||
109 | assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) |
|
111 | assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs) | |
110 |
|
112 | |||
|
113 | # construct the View: | |||
|
114 | view = rc[:num_procs] | |||
|
115 | print "Running %s system on %s processes until %f"%(grid, partition, tstop) | |||
|
116 | ||||
111 | # functions defining initial/boundary/source conditions |
|
117 | # functions defining initial/boundary/source conditions | |
112 | def I(x,y): |
|
118 | def I(x,y): | |
113 | from numpy import exp |
|
119 | from numpy import exp | |
@@ -120,8 +126,8 b" if __name__ == '__main__':" | |||||
120 | return 0.0 |
|
126 | return 0.0 | |
121 |
|
127 | |||
122 | # initialize t_hist/u_hist for saving the state at each step (optional) |
|
128 | # initialize t_hist/u_hist for saving the state at each step (optional) | |
123 |
|
|
129 | view['t_hist'] = [] | |
124 |
|
|
130 | view['u_hist'] = [] | |
125 |
|
131 | |||
126 | # set vector/scalar implementation details |
|
132 | # set vector/scalar implementation details | |
127 | impl = {} |
|
133 | impl = {} | |
@@ -130,19 +136,19 b" if __name__ == '__main__':" | |||||
130 | impl['bc'] = 'vectorized' |
|
136 | impl['bc'] = 'vectorized' | |
131 |
|
137 | |||
132 | # execute some files so that the classes we need will be defined on the engines: |
|
138 | # execute some files so that the classes we need will be defined on the engines: | |
133 |
|
|
139 | view.execute('import numpy') | |
134 |
|
|
140 | view.run('communicator.py') | |
135 |
|
|
141 | view.run('RectPartitioner.py') | |
136 |
|
|
142 | view.run('wavesolver.py') | |
137 |
|
143 | |||
138 | # scatter engine IDs |
|
144 | # scatter engine IDs | |
139 |
|
|
145 | view.scatter('my_id', range(num_procs), flatten=True) | |
140 |
|
146 | |||
141 | # create the engine connectors |
|
147 | # create the engine connectors | |
142 |
|
|
148 | view.execute('com = EngineCommunicator()') | |
143 |
|
149 | |||
144 | # gather the connection information into a single dict |
|
150 | # gather the connection information into a single dict | |
145 |
ar = |
|
151 | ar = view.apply_async(lambda : com.info) | |
146 | peers = ar.get_dict() |
|
152 | peers = ar.get_dict() | |
147 | # print peers |
|
153 | # print peers | |
148 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators |
|
154 | # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators | |
@@ -150,7 +156,7 b" if __name__ == '__main__':" | |||||
150 | # setup remote partitioner |
|
156 | # setup remote partitioner | |
151 | # note that Reference means that the argument passed to setup_partitioner will be the |
|
157 | # note that Reference means that the argument passed to setup_partitioner will be the | |
152 | # object named 'com' in the engine's namespace |
|
158 | # object named 'com' in the engine's namespace | |
153 |
|
|
159 | view.apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition) | |
154 | time.sleep(1) |
|
160 | time.sleep(1) | |
155 | # convenience lambda to call solver.solve: |
|
161 | # convenience lambda to call solver.solve: | |
156 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) |
|
162 | _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs) | |
@@ -158,11 +164,11 b" if __name__ == '__main__':" | |||||
158 | if ns.scalar: |
|
164 | if ns.scalar: | |
159 | impl['inner'] = 'scalar' |
|
165 | impl['inner'] = 'scalar' | |
160 | # setup remote solvers |
|
166 | # setup remote solvers | |
161 |
|
|
167 | view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
162 |
|
168 | |||
163 | # run first with element-wise Python operations for each cell |
|
169 | # run first with element-wise Python operations for each cell | |
164 | t0 = time.time() |
|
170 | t0 = time.time() | |
165 |
ar = |
|
171 | ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action) | |
166 | if final_test: |
|
172 | if final_test: | |
167 | # this sum is performed element-wise as results finish |
|
173 | # this sum is performed element-wise as results finish | |
168 | s = sum(ar) |
|
174 | s = sum(ar) | |
@@ -176,11 +182,11 b" if __name__ == '__main__':" | |||||
176 | # run again with faster numpy-vectorized inner implementation: |
|
182 | # run again with faster numpy-vectorized inner implementation: | |
177 | impl['inner'] = 'vectorized' |
|
183 | impl['inner'] = 'vectorized' | |
178 | # setup remote solvers |
|
184 | # setup remote solvers | |
179 |
|
|
185 | view.apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl) | |
180 |
|
186 | |||
181 | t0 = time.time() |
|
187 | t0 = time.time() | |
182 |
|
188 | |||
183 |
ar = |
|
189 | ar = view.apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver) | |
184 | if final_test: |
|
190 | if final_test: | |
185 | # this sum is performed element-wise as results finish |
|
191 | # this sum is performed element-wise as results finish | |
186 | s = sum(ar) |
|
192 | s = sum(ar) | |
@@ -195,7 +201,7 b" if __name__ == '__main__':" | |||||
195 | # If the partion scheme is Nx1, then u can be reconstructed via 'gather': |
|
201 | # If the partion scheme is Nx1, then u can be reconstructed via 'gather': | |
196 | if ns.save and partition[-1] == 1: |
|
202 | if ns.save and partition[-1] == 1: | |
197 | import pylab |
|
203 | import pylab | |
198 |
|
|
204 | view.execute('u_last=u_hist[-1]') | |
199 |
u_last = |
|
205 | u_last = view.gather('u_last', block=True) | |
200 | pylab.pcolor(u_last) |
|
206 | pylab.pcolor(u_last) | |
201 | pylab.show() No newline at end of file |
|
207 | pylab.show() |
General Comments 0
You need to be logged in to leave comments.
Login now