@@ -0,0 +1,491 @@
#!/usr/bin/env python
"""A rectangular domain partitioner and associated communication
functionality for solving PDEs in 1D and 2D with the finite
difference method (FDM), written in Python.

The global solution domain is assumed to be of rectangular shape,
where the number of cells in each direction is stored in nx, ny, nz.

The numerical scheme is fully explicit.

Authors
-------

* Xing Cai
* Min Ragan-Kelley

"""
import time

from numpy import zeros, ascontiguousarray, frombuffer
try:
    from mpi4py import MPI
except ImportError:
    pass
else:
    mpi = MPI.COMM_WORLD

class RectPartitioner:
    """
    Responsible for a rectangular partitioning of a global domain,
    which is expressed as the numbers of cells in the different
    spatial directions. The partitioning info is expressed as an
    array of integers, each indicating the number of subdomains in
    one spatial direction.
    """

    def __init__(self, my_id=-1, num_procs=-1, \
                 global_num_cells=[], num_parts=[]):
        self.nsd = 0
        self.my_id = my_id
        self.num_procs = num_procs
        self.redim (global_num_cells, num_parts)

    def redim (self, global_num_cells, num_parts):
        nsd_ = len(global_num_cells)
        # print "Inside the redim function, nsd=%d" %nsd_

        if nsd_<1 or nsd_>3 or nsd_!=len(num_parts):
            print 'The input global_num_cells is not ok!'
            return

        self.nsd = nsd_
        self.global_num_cells = global_num_cells
        self.num_parts = num_parts

    def prepare_communication (self):
        """
        Find the subdomain rank (tuple) for each processor and
        determine the neighbor info.
        """

        nsd_ = self.nsd
        if nsd_<1:
            print 'Number of space dimensions is %d, nothing to do' %nsd_
            return

        self.subd_rank = [-1,-1,-1]
        self.subd_lo_ix = [-1,-1,-1]
        self.subd_hi_ix = [-1,-1,-1]
        self.lower_neighbors = [-1,-1,-1]
        self.upper_neighbors = [-1,-1,-1]

        num_procs = self.num_procs
        my_id = self.my_id

        num_subds = 1
        for i in range(nsd_):
            num_subds = num_subds*self.num_parts[i]
        if my_id==0:
            print "# subds=", num_subds
        # should check num_subds against num_procs

        offsets = [1, 0, 0]

        # find the subdomain rank
        self.subd_rank[0] = my_id%self.num_parts[0]
        if nsd_>=2:
            offsets[1] = self.num_parts[0]
            self.subd_rank[1] = my_id/offsets[1]
        if nsd_==3:
            offsets[1] = self.num_parts[0]
            offsets[2] = self.num_parts[0]*self.num_parts[1]
            self.subd_rank[1] = (my_id%offsets[2])/self.num_parts[0]
            self.subd_rank[2] = my_id/offsets[2]

        print "my_id=%d, subd_rank: "%my_id, self.subd_rank
        if my_id==0:
            print "offsets=", offsets

        # find the neighbor ids
        for i in range(nsd_):
            rank = self.subd_rank[i]
            if rank>0:
                self.lower_neighbors[i] = my_id-offsets[i]
            if rank<self.num_parts[i]-1:
                self.upper_neighbors[i] = my_id+offsets[i]

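            # How the split below works: with k = cells // parts and
            # m = cells % parts, every rank gets k cells, the last m ranks get
            # one extra cell (load balancing), and every subdomain except the
            # last is extended by one overlap cell. For example, 100 cells over
            # 3 parts gives k=33, m=1 and index ranges [0,34], [33,67], [66,100].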
            k = self.global_num_cells[i]/self.num_parts[i]
            m = self.global_num_cells[i]%self.num_parts[i]

            ix = rank*k+max(0,rank+m-self.num_parts[i])
            self.subd_lo_ix[i] = ix

            ix = ix+k
            if rank>=(self.num_parts[i]-m):
                ix = ix+1  # load balancing
            if rank<self.num_parts[i]-1:
                ix = ix+1  # one cell of overlap
            self.subd_hi_ix[i] = ix

        print "subd_rank:",self.subd_rank,\
              "lower_neig:", self.lower_neighbors, \
              "upper_neig:", self.upper_neighbors
        print "subd_rank:",self.subd_rank,"subd_lo_ix:", self.subd_lo_ix, \
              "subd_hi_ix:", self.subd_hi_ix


class RectPartitioner1D(RectPartitioner):
    """
    Subclass of RectPartitioner, for 1D problems
    """
    def prepare_communication (self):
        """
        Prepare the buffers to be used for later communications
        """

        RectPartitioner.prepare_communication (self)

        if self.lower_neighbors[0]>=0:
            self.in_lower_buffers = [zeros(1, float)]
            self.out_lower_buffers = [zeros(1, float)]
        if self.upper_neighbors[0]>=0:
            self.in_upper_buffers = [zeros(1, float)]
            self.out_upper_buffers = [zeros(1, float)]

    def get_num_loc_cells(self):
        return [self.subd_hi_ix[0]-self.subd_lo_ix[0]]


class RectPartitioner2D(RectPartitioner):
    """
    Subclass of RectPartitioner, for 2D problems
    """
    def prepare_communication (self):
        """
        Prepare the buffers to be used for later communications
        """

        RectPartitioner.prepare_communication (self)

        self.in_lower_buffers = [[], []]
        self.out_lower_buffers = [[], []]
        self.in_upper_buffers = [[], []]
        self.out_upper_buffers = [[], []]

        size1 = self.subd_hi_ix[1]-self.subd_lo_ix[1]+1
        if self.lower_neighbors[0]>=0:
            self.in_lower_buffers[0] = zeros(size1, float)
            self.out_lower_buffers[0] = zeros(size1, float)
        if self.upper_neighbors[0]>=0:
            self.in_upper_buffers[0] = zeros(size1, float)
            self.out_upper_buffers[0] = zeros(size1, float)

        size0 = self.subd_hi_ix[0]-self.subd_lo_ix[0]+1
        if self.lower_neighbors[1]>=0:
            self.in_lower_buffers[1] = zeros(size0, float)
            self.out_lower_buffers[1] = zeros(size0, float)
        if self.upper_neighbors[1]>=0:
            self.in_upper_buffers[1] = zeros(size0, float)
            self.out_upper_buffers[1] = zeros(size0, float)
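        # Note on sizes: a boundary shared with an x-direction neighbor holds
        # one value per local grid point in y (loc_ny+1 points), and vice
        # versa, which is why the x-direction buffers use size1 and the
        # y-direction buffers use size0.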

    def get_num_loc_cells(self):
        return [self.subd_hi_ix[0]-self.subd_lo_ix[0],\
                self.subd_hi_ix[1]-self.subd_lo_ix[1]]


class MPIRectPartitioner2D(RectPartitioner2D):
    """
    Subclass of RectPartitioner2D, which uses MPI via mpi4py for communication
    """

    def __init__(self, my_id=-1, num_procs=-1,
                 global_num_cells=[], num_parts=[],
                 slice_copy=True):
        RectPartitioner.__init__(self, my_id, num_procs,
                                 global_num_cells, num_parts)
        self.slice_copy = slice_copy

    def update_internal_boundary (self, solution_array):
        nsd_ = self.nsd
        if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
            print "Buffers for communicating with lower neighbors not ready"
            return
        if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
            print "Buffers for communicating with upper neighbors not ready"
            return

        loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
        loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]

        lower_x_neigh = self.lower_neighbors[0]
        upper_x_neigh = self.upper_neighbors[0]
        lower_y_neigh = self.lower_neighbors[1]
        upper_y_neigh = self.upper_neighbors[1]

        # communicate in the x-direction first
        if lower_x_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    self.out_lower_buffers[0][i] = solution_array[1,i]
            mpi.Isend(self.out_lower_buffers[0], lower_x_neigh)

        if upper_x_neigh>-1:
            mpi.Recv(self.in_upper_buffers[0], upper_x_neigh)
            if self.slice_copy:
                solution_array[loc_nx,:] = self.in_upper_buffers[0]
                self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[loc_nx,i] = self.in_upper_buffers[0][i]
                    self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
            mpi.Isend(self.out_upper_buffers[0], upper_x_neigh)

        if lower_x_neigh>-1:
            mpi.Recv(self.in_lower_buffers[0], lower_x_neigh)
            if self.slice_copy:
                solution_array[0,:] = self.in_lower_buffers[0]
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[0,i] = self.in_lower_buffers[0][i]

        # communicate in the y-direction afterwards
        if lower_y_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
            else:
                for i in xrange(0,loc_nx+1):
                    self.out_lower_buffers[1][i] = solution_array[i,1]
            mpi.Isend(self.out_lower_buffers[1], lower_y_neigh)

        if upper_y_neigh>-1:
            mpi.Recv(self.in_upper_buffers[1], upper_y_neigh)
            if self.slice_copy:
                solution_array[:,loc_ny] = self.in_upper_buffers[1]
                self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,loc_ny] = self.in_upper_buffers[1][i]
                    self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
            mpi.Isend(self.out_upper_buffers[1], upper_y_neigh)

        if lower_y_neigh>-1:
            mpi.Recv(self.in_lower_buffers[1], lower_y_neigh)
            if self.slice_copy:
                solution_array[:,0] = self.in_lower_buffers[1]
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,0] = self.in_lower_buffers[1][i]

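# A minimal usage sketch for MPIRectPartitioner2D (illustrative only; the
# grid/partition numbers and the names `rank`, `nprocs` and `u` are
# assumptions, with `u` a local array of shape (loc_nx+1, loc_ny+1)):
#
#     part = MPIRectPartitioner2D(my_id=rank, num_procs=nprocs,
#                                 global_num_cells=[400, 100],
#                                 num_parts=[4, 2])
#     part.prepare_communication()
#     part.update_internal_boundary(u)   # exchange ghost values each time step
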
class ZMQRectPartitioner2D(RectPartitioner2D):
    """
    Subclass of RectPartitioner2D, which uses 0MQ via pyzmq for communication.
    The first two arguments must be `comm`, an EngineCommunicator object,
    and `addrs`, a dict of connection information for other EngineCommunicator
    objects.
    """

    def __init__(self, comm, addrs, my_id=-1, num_procs=-1,
                 global_num_cells=[], num_parts=[],
                 slice_copy=True):
        RectPartitioner.__init__(self, my_id, num_procs,
                                 global_num_cells, num_parts)
        self.slice_copy = slice_copy
        self.comm = comm  # an EngineCommunicator
        self.addrs = addrs

    def prepare_communication(self):
        RectPartitioner2D.prepare_communication(self)
        # connect west/south to east/north
        west_id,south_id = self.lower_neighbors[:2]
        west = self.addrs.get(west_id, None)
        south = self.addrs.get(south_id, None)
        self.comm.connect(south, west)
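        # Each engine's south/west PAIR sockets connect to the north/east
        # sockets that its lower neighbors bound (see EngineCommunicator in
        # communicator.py), so every internal boundary ends up with exactly
        # one connected socket pair.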

    def update_internal_boundary_x_y (self, solution_array):
        """update the inner boundary with the same send/recv pattern as MPIRectPartitioner2D"""
        nsd_ = self.nsd
        dtype = solution_array.dtype
        if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
            print "Buffers for communicating with lower neighbors not ready"
            return
        if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
            print "Buffers for communicating with upper neighbors not ready"
            return

        loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
        loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]

        lower_x_neigh = self.lower_neighbors[0]
        upper_x_neigh = self.upper_neighbors[0]
        lower_y_neigh = self.lower_neighbors[1]
        upper_y_neigh = self.upper_neighbors[1]
        trackers = []
        flags = dict(copy=False, track=False)
        # communicate in the x-direction first
        if lower_x_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    self.out_lower_buffers[0][i] = solution_array[1,i]
            t = self.comm.west.send(self.out_lower_buffers[0], **flags)
            trackers.append(t)

        if upper_x_neigh>-1:
            msg = self.comm.east.recv(copy=False)
            self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[loc_nx,:] = self.in_upper_buffers[0]
                self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[loc_nx,i] = self.in_upper_buffers[0][i]
                    self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
            t = self.comm.east.send(self.out_upper_buffers[0], **flags)
            trackers.append(t)


        if lower_x_neigh>-1:
            msg = self.comm.west.recv(copy=False)
            self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[0,:] = self.in_lower_buffers[0]
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[0,i] = self.in_lower_buffers[0][i]

        # communicate in the y-direction afterwards
        if lower_y_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
            else:
                for i in xrange(0,loc_nx+1):
                    self.out_lower_buffers[1][i] = solution_array[i,1]
            t = self.comm.south.send(self.out_lower_buffers[1], **flags)
            trackers.append(t)


        if upper_y_neigh>-1:
            msg = self.comm.north.recv(copy=False)
            self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[:,loc_ny] = self.in_upper_buffers[1]
                self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,loc_ny] = self.in_upper_buffers[1][i]
                    self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
            t = self.comm.north.send(self.out_upper_buffers[1], **flags)
            trackers.append(t)

        if lower_y_neigh>-1:
            msg = self.comm.south.recv(copy=False)
            self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[:,0] = self.in_lower_buffers[1]
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,0] = self.in_lower_buffers[1][i]

        # wait for sends to complete:
        if flags['track']:
            for t in trackers:
                t.wait()

    def update_internal_boundary_send_recv (self, solution_array):
        """update the inner boundary, sending first, then receiving"""
        nsd_ = self.nsd
        dtype = solution_array.dtype
        if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
            print "Buffers for communicating with lower neighbors not ready"
            return
        if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
            print "Buffers for communicating with upper neighbors not ready"
            return

        loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
        loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]

        lower_x_neigh = self.lower_neighbors[0]
        upper_x_neigh = self.upper_neighbors[0]
        lower_y_neigh = self.lower_neighbors[1]
        upper_y_neigh = self.upper_neighbors[1]
        trackers = []
        flags = dict(copy=False, track=False)

        # send in all directions first
        if lower_x_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    self.out_lower_buffers[0][i] = solution_array[1,i]
            t = self.comm.west.send(self.out_lower_buffers[0], **flags)
            trackers.append(t)

        if lower_y_neigh>-1:
            if self.slice_copy:
                self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
            else:
                for i in xrange(0,loc_nx+1):
                    self.out_lower_buffers[1][i] = solution_array[i,1]
            t = self.comm.south.send(self.out_lower_buffers[1], **flags)
            trackers.append(t)

        if upper_x_neigh>-1:
            if self.slice_copy:
                self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
            else:
                for i in xrange(0,loc_ny+1):
                    self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
            t = self.comm.east.send(self.out_upper_buffers[0], **flags)
            trackers.append(t)

        if upper_y_neigh>-1:
            if self.slice_copy:
                self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
            else:
                for i in xrange(0,loc_nx+1):
                    self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
            t = self.comm.north.send(self.out_upper_buffers[1], **flags)
            trackers.append(t)


        # now start receiving
        if upper_x_neigh>-1:
            msg = self.comm.east.recv(copy=False)
            self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[loc_nx,:] = self.in_upper_buffers[0]
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[loc_nx,i] = self.in_upper_buffers[0][i]

        if lower_x_neigh>-1:
            msg = self.comm.west.recv(copy=False)
            self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[0,:] = self.in_lower_buffers[0]
            else:
                for i in xrange(0,loc_ny+1):
                    solution_array[0,i] = self.in_lower_buffers[0][i]

        if upper_y_neigh>-1:
            msg = self.comm.north.recv(copy=False)
            self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[:,loc_ny] = self.in_upper_buffers[1]
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,loc_ny] = self.in_upper_buffers[1][i]

        if lower_y_neigh>-1:
            msg = self.comm.south.recv(copy=False)
            self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype)
            if self.slice_copy:
                solution_array[:,0] = self.in_lower_buffers[1]
            else:
                for i in xrange(0,loc_nx+1):
                    solution_array[i,0] = self.in_lower_buffers[1][i]

        # wait for sends to complete:
        if flags['track']:
            for t in trackers:
                t.wait()

    # use send/recv pattern instead of x/y sweeps
    update_internal_boundary = update_internal_boundary_send_recv

@@ -0,0 +1,59 @@
#!/usr/bin/env python
"""A simple Communicator class that has N,E,S,W neighbors connected via 0MQ PAIR sockets"""

import socket

import zmq

from IPython.zmq.parallel.util import disambiguate_url

class EngineCommunicator(object):
    """An object that connects Engines to each other.
    The north and east sockets listen, while the south and west sockets connect.

    This class is useful in cases where there is a set of nodes that
    must communicate only with their nearest neighbors.
    """

    def __init__(self, interface='tcp://*', identity=None):
        self._ctx = zmq.Context()
        self.north = self._ctx.socket(zmq.PAIR)
        self.west = self._ctx.socket(zmq.PAIR)
        self.south = self._ctx.socket(zmq.PAIR)
        self.east = self._ctx.socket(zmq.PAIR)

        # bind to ports
        northport = self.north.bind_to_random_port(interface)
        eastport = self.east.bind_to_random_port(interface)

        self.north_url = interface+":%i"%northport
        self.east_url = interface+":%i"%eastport

        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]

    def __del__(self):
        self.north.close()
        self.south.close()
        self.east.close()
        self.west.close()
        self._ctx.term()

    @property
    def info(self):
        """return the connection info for this object's sockets."""
        return (self.location, self.north_url, self.east_url)

    def connect(self, south_peer=None, west_peer=None):
        """connect to peers. Each peer is a 3-tuple of the form
        (location, north_addr, east_addr)
        as produced by the `info` property of the peer's EngineCommunicator.
        """
        if south_peer is not None:
            location, url, _ = south_peer
            self.south.connect(disambiguate_url(url, location))
        if west_peer is not None:
            location, _, url = west_peer
            self.west.connect(disambiguate_url(url, location))

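# A rough sketch of the intended handshake (identifiers like `peers`,
# `south_id` and `west_id` are illustrative, not defined in this file):
#
#     com = EngineCommunicator()       # on every engine
#     # gather each engine's com.info into a dict keyed by engine id: `peers`
#     com.connect(south_peer=peers.get(south_id), west_peer=peers.get(west_id))
#
# Each engine binds its north/east sockets and connects its south/west sockets
# to the addresses published by its neighbors.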
@@ -0,0 +1,200 @@
#!/usr/bin/env python
"""
A simple python program of solving a 2D wave equation in parallel.
Domain partitioning and inter-processor communication
are done by an object of class MPIRectPartitioner2D
(which is a subclass of RectPartitioner2D and uses MPI via mpi4py)

An example of running the program is (8 processors, 4x2 partition,
400x100 grid cells)::

    $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
    $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi

See also parallelwave.py, which runs the same program, but uses ZeroMQ
(via pyzmq) for the inter-engine communication.

Authors
-------

* Xing Cai
* Min Ragan-Kelley

"""

import sys
import time

from numpy import exp, zeros, newaxis, sqrt

from IPython.external import argparse
from IPython.zmq.parallel.client import Client, Reference

def setup_partitioner(ns, index, num_procs, gnum_cells, parts):
    """create a partitioner in the engine namespace"""
    p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
    p.redim(global_num_cells=gnum_cells, num_parts=parts)
    p.prepare_communication()
    # put the partitioner into the global namespace:
    ns.partitioner=p

def setup_solver(ns, *args, **kwargs):
    """create a WaveSolver in the engine namespace"""
    ns.solver = WaveSolver(*args, **kwargs)

def wave_saver(u, x, y, t):
    """save the wave log"""
    global u_hist
    global t_hist
    t_hist.append(t)
    u_hist.append(1.0*u)


# main program:
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    paa = parser.add_argument
    paa('--grid', '-g',
        type=int, nargs=2, default=[100,100], dest='grid',
        help="Cells in the grid, e.g. --grid 100 200")
    paa('--partition', '-p',
        type=int, nargs=2, default=None,
        help="Process partition grid, e.g. --partition 4 2 for 4x2")
    paa('-c',
        type=float, default=1.,
        help="Wave speed (I think)")
    paa('-Ly',
        type=float, default=1.,
        help="system size (in y)")
    paa('-Lx',
        type=float, default=1.,
        help="system size (in x)")
    paa('-t', '--tstop',
        type=float, default=1.,
        help="Time units to run")
    paa('--profile',
        type=unicode, default=u'default',
        help="Specify the ipcluster profile for the client to connect to.")
    paa('--save',
        action='store_true',
        help="Add this flag to save the time/wave history during the run.")
    paa('--scalar',
        action='store_true',
        help="Also run with scalar interior implementation, to see vector speedup.")

    ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

    if partition is None:
        partition = [1,num_procs]

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

    # functions defining initial/boundary/source conditions
    def I(x,y):
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
    def f(x,y,t):
        return 0.0
        # from numpy import exp,sin
        # return 10*exp(-(x - sin(100*t))**2)
    def bc(x,y,t):
        return 0.0

    # initial imports, setup rank
    rc[:].execute('\n'.join([
        "from mpi4py import MPI",
        "import numpy",
        "mpi = MPI.COMM_WORLD",
        "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)

    # initialize t_hist/u_hist for saving the state at each step (optional)
    rc[:]['t_hist'] = []
    rc[:]['u_hist'] = []

    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'

    # execute some files so that the classes we need will be defined on the engines:
    rc[:].run('RectPartitioner.py')
    rc[:].run('wavesolver.py')

    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'my_id' in the engine's namespace
    rc[:].apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
    # wait for initial communication to complete
    rc[:].execute('mpi.barrier()')
    # setup remote solvers
    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

    # lambda for calling solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

    if ns.scalar:
        impl['inner'] = 'scalar'
        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    impl['inner'] = 'vectorized'
    # setup new solvers
    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
    rc[:].execute('mpi.barrier()')

    # run again with numpy vectorized inner-implementation
    t0 = time.time()
    ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # if ns.save is True, then u_hist stores the history of u as a list
    # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        rc[:].execute('u_last=u_hist[-1]')
        # map mpi IDs to IPython IDs, which may not match
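        # e.g. if the engines report my_id == [2, 0, 3, 1], the loop below
        # yields targets == [1, 3, 0, 2], so gathering over rc[targets]
        # returns the pieces in MPI-rank order.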
        ranks = rc[:]['my_id']
        targets = range(len(ranks))
        for idx in range(len(ranks)):
            targets[idx] = ranks.index(idx)
        u_last = rc[targets].gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()
@@ -0,0 +1,201 @@
#!/usr/bin/env python
"""
A simple python program of solving a 2D wave equation in parallel.
Domain partitioning and inter-processor communication
are done by an object of class ZMQRectPartitioner2D
(which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)

An example of running the program is (8 processors, 4x2 partition,
200x200 grid cells)::

    $ ipclusterz start -n 8 # start 8 engines
    $ ./parallelwave.py --grid 200 200 --partition 4 2

See also parallelwave-mpi, which runs the same program, but uses MPI
(via mpi4py) for the inter-engine communication.

Authors
-------

* Xing Cai
* Min Ragan-Kelley

"""
#
import sys
import time

from numpy import exp, zeros, newaxis, sqrt

from IPython.external import argparse
from IPython.zmq.parallel.client import Client, Reference

def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts):
    """create a partitioner in the engine namespace"""
    p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
    p.redim(global_num_cells=gnum_cells, num_parts=parts)
    p.prepare_communication()
    # put the partitioner into the global namespace:
    ns.partitioner=p

def setup_solver(ns, *args, **kwargs):
    """create a WaveSolver in the engine namespace."""
    ns.solver = WaveSolver(*args, **kwargs)

def wave_saver(u, x, y, t):
    """save the wave state for each timestep."""
    global u_hist
    global t_hist
    t_hist.append(t)
    u_hist.append(1.0*u)


# main program:
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    paa = parser.add_argument
    paa('--grid', '-g',
        type=int, nargs=2, default=[100,100], dest='grid',
        help="Cells in the grid, e.g. --grid 100 200")
    paa('--partition', '-p',
        type=int, nargs=2, default=None,
        help="Process partition grid, e.g. --partition 4 2 for 4x2")
    paa('-c',
        type=float, default=1.,
        help="Wave speed (I think)")
    paa('-Ly',
        type=float, default=1.,
        help="system size (in y)")
    paa('-Lx',
        type=float, default=1.,
        help="system size (in x)")
    paa('-t', '--tstop',
        type=float, default=1.,
        help="Time units to run")
    paa('--profile',
        type=unicode, default=u'default',
        help="Specify the ipcluster profile for the client to connect to.")
    paa('--save',
        action='store_true',
        help="Add this flag to save the time/wave history during the run.")
    paa('--scalar',
        action='store_true',
        help="Also run with scalar interior implementation, to see vector speedup.")

    ns = parser.parse_args()
    # set up arguments
    grid = ns.grid
    partition = ns.partition
    Lx = ns.Lx
    Ly = ns.Ly
    c = ns.c
    tstop = ns.tstop
    if ns.save:
        user_action = wave_saver
    else:
        user_action = None

    num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
    final_test = True

    # create the Client
    rc = Client(profile=ns.profile)
    num_procs = len(rc.ids)

    if partition is None:
        partition = [num_procs,1]

    assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)

    # functions defining initial/boundary/source conditions
    def I(x,y):
        from numpy import exp
        return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
    def f(x,y,t):
        return 0.0
        # from numpy import exp,sin
        # return 10*exp(-(x - sin(100*t))**2)
    def bc(x,y,t):
        return 0.0

    # initialize t_hist/u_hist for saving the state at each step (optional)
    rc[:]['t_hist'] = []
    rc[:]['u_hist'] = []

    # set vector/scalar implementation details
    impl = {}
    impl['ic'] = 'vectorized'
    impl['inner'] = 'scalar'
    impl['bc'] = 'vectorized'

    # execute some files so that the classes we need will be defined on the engines:
    rc[:].execute('import numpy')
    rc[:].run('communicator.py')
    rc[:].run('RectPartitioner.py')
    rc[:].run('wavesolver.py')

    # scatter engine IDs
    rc[:].scatter('my_id', rc.ids, flatten=True)

    # create the engine connectors
    rc[:].execute('com = EngineCommunicator()')

    # gather the connection information into a single dict
    ar = rc[:].apply_async(lambda : com.info)
    peers = ar.get_dict()
    # print peers
    # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
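    # For illustration, an entry might look like (values are made up):
    #   peers[0] == ('10.0.0.5', 'tcp://*:51342', 'tcp://*:49023')
    # i.e. (location, north_url, east_url) as returned by EngineCommunicator.info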

    # setup remote partitioner
    # note that Reference means that the argument passed to setup_partitioner will be the
    # object named 'com' in the engine's namespace
    rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
    time.sleep(1)
    # convenience lambda to call solver.solve:
    _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)

    if ns.scalar:
        impl['inner'] = 'scalar'
        # setup remote solvers
        rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)

        # run first with element-wise Python operations for each cell
        t0 = time.time()
        ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
        if final_test:
            # this sum is performed element-wise as results finish
            s = sum(ar)
            # the L2 norm (RMS) of the result:
            norm = sqrt(s/num_cells)
        else:
            norm = -1
        t1 = time.time()
        print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # run again with faster numpy-vectorized inner implementation:
    impl['inner'] = 'vectorized'
    # setup remote solvers
    rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)

    t0 = time.time()

    ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
    if final_test:
        # this sum is performed element-wise as results finish
        s = sum(ar)
        # the L2 norm (RMS) of the result:
        norm = sqrt(s/num_cells)
    else:
        norm = -1
    t1 = time.time()
    print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)

    # if ns.save is True, then u_hist stores the history of u as a list
    # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
    if ns.save and partition[-1] == 1:
        import pylab
        rc[:].execute('u_last=u_hist[-1]')
        u_last = rc[:].gather('u_last', block=True)
        pylab.pcolor(u_last)
        pylab.show()
@@ -0,0 +1,269 @@
#!/usr/bin/env python
"""
A simple WaveSolver class for evolving the wave equation in 2D.
This works in parallel by using a RectPartitioner object.

Authors
-------

* Xing Cai
* Min Ragan-Kelley

"""
import time

from mpi4py import MPI
mpi = MPI.COMM_WORLD
from numpy import exp, zeros, newaxis, sqrt, arange

def iseq(start=0, stop=None, inc=1):
    """
    Generate integers from start to (and including!) stop,
    with increment of inc. Alternative to range/xrange.
    """
    if stop is None:  # allow iseq(3) to be 0, 1, 2, 3
        # take 1st arg as stop, start as 0, and inc=1
        stop = start; start = 0; inc = 1
    return arange(start, stop+inc, inc)

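# For example, iseq(0, 10, 2) returns array([ 0,  2,  4,  6,  8, 10]); note
# that, unlike range/xrange, the stop value is included.
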
class WaveSolver(object):
    """
    Solve the 2D wave equation u_tt = u_xx + u_yy + f(x,y,t) with
    u = bc(x,y,t) on the boundary and initial condition du/dt = 0.

    Parallelization by using a RectPartitioner object 'partitioner'

    nx and ny are the total number of global grid cells in the x and y
    directions. The global grid points are numbered as (0,0), (1,0), (2,0),
    ..., (nx,0), (0,1), (1,1), ..., (nx, ny).

    dt is the time step. If dt<=0, an optimal time step is used.

    tstop is the stop time for the simulation.

    I, f are functions: I(x,y), f(x,y,t)

    user_action: function of (u, x, y, t) called at each time
    level (x and y are one-dimensional coordinate vectors).
    This function allows the calling code to plot the solution,
    compute errors, etc.

    implementation: a dictionary specifying how the initial
    condition ('ic'), the scheme over inner points ('inner'),
    and the boundary conditions ('bc') are to be implemented.
    Normally, two values are legal: 'scalar' or 'vectorized'.
    'scalar' means straight loops over grid points, while
    'vectorized' means special NumPy vectorized operations.

    If a key in the implementation dictionary is missing, it
    defaults in this function to 'scalar' (the safest strategy).
    Note that if 'vectorized' is specified, the functions I, f,
    and bc must work in vectorized mode. It is always recommended
    to first run the 'scalar' mode and then compare 'vectorized'
    results with the 'scalar' results to check that I, f, and bc
    work.

    verbose: true if a message at each time step is written,
    false implies no output during the simulation.

    final_test: true means the discrete L2-norm of the final solution is
    to be computed.
    """

    def __init__(self, I, f, c, bc, Lx, Ly, partitioner=None, dt=-1,
                 user_action=None,
                 implementation={'ic': 'vectorized',  # or 'scalar'
                                 'inner': 'vectorized',
                                 'bc': 'vectorized'}):

        nx = partitioner.global_num_cells[0]  # number of global cells in x dir
        ny = partitioner.global_num_cells[1]  # number of global cells in y dir
        dx = Lx/float(nx)
        dy = Ly/float(ny)
        loc_nx, loc_ny = partitioner.get_num_loc_cells()
        nx = loc_nx; ny = loc_ny  # now use loc_nx and loc_ny instead
        lo_ix0 = partitioner.subd_lo_ix[0]
        lo_ix1 = partitioner.subd_lo_ix[1]
        hi_ix0 = partitioner.subd_hi_ix[0]
        hi_ix1 = partitioner.subd_hi_ix[1]
        x = iseq(dx*lo_ix0, dx*hi_ix0, dx)  # local grid points in x dir
        y = iseq(dy*lo_ix1, dy*hi_ix1, dy)  # local grid points in y dir
        self.x = x
        self.y = y
        xv = x[:,newaxis]  # for vectorized expressions with f(xv,yv)
        yv = y[newaxis,:]  # -- " --
        if dt <= 0:
            dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2))  # max time step
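            # i.e. the largest dt satisfying the CFL stability condition for
            # this explicit scheme: c*dt*sqrt(1/dx**2 + 1/dy**2) <= 1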
        Cx2 = (c*dt/dx)**2;  Cy2 = (c*dt/dy)**2;  dt2 = dt**2  # help variables

        u = zeros((nx+1,ny+1))  # solution array
        u_1 = u.copy()          # solution at t-dt
        u_2 = u.copy()          # solution at t-2*dt

        # preserve for self.solve
        implementation=dict(implementation)  # copy

        if 'ic' not in implementation:
            implementation['ic'] = 'scalar'
        if 'bc' not in implementation:
            implementation['bc'] = 'scalar'
        if 'inner' not in implementation:
            implementation['inner'] = 'scalar'

        self.implementation = implementation
        self.Lx = Lx
        self.Ly = Ly
        self.I=I
        self.f=f
        self.c=c
        self.bc=bc
        self.user_action = user_action
        self.partitioner=partitioner

        # set initial condition (pointwise - allows straight if-tests in I(x,y)):
        t=0.0
        if implementation['ic'] == 'scalar':
            for i in xrange(0,nx+1):
                for j in xrange(0,ny+1):
                    u_1[i,j] = I(x[i], y[j])

            for i in xrange(1,nx):
                for j in xrange(1,ny):
                    u_2[i,j] = u_1[i,j] + \
                        0.5*Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \
                        0.5*Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \
                        dt2*f(x[i], y[j], 0.0)

            # boundary values of u_2 (equals u(t=dt) due to du/dt=0)
            i = 0
            for j in xrange(0,ny+1):
                u_2[i,j] = bc(x[i], y[j], t+dt)
            j = 0
            for i in xrange(0,nx+1):
                u_2[i,j] = bc(x[i], y[j], t+dt)
            i = nx
            for j in xrange(0,ny+1):
                u_2[i,j] = bc(x[i], y[j], t+dt)
            j = ny
            for i in xrange(0,nx+1):
                u_2[i,j] = bc(x[i], y[j], t+dt)

        elif implementation['ic'] == 'vectorized':
            u_1 = I(xv,yv)
            u_2[1:nx,1:ny] = u_1[1:nx,1:ny] + \
                0.5*Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \
                0.5*Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \
                dt2*(f(xv[1:nx,1:ny], yv[1:nx,1:ny], 0.0))
            # boundary values (t=dt):
            i = 0;  u_2[i,:] = bc(x[i], y, t+dt)
            j = 0;  u_2[:,j] = bc(x, y[j], t+dt)
            i = nx; u_2[i,:] = bc(x[i], y, t+dt)
            j = ny; u_2[:,j] = bc(x, y[j], t+dt)

        if user_action is not None:
            user_action(u_1, x, y, t)  # allow user to plot etc.
        # print list(self.us[2][2])
        self.us = (u,u_1,u_2)


    def solve(self, tstop, dt=-1, user_action=None, verbose=False, final_test=False):
        t0=time.time()
        f=self.f
        c=self.c
        bc=self.bc
        partitioner = self.partitioner
        implementation = self.implementation
        nx = partitioner.global_num_cells[0]  # number of global cells in x dir
        ny = partitioner.global_num_cells[1]  # number of global cells in y dir
        dx = self.Lx/float(nx)
        dy = self.Ly/float(ny)
        loc_nx, loc_ny = partitioner.get_num_loc_cells()
        nx = loc_nx; ny = loc_ny  # now use loc_nx and loc_ny instead
        x = self.x
        y = self.y
        xv = x[:,newaxis]  # for vectorized expressions with f(xv,yv)
        yv = y[newaxis,:]  # -- " --
        if dt <= 0:
            dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2))  # max time step
        Cx2 = (c*dt/dx)**2;  Cy2 = (c*dt/dy)**2;  dt2 = dt**2  # help variables
        # id for the four possible neighbor subdomains
        lower_x_neigh = partitioner.lower_neighbors[0]
        upper_x_neigh = partitioner.upper_neighbors[0]
        lower_y_neigh = partitioner.lower_neighbors[1]
        upper_y_neigh = partitioner.upper_neighbors[1]
        u,u_1,u_2 = self.us
        # u_1 = self.u_1

        t = 0.0
        while t <= tstop:
            t_old = t;  t += dt
            if verbose:
                print 'solving (%s version) at t=%g' % \
                      (implementation['inner'], t)
            # update all inner points:
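            # Both branches below implement the same update:
            #   u[i,j] = 2*u_1[i,j] - u_2[i,j]
            #            + Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j])
            #            + Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1])
            #            + dt2*f(x[i], y[j], t_old)
            # i.e. central differences in time and space, with the wave-speed
            # factor carried by Cx2 = (c*dt/dx)**2 and Cy2 = (c*dt/dy)**2.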
            if implementation['inner'] == 'scalar':
                for i in xrange(1, nx):
                    for j in xrange(1, ny):
                        u[i,j] = - u_2[i,j] + 2*u_1[i,j] + \
                            Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \
                            Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \
                            dt2*f(x[i], y[j], t_old)
            elif implementation['inner'] == 'vectorized':
                u[1:nx,1:ny] = - u_2[1:nx,1:ny] + 2*u_1[1:nx,1:ny] + \
                    Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \
                    Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \
                    dt2*f(xv[1:nx,1:ny], yv[1:nx,1:ny], t_old)

            # insert boundary conditions (if there's no neighbor):
            if lower_x_neigh < 0:
                if implementation['bc'] == 'scalar':
                    i = 0
                    for j in xrange(0, ny+1):
                        u[i,j] = bc(x[i], y[j], t)
                elif implementation['bc'] == 'vectorized':
                    u[0,:] = bc(x[0], y, t)
            if upper_x_neigh < 0:
                if implementation['bc'] == 'scalar':
                    i = nx
                    for j in xrange(0, ny+1):
                        u[i,j] = bc(x[i], y[j], t)
                elif implementation['bc'] == 'vectorized':
                    u[nx,:] = bc(x[nx], y, t)
            if lower_y_neigh < 0:
                if implementation['bc'] == 'scalar':
                    j = 0
                    for i in xrange(0, nx+1):
                        u[i,j] = bc(x[i], y[j], t)
                elif implementation['bc'] == 'vectorized':
                    u[:,0] = bc(x, y[0], t)
            if upper_y_neigh < 0:
                if implementation['bc'] == 'scalar':
                    j = ny
                    for i in xrange(0, nx+1):
                        u[i,j] = bc(x[i], y[j], t)
                elif implementation['bc'] == 'vectorized':
                    u[:,ny] = bc(x, y[ny], t)

            # communication
            partitioner.update_internal_boundary (u)

            if user_action is not None:
                user_action(u, x, y, t)
            # update data structures for next step
            u_2, u_1, u = u_1, u, u_2

        t1 = time.time()
        print 'my_id=%2d, dt=%g, %s version, slice_copy=%s, net Wtime=%g'\
              %(partitioner.my_id,dt,implementation['inner'],\
                partitioner.slice_copy,t1-t0)
        # save the us
        self.us = u,u_1,u_2
        # check final results; compute discrete L2-norm of the solution
        if final_test:
            loc_res = 0.0
            for i in iseq(start=1, stop=nx-1):
                for j in iseq(start=1, stop=ny-1):
                    loc_res += u_1[i,j]**2
            return loc_res
        return dt