Add wave2D example
MinRK
@@ -0,0 +1,491 @@ RectPartitioner.py (new file)
1 #!/usr/bin/env python
2 """A rectangular domain partitioner and associated communication
3 functionality for solving PDEs in (1D,2D) using FDM
4 written in the python language
5
6 The global solution domain is assumed to be of rectangular shape,
7 where the number of cells in each direction is stored in nx, ny, nz
8
9 The numerical scheme is fully explicit
10
11 Authors
12 -------
13
14 * Xing Cai
15 * Min Ragan-Kelley
16
17 """
18 import time
19
20 from numpy import zeros, ascontiguousarray, frombuffer
21 try:
22 from mpi4py import MPI
23 except ImportError:
24 pass
25 else:
26 mpi = MPI.COMM_WORLD
27
28 class RectPartitioner:
29 """
30 Responsible for a rectangular partitioning of a global domain,
31 which is expressed as the numbers of cells in the different
32 spatial directions. The partitioning info is expressed as an
33 array of integers, each indicating the number of subdomains in
34 one spatial direction.
35 """
36
37 def __init__(self, my_id=-1, num_procs=-1, \
38 global_num_cells=[], num_parts=[]):
39 self.nsd = 0
40 self.my_id = my_id
41 self.num_procs = num_procs
42 self.redim (global_num_cells, num_parts)
43
44 def redim (self, global_num_cells, num_parts):
45 nsd_ = len(global_num_cells)
46 # print "Inside the redim function, nsd=%d" %nsd_
47
48 if nsd_<1 or nsd_>3 or nsd_!=len(num_parts):
49 print 'The input global_num_cells is not ok!'
50 return
51
52 self.nsd = nsd_
53 self.global_num_cells = global_num_cells
54 self.num_parts = num_parts
55
56 def prepare_communication (self):
57 """
58 Find the subdomain rank (tuple) for each processor and
59 determine the neighbor info.
60 """
61
62 nsd_ = self.nsd
63 if nsd_<1:
64 print 'Number of space dimensions is %d, nothing to do' %nsd_
65 return
66
67 self.subd_rank = [-1,-1,-1]
68 self.subd_lo_ix = [-1,-1,-1]
69 self.subd_hi_ix = [-1,-1,-1]
70 self.lower_neighbors = [-1,-1,-1]
71 self.upper_neighbors = [-1,-1,-1]
72
73 num_procs = self.num_procs
74 my_id = self.my_id
75
76 num_subds = 1
77 for i in range(nsd_):
78 num_subds = num_subds*self.num_parts[i]
79 if my_id==0:
80 print "# subds=", num_subds
81 # should check num_subds against num_procs
82
83 offsets = [1, 0, 0]
84
85 # find the subdomain rank
86 self.subd_rank[0] = my_id%self.num_parts[0]
87 if nsd_>=2:
88 offsets[1] = self.num_parts[0]
89 self.subd_rank[1] = my_id/offsets[1]
90 if nsd_==3:
91 offsets[1] = self.num_parts[0]
92 offsets[2] = self.num_parts[0]*self.num_parts[1]
93 self.subd_rank[1] = (my_id%offsets[2])/self.num_parts[0]
94 self.subd_rank[2] = my_id/offsets[2]
95
96 print "my_id=%d, subd_rank: "%my_id, self.subd_rank
97 if my_id==0:
98 print "offsets=", offsets
99
100 # find the neighbor ids
101 for i in range(nsd_):
102 rank = self.subd_rank[i]
103 if rank>0:
104 self.lower_neighbors[i] = my_id-offsets[i]
105 if rank<self.num_parts[i]-1:
106 self.upper_neighbors[i] = my_id+offsets[i]
107
108 k = self.global_num_cells[i]/self.num_parts[i]
109 m = self.global_num_cells[i]%self.num_parts[i]
110
111 ix = rank*k+max(0,rank+m-self.num_parts[i])
112 self.subd_lo_ix[i] = ix
113
114 ix = ix+k
115 if rank>=(self.num_parts[i]-m):
116 ix = ix+1 # load balancing
117 if rank<self.num_parts[i]-1:
118 ix = ix+1 # one cell of overlap
119 self.subd_hi_ix[i] = ix
120
121 print "subd_rank:",self.subd_rank,\
122 "lower_neig:", self.lower_neighbors, \
123 "upper_neig:", self.upper_neighbors
124 print "subd_rank:",self.subd_rank,"subd_lo_ix:", self.subd_lo_ix, \
125 "subd_hi_ix:", self.subd_hi_ix
126
127
128 class RectPartitioner1D(RectPartitioner):
129 """
130 Subclass of RectPartitioner, for 1D problems
131 """
132 def prepare_communication (self):
133 """
134 Prepare the buffers to be used for later communications
135 """
136
137 RectPartitioner.prepare_communication (self)
138
139 if self.lower_neighbors[0]>=0:
140 self.in_lower_buffers = [zeros(1, float)]
141 self.out_lower_buffers = [zeros(1, float)]
142 if self.upper_neighbors[0]>=0:
143 self.in_upper_buffers = [zeros(1, float)]
144 self.out_upper_buffers = [zeros(1, float)]
145
146 def get_num_loc_cells(self):
147 return [self.subd_hi_ix[0]-self.subd_lo_ix[0]]
148
149
150 class RectPartitioner2D(RectPartitioner):
151 """
152 Subclass of RectPartitioner, for 2D problems
153 """
154 def prepare_communication (self):
155 """
156 Prepare the buffers to be used for later communications
157 """
158
159 RectPartitioner.prepare_communication (self)
160
161 self.in_lower_buffers = [[], []]
162 self.out_lower_buffers = [[], []]
163 self.in_upper_buffers = [[], []]
164 self.out_upper_buffers = [[], []]
165
166 size1 = self.subd_hi_ix[1]-self.subd_lo_ix[1]+1
167 if self.lower_neighbors[0]>=0:
168 self.in_lower_buffers[0] = zeros(size1, float)
169 self.out_lower_buffers[0] = zeros(size1, float)
170 if self.upper_neighbors[0]>=0:
171 self.in_upper_buffers[0] = zeros(size1, float)
172 self.out_upper_buffers[0] = zeros(size1, float)
173
174 size0 = self.subd_hi_ix[0]-self.subd_lo_ix[0]+1
175 if self.lower_neighbors[1]>=0:
176 self.in_lower_buffers[1] = zeros(size0, float)
177 self.out_lower_buffers[1] = zeros(size0, float)
178 if self.upper_neighbors[1]>=0:
179 self.in_upper_buffers[1] = zeros(size0, float)
180 self.out_upper_buffers[1] = zeros(size0, float)
181
182 def get_num_loc_cells(self):
183 return [self.subd_hi_ix[0]-self.subd_lo_ix[0],\
184 self.subd_hi_ix[1]-self.subd_lo_ix[1]]
185
186
187 class MPIRectPartitioner2D(RectPartitioner2D):
188 """
189 Subclass of RectPartitioner2D, which uses MPI via mpi4py for communication
190 """
191
192 def __init__(self, my_id=-1, num_procs=-1,
193 global_num_cells=[], num_parts=[],
194 slice_copy=True):
195 RectPartitioner.__init__(self, my_id, num_procs,
196 global_num_cells, num_parts)
197 self.slice_copy = slice_copy
198
199 def update_internal_boundary (self, solution_array):
200 nsd_ = self.nsd
201 if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
202 print "Buffers for communicating with lower neighbors not ready"
203 return
204 if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
205 print "Buffers for communicating with upper neighbors not ready"
206 return
207
208 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
209 loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]
210
211 lower_x_neigh = self.lower_neighbors[0]
212 upper_x_neigh = self.upper_neighbors[0]
213 lower_y_neigh = self.lower_neighbors[1]
214 upper_y_neigh = self.upper_neighbors[1]
215
216 # communicate in the x-direction first
217 if lower_x_neigh>-1:
218 if self.slice_copy:
219 self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
220 else:
221 for i in xrange(0,loc_ny+1):
222 self.out_lower_buffers[0][i] = solution_array[1,i]
223 mpi.Isend(self.out_lower_buffers[0], lower_x_neigh)
224
225 if upper_x_neigh>-1:
226 mpi.Recv(self.in_upper_buffers[0], upper_x_neigh)
227 if self.slice_copy:
228 solution_array[loc_nx,:] = self.in_upper_buffers[0]
229 self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
230 else:
231 for i in xrange(0,loc_ny+1):
232 solution_array[loc_nx,i] = self.in_upper_buffers[0][i]
233 self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
234 mpi.Isend(self.out_upper_buffers[0], upper_x_neigh)
235
236 if lower_x_neigh>-1:
237 mpi.Recv(self.in_lower_buffers[0], lower_x_neigh)
238 if self.slice_copy:
239 solution_array[0,:] = self.in_lower_buffers[0]
240 else:
241 for i in xrange(0,loc_ny+1):
242 solution_array[0,i] = self.in_lower_buffers[0][i]
243
244 # communicate in the y-direction afterwards
245 if lower_y_neigh>-1:
246 if self.slice_copy:
247 self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
248 else:
249 for i in xrange(0,loc_nx+1):
250 self.out_lower_buffers[1][i] = solution_array[i,1]
251 mpi.Isend(self.out_lower_buffers[1], lower_y_neigh)
252
253 if upper_y_neigh>-1:
254 mpi.Recv(self.in_upper_buffers[1], upper_y_neigh)
255 if self.slice_copy:
256 solution_array[:,loc_ny] = self.in_upper_buffers[1]
257 self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
258 else:
259 for i in xrange(0,loc_nx+1):
260 solution_array[i,loc_ny] = self.in_upper_buffers[1][i]
261 self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
262 mpi.Isend(self.out_upper_buffers[1], upper_y_neigh)
263
264 if lower_y_neigh>-1:
265 mpi.Recv(self.in_lower_buffers[1], lower_y_neigh)
266 if self.slice_copy:
267 solution_array[:,0] = self.in_lower_buffers[1]
268 else:
269 for i in xrange(0,loc_nx+1):
270 solution_array[i,0] = self.in_lower_buffers[1][i]
271
272 class ZMQRectPartitioner2D(RectPartitioner2D):
273 """
274 Subclass of RectPartitioner2D, which uses 0MQ via pyzmq for communication
275 The first two arguments must be `comm`, an EngineCommunicator object,
276 and `addrs`, a dict of connection information for other EngineCommunicator
277 objects.
278 """
279
280 def __init__(self, comm, addrs, my_id=-1, num_procs=-1,
281 global_num_cells=[], num_parts=[],
282 slice_copy=True):
283 RectPartitioner.__init__(self, my_id, num_procs,
284 global_num_cells, num_parts)
285 self.slice_copy = slice_copy
286 self.comm = comm # an Engine
287 self.addrs = addrs
288
289 def prepare_communication(self):
290 RectPartitioner2D.prepare_communication(self)
291 # connect west/south to east/north
292 west_id,south_id = self.lower_neighbors[:2]
293 west = self.addrs.get(west_id, None)
294 south = self.addrs.get(south_id, None)
295 self.comm.connect(south, west)
296
297 def update_internal_boundary_x_y (self, solution_array):
298 """update the inner boundary with the same send/recv pattern as the MPIPartitioner"""
299 nsd_ = self.nsd
300 dtype = solution_array.dtype
301 if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
302 print "Buffers for communicating with lower neighbors not ready"
303 return
304 if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
305 print "Buffers for communicating with upper neighbors not ready"
306 return
307
308 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
309 loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]
310
311 lower_x_neigh = self.lower_neighbors[0]
312 upper_x_neigh = self.upper_neighbors[0]
313 lower_y_neigh = self.lower_neighbors[1]
314 upper_y_neigh = self.upper_neighbors[1]
315 trackers = []
316 flags = dict(copy=False, track=False)
317 # communicate in the x-direction first
318 if lower_x_neigh>-1:
319 if self.slice_copy:
320 self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
321 else:
322 for i in xrange(0,loc_ny+1):
323 self.out_lower_buffers[0][i] = solution_array[1,i]
324 t = self.comm.west.send(self.out_lower_buffers[0], **flags)
325 trackers.append(t)
326
327 if upper_x_neigh>-1:
328 msg = self.comm.east.recv(copy=False)
329 self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype)
330 if self.slice_copy:
331 solution_array[loc_nx,:] = self.in_upper_buffers[0]
332 self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
333 else:
334 for i in xrange(0,loc_ny+1):
335 solution_array[loc_nx,i] = self.in_upper_buffers[0][i]
336 self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
337 t = self.comm.east.send(self.out_upper_buffers[0], **flags)
338 trackers.append(t)
339
340
341 if lower_x_neigh>-1:
342 msg = self.comm.west.recv(copy=False)
343 self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype)
344 if self.slice_copy:
345 solution_array[0,:] = self.in_lower_buffers[0]
346 else:
347 for i in xrange(0,loc_ny+1):
348 solution_array[0,i] = self.in_lower_buffers[0][i]
349
350 # communicate in the y-direction afterwards
351 if lower_y_neigh>-1:
352 if self.slice_copy:
353 self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
354 else:
355 for i in xrange(0,loc_nx+1):
356 self.out_lower_buffers[1][i] = solution_array[i,1]
357 t = self.comm.south.send(self.out_lower_buffers[1], **flags)
358 trackers.append(t)
359
360
361 if upper_y_neigh>-1:
362 msg = self.comm.north.recv(copy=False)
363 self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype)
364 if self.slice_copy:
365 solution_array[:,loc_ny] = self.in_upper_buffers[1]
366 self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
367 else:
368 for i in xrange(0,loc_nx+1):
369 solution_array[i,loc_ny] = self.in_upper_buffers[1][i]
370 self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
371 t = self.comm.north.send(self.out_upper_buffers[1], **flags)
372 trackers.append(t)
373
374 if lower_y_neigh>-1:
375 msg = self.comm.south.recv(copy=False)
376 self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype)
377 if self.slice_copy:
378 solution_array[:,0] = self.in_lower_buffers[1]
379 else:
380 for i in xrange(0,loc_nx+1):
381 solution_array[i,0] = self.in_lower_buffers[1][i]
382
383 # wait for sends to complete:
384 if flags['track']:
385 for t in trackers:
386 t.wait()
387
388 def update_internal_boundary_send_recv (self, solution_array):
389 """update the inner boundary, sending first, then recving"""
390 nsd_ = self.nsd
391 dtype = solution_array.dtype
392 if nsd_!=len(self.in_lower_buffers) or nsd_!=len(self.out_lower_buffers):
393 print "Buffers for communicating with lower neighbors not ready"
394 return
395 if nsd_!=len(self.in_upper_buffers) or nsd_!=len(self.out_upper_buffers):
396 print "Buffers for communicating with upper neighbors not ready"
397 return
398
399 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
400 loc_ny = self.subd_hi_ix[1]-self.subd_lo_ix[1]
401
402 lower_x_neigh = self.lower_neighbors[0]
403 upper_x_neigh = self.upper_neighbors[0]
404 lower_y_neigh = self.lower_neighbors[1]
405 upper_y_neigh = self.upper_neighbors[1]
406 trackers = []
407 flags = dict(copy=False, track=False)
408
409 # send in all directions first
410 if lower_x_neigh>-1:
411 if self.slice_copy:
412 self.out_lower_buffers[0] = ascontiguousarray(solution_array[1,:])
413 else:
414 for i in xrange(0,loc_ny+1):
415 self.out_lower_buffers[0][i] = solution_array[1,i]
416 t = self.comm.west.send(self.out_lower_buffers[0], **flags)
417 trackers.append(t)
418
419 if lower_y_neigh>-1:
420 if self.slice_copy:
421 self.out_lower_buffers[1] = ascontiguousarray(solution_array[:,1])
422 else:
423 for i in xrange(0,loc_nx+1):
424 self.out_lower_buffers[1][i] = solution_array[i,1]
425 t = self.comm.south.send(self.out_lower_buffers[1], **flags)
426 trackers.append(t)
427
428 if upper_x_neigh>-1:
429 if self.slice_copy:
430 self.out_upper_buffers[0] = ascontiguousarray(solution_array[loc_nx-1,:])
431 else:
432 for i in xrange(0,loc_ny+1):
433 self.out_upper_buffers[0][i] = solution_array[loc_nx-1,i]
434 t = self.comm.east.send(self.out_upper_buffers[0], **flags)
435 trackers.append(t)
436
437 if upper_y_neigh>-1:
438 if self.slice_copy:
439 self.out_upper_buffers[1] = ascontiguousarray(solution_array[:,loc_ny-1])
440 else:
441 for i in xrange(0,loc_nx+1):
442 self.out_upper_buffers[1][i] = solution_array[i,loc_ny-1]
443 t = self.comm.north.send(self.out_upper_buffers[1], **flags)
444 trackers.append(t)
445
446
447 # now start receiving
448 if upper_x_neigh>-1:
449 msg = self.comm.east.recv(copy=False)
450 self.in_upper_buffers[0] = frombuffer(msg, dtype=dtype)
451 if self.slice_copy:
452 solution_array[loc_nx,:] = self.in_upper_buffers[0]
453 else:
454 for i in xrange(0,loc_ny+1):
455 solution_array[loc_nx,i] = self.in_upper_buffers[0][i]
456
457 if lower_x_neigh>-1:
458 msg = self.comm.west.recv(copy=False)
459 self.in_lower_buffers[0] = frombuffer(msg, dtype=dtype)
460 if self.slice_copy:
461 solution_array[0,:] = self.in_lower_buffers[0]
462 else:
463 for i in xrange(0,loc_ny+1):
464 solution_array[0,i] = self.in_lower_buffers[0][i]
465
466 if upper_y_neigh>-1:
467 msg = self.comm.north.recv(copy=False)
468 self.in_upper_buffers[1] = frombuffer(msg, dtype=dtype)
469 if self.slice_copy:
470 solution_array[:,loc_ny] = self.in_upper_buffers[1]
471 else:
472 for i in xrange(0,loc_nx+1):
473 solution_array[i,loc_ny] = self.in_upper_buffers[1][i]
474
475 if lower_y_neigh>-1:
476 msg = self.comm.south.recv(copy=False)
477 self.in_lower_buffers[1] = frombuffer(msg, dtype=dtype)
478 if self.slice_copy:
479 solution_array[:,0] = self.in_lower_buffers[1]
480 else:
481 for i in xrange(0,loc_nx+1):
482 solution_array[i,0] = self.in_lower_buffers[1][i]
483
484 # wait for sends to complete:
485 if flags['track']:
486 for t in trackers:
487 t.wait()
488
489 # use send/recv pattern instead of x/y sweeps
490 update_internal_boundary = update_internal_boundary_send_recv
491
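The index arithmetic in `prepare_communication` above does the actual partitioning: each direction is split into `num_parts` chunks of `k = n // p` cells, the last `m = n % p` ranks absorb one extra cell for load balancing, and every subdomain except the last gets one extra cell of overlap with its upper neighbor. A minimal standalone sketch of that formula (the helper name `local_range` is ours, not part of the example code):

```python
def local_range(rank, num_parts, num_cells):
    """Recompute subd_lo_ix/subd_hi_ix for one direction, as in prepare_communication."""
    k, m = num_cells // num_parts, num_cells % num_parts
    lo = rank * k + max(0, rank + m - num_parts)
    hi = lo + k
    if rank >= num_parts - m:
        hi += 1     # load balancing: the last m ranks take one extra cell
    if rank < num_parts - 1:
        hi += 1     # one cell of overlap with the upper neighbor
    return lo, hi

# 10 cells over 3 ranks -> [(0, 4), (3, 7), (6, 10)]: overlapping, nearly equal chunks
print([local_range(r, 3, 10) for r in range(3)])
```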
@@ -0,0 +1,59 @@ communicator.py (new file)
1 #!/usr/bin/env python
2 """A simple Communicator class that has N,E,S,W neighbors connected via 0MQ PEER sockets"""
3
4 import socket
5
6 import zmq
7
8 from IPython.zmq.parallel.util import disambiguate_url
9
10 class EngineCommunicator(object):
11 """An object that connects Engines to each other.
12 north and east sockets bind (listen), while south and west sockets connect.
13
14 This class is useful in cases where there is a set of nodes that
15 must communicate only with their nearest neighbors.
16 """
17
18 def __init__(self, interface='tcp://*', identity=None):
19 self._ctx = zmq.Context()
20 self.north = self._ctx.socket(zmq.PAIR)
21 self.west = self._ctx.socket(zmq.PAIR)
22 self.south = self._ctx.socket(zmq.PAIR)
23 self.east = self._ctx.socket(zmq.PAIR)
24
25 # bind to ports
26 northport = self.north.bind_to_random_port(interface)
27 eastport = self.east.bind_to_random_port(interface)
28
29 self.north_url = interface+":%i"%northport
30 self.east_url = interface+":%i"%eastport
31
32 # guess first public IP from socket
33 self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]
34
35 def __del__(self):
36 self.north.close()
37 self.south.close()
38 self.east.close()
39 self.west.close()
40 self._ctx.term()
41
42 @property
43 def info(self):
44 """return the connection info for this object's sockets."""
45 return (self.location, self.north_url, self.east_url)
46
47 def connect(self, south_peer=None, west_peer=None):
48 """connect to peers. `peers` will be a 3-tuples, of the form:
49 (location, north_addr, east_addr)
50 as produced by
51 """
52 if south_peer is not None:
53 location, url, _ = south_peer
54 self.south.connect(disambiguate_url(url, location))
55 if west_peer is not None:
56 location, _, url = west_peer
57 self.west.connect(disambiguate_url(url, location))
58
59
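The communicator above links each engine to its nearest neighbors over exclusive 0MQ PAIR sockets: north and east bind to random ports and advertise them via `info`, while south and west connect to the ports advertised by the neighbors. A minimal pyzmq sketch of one such link, run in a single process purely for illustration (no IPython or EngineCommunicator machinery involved):

```python
import zmq

ctx = zmq.Context()

east = ctx.socket(zmq.PAIR)                  # plays the binding role of self.east
port = east.bind_to_random_port('tcp://*')

west = ctx.socket(zmq.PAIR)                  # plays the connecting role of a neighbor's self.west
west.connect('tcp://127.0.0.1:%i' % port)

east.send(b'boundary data')                  # ghost-cell values would travel over this link
print(west.recv())

east.close(); west.close(); ctx.term()
```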
@@ -0,0 +1,200 @@ parallelwave-mpi.py (new file)
1 #!/usr/bin/env python
2 """
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
5 are done by an object of class MPIRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses MPI via mpi4py)
7
8 An example of running the program is (8 processors, 4x2 partition,
9 400x100 grid cells)::
10
11 $ ipclusterz start --profile mpi -n 8 # start 8 engines (assuming mpi profile has been configured)
12 $ ./parallelwave-mpi.py --grid 400 100 --partition 4 2 --profile mpi
13
14 See also parallelwave.py, which runs the same program, but uses 0MQ sockets
15 (via pyzmq) for the inter-engine communication.
16
17 Authors
18 -------
19
20 * Xing Cai
21 * Min Ragan-Kelley
22
23 """
24
25 import sys
26 import time
27
28 from numpy import exp, zeros, newaxis, sqrt
29
30 from IPython.external import argparse
31 from IPython.zmq.parallel.client import Client, Reference
32
33 def setup_partitioner(ns, index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
35 p = MPIRectPartitioner2D(my_id=index, num_procs=num_procs)
36 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.prepare_communication()
38 # put the partitioner into the global namespace:
39 ns.partitioner=p
40
41 def setup_solver(ns, *args, **kwargs):
42 """create a WaveSolver in the engine namespace"""
43 ns.solver = WaveSolver(*args, **kwargs)
44
45 def wave_saver(u, x, y, t):
46 """save the wave log"""
47 global u_hist
48 global t_hist
49 t_hist.append(t)
50 u_hist.append(1.0*u)
51
52
53 # main program:
54 if __name__ == '__main__':
55
56 parser = argparse.ArgumentParser()
57 paa = parser.add_argument
58 paa('--grid', '-g',
59 type=int, nargs=2, default=[100,100], dest='grid',
60 help="Cells in the grid, e.g. --grid 100 200")
61 paa('--partition', '-p',
62 type=int, nargs=2, default=None,
63 help="Process partition grid, e.g. --partition 4 2 for 4x2")
64 paa('-c',
65 type=float, default=1.,
66 help="Wave speed (I think)")
67 paa('-Ly',
68 type=float, default=1.,
69 help="system size (in y)")
70 paa('-Lx',
71 type=float, default=1.,
72 help="system size (in x)")
73 paa('-t', '--tstop',
74 type=float, default=1.,
75 help="Time units to run")
76 paa('--profile',
77 type=unicode, default=u'default',
78 help="Specify the ipcluster profile for the client to connect to.")
79 paa('--save',
80 action='store_true',
81 help="Add this flag to save the time/wave history during the run.")
82 paa('--scalar',
83 action='store_true',
84 help="Also run with scalar interior implementation, to see vector speedup.")
85
86 ns = parser.parse_args()
87 # set up arguments
88 grid = ns.grid
89 partition = ns.partition
90 Lx = ns.Lx
91 Ly = ns.Ly
92 c = ns.c
93 tstop = ns.tstop
94 if ns.save:
95 user_action = wave_saver
96 else:
97 user_action = None
98
99 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
100 final_test = True
101
102 # create the Client
103 rc = Client(profile=ns.profile)
104 num_procs = len(rc.ids)
105
106 if partition is None:
107 partition = [1,num_procs]
108
109 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
110
111 # functions defining initial/boundary/source conditions
112 def I(x,y):
113 from numpy import exp
114 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
115 def f(x,y,t):
116 return 0.0
117 # from numpy import exp,sin
118 # return 10*exp(-(x - sin(100*t))**2)
119 def bc(x,y,t):
120 return 0.0
121
122 # initial imports, setup rank
123 rc[:].execute('\n'.join([
124 "from mpi4py import MPI",
125 "import numpy",
126 "mpi = MPI.COMM_WORLD",
127 "my_id = MPI.COMM_WORLD.Get_rank()"]), block=True)
128
129 # initialize t_hist/u_hist for saving the state at each step (optional)
130 rc[:]['t_hist'] = []
131 rc[:]['u_hist'] = []
132
133 # set vector/scalar implementation details
134 impl = {}
135 impl['ic'] = 'vectorized'
136 impl['inner'] = 'scalar'
137 impl['bc'] = 'vectorized'
138
139 # execute some files so that the classes we need will be defined on the engines:
140 rc[:].run('RectPartitioner.py')
141 rc[:].run('wavesolver.py')
142
143 # setup remote partitioner
144 # note that Reference means that the argument passed to setup_partitioner will be the
145 # object named 'my_id' in the engine's namespace
146 rc[:].apply_sync_bound(setup_partitioner, Reference('my_id'), num_procs, grid, partition)
147 # wait for initial communication to complete
148 rc[:].execute('mpi.barrier()')
149 # setup remote solvers
150 rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
151
152 # lambda for calling solver.solve:
153 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
154
155 if ns.scalar:
156 impl['inner'] = 'scalar'
157 # run first with element-wise Python operations for each cell
158 t0 = time.time()
159 ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
160 if final_test:
161 # this sum is performed element-wise as results finish
162 s = sum(ar)
163 # the L2 norm (RMS) of the result:
164 norm = sqrt(s/num_cells)
165 else:
166 norm = -1
167 t1 = time.time()
168 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
169
170 impl['inner'] = 'vectorized'
171 # setup new solvers
172 rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
173 rc[:].execute('mpi.barrier()')
174
175 # run again with numpy vectorized inner-implementation
176 t0 = time.time()
177 ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
178 if final_test:
179 # this sum is performed element-wise as results finish
180 s = sum(ar)
181 # the L2 norm (RMS) of the result:
182 norm = sqrt(s/num_cells)
183 else:
184 norm = -1
185 t1 = time.time()
186 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
187
188 # if ns.save is True, then u_hist stores the history of u as a list
189 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
190 if ns.save and partition[-1] == 1:
191 import pylab
192 rc[:].execute('u_last=u_hist[-1]')
193 # map mpi IDs to IPython IDs, which may not match
194 ranks = rc[:]['my_id']
195 targets = range(len(ranks))
196 for idx in range(len(ranks)):
197 targets[idx] = ranks.index(idx)
198 u_last = rc[targets].gather('u_last', block=True)
199 pylab.pcolor(u_last)
200 pylab.show()
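One subtle step above is the reordering before the final `gather`: IPython engine IDs need not coincide with MPI ranks, so the script inverts the list of per-engine `my_id` values to visit engines in rank order. The same permutation in isolation, with made-up rank values:

```python
# Hypothetical result of rc[:]['my_id']: engine 0 holds MPI rank 2, engine 1 holds rank 0, ...
ranks = [2, 0, 3, 1]
targets = [ranks.index(r) for r in range(len(ranks))]   # engine that holds rank 0, 1, 2, ...
print(targets)   # [1, 3, 0, 2]
```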
@@ -0,0 +1,201 @@ parallelwave.py (new file)
1 #!/usr/bin/env python
2 """
3 A simple python program of solving a 2D wave equation in parallel.
4 Domain partitioning and inter-processor communication
5 are done by an object of class ZMQRectPartitioner2D
6 (which is a subclass of RectPartitioner2D and uses 0MQ via pyzmq)
7
8 An example of running the program is (8 processors, 4x2 partition,
9 200x200 grid cells)::
10
11 $ ipclusterz start -n 8 # start 8 engines
12 $ ./parallelwave.py --grid 200 200 --partition 4 2
13
14 See also parallelwave-mpi, which runs the same program, but uses MPI
15 (via mpi4py) for the inter-engine communication.
16
17 Authors
18 -------
19
20 * Xing Cai
21 * Min Ragan-Kelley
22
23 """
24 #
25 import sys
26 import time
27
28 from numpy import exp, zeros, newaxis, sqrt
29
30 from IPython.external import argparse
31 from IPython.zmq.parallel.client import Client, Reference
32
33 def setup_partitioner(ns, comm, addrs, index, num_procs, gnum_cells, parts):
34 """create a partitioner in the engine namespace"""
35 p = ZMQRectPartitioner2D(comm, addrs, my_id=index, num_procs=num_procs)
36 p.redim(global_num_cells=gnum_cells, num_parts=parts)
37 p.prepare_communication()
38 # put the partitioner into the global namespace:
39 ns.partitioner=p
40
41 def setup_solver(ns, *args, **kwargs):
42 """create a WaveSolver in the engine namespace."""
43 ns.solver = WaveSolver(*args, **kwargs)
44
45 def wave_saver(u, x, y, t):
46 """save the wave state for each timestep."""
47 global u_hist
48 global t_hist
49 t_hist.append(t)
50 u_hist.append(1.0*u)
51
52
53 # main program:
54 if __name__ == '__main__':
55
56 parser = argparse.ArgumentParser()
57 paa = parser.add_argument
58 paa('--grid', '-g',
59 type=int, nargs=2, default=[100,100], dest='grid',
60 help="Cells in the grid, e.g. --grid 100 200")
61 paa('--partition', '-p',
62 type=int, nargs=2, default=None,
63 help="Process partition grid, e.g. --partition 4 2 for 4x2")
64 paa('-c',
65 type=float, default=1.,
66 help="Wave speed (I think)")
67 paa('-Ly',
68 type=float, default=1.,
69 help="system size (in y)")
70 paa('-Lx',
71 type=float, default=1.,
72 help="system size (in x)")
73 paa('-t', '--tstop',
74 type=float, default=1.,
75 help="Time units to run")
76 paa('--profile',
77 type=unicode, default=u'default',
78 help="Specify the ipcluster profile for the client to connect to.")
79 paa('--save',
80 action='store_true',
81 help="Add this flag to save the time/wave history during the run.")
82 paa('--scalar',
83 action='store_true',
84 help="Also run with scalar interior implementation, to see vector speedup.")
85
86 ns = parser.parse_args()
87 # set up arguments
88 grid = ns.grid
89 partition = ns.partition
90 Lx = ns.Lx
91 Ly = ns.Ly
92 c = ns.c
93 tstop = ns.tstop
94 if ns.save:
95 user_action = wave_saver
96 else:
97 user_action = None
98
99 num_cells = 1.0*(grid[0]-1)*(grid[1]-1)
100 final_test = True
101
102 # create the Client
103 rc = Client(profile=ns.profile)
104 num_procs = len(rc.ids)
105
106 if partition is None:
107 partition = [num_procs,1]
108
109 assert partition[0]*partition[1] == num_procs, "can't map partition %s to %i engines"%(partition, num_procs)
110
111 # functions defining initial/boundary/source conditions
112 def I(x,y):
113 from numpy import exp
114 return 1.5*exp(-100*((x-0.5)**2+(y-0.5)**2))
115 def f(x,y,t):
116 return 0.0
117 # from numpy import exp,sin
118 # return 10*exp(-(x - sin(100*t))**2)
119 def bc(x,y,t):
120 return 0.0
121
122 # initialize t_hist/u_hist for saving the state at each step (optional)
123 rc[:]['t_hist'] = []
124 rc[:]['u_hist'] = []
125
126 # set vector/scalar implementation details
127 impl = {}
128 impl['ic'] = 'vectorized'
129 impl['inner'] = 'scalar'
130 impl['bc'] = 'vectorized'
131
132 # execute some files so that the classes we need will be defined on the engines:
133 rc[:].execute('import numpy')
134 rc[:].run('communicator.py')
135 rc[:].run('RectPartitioner.py')
136 rc[:].run('wavesolver.py')
137
138 # scatter engine IDs
139 rc[:].scatter('my_id', rc.ids, flatten=True)
140
141 # create the engine connectors
142 rc[:].execute('com = EngineCommunicator()')
143
144 # gather the connection information into a single dict
145 ar = rc[:].apply_async(lambda : com.info)
146 peers = ar.get_dict()
147 # print peers
148 # this is a dict, keyed by engine ID, of the connection info for the EngineCommunicators
149
150 # setup remote partitioner
151 # note that Reference means that the argument passed to setup_partitioner will be the
152 # object named 'com' in the engine's namespace
153 rc[:].apply_sync_bound(setup_partitioner, Reference('com'), peers, Reference('my_id'), num_procs, grid, partition)
154 time.sleep(1)
155 # convenience lambda to call solver.solve:
156 _solve = lambda *args, **kwargs: solver.solve(*args, **kwargs)
157
158 if ns.scalar:
159 impl['inner'] = 'scalar'
160 # setup remote solvers
161 rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly, partitioner=Reference('partitioner'), dt=0,implementation=impl)
162
163 # run first with element-wise Python operations for each cell
164 t0 = time.time()
165 ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test, user_action=user_action)
166 if final_test:
167 # this sum is performed element-wise as results finish
168 s = sum(ar)
169 # the L2 norm (RMS) of the result:
170 norm = sqrt(s/num_cells)
171 else:
172 norm = -1
173 t1 = time.time()
174 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
175
176 # run again with faster numpy-vectorized inner implementation:
177 impl['inner'] = 'vectorized'
178 # setup remote solvers
179 rc[:].apply_sync_bound(setup_solver, I,f,c,bc,Lx,Ly,partitioner=Reference('partitioner'), dt=0,implementation=impl)
180
181 t0 = time.time()
182
183 ar = rc[:].apply_async(_solve, tstop, dt=0, verbose=True, final_test=final_test)#, user_action=wave_saver)
184 if final_test:
185 # this sum is performed element-wise as results finish
186 s = sum(ar)
187 # the L2 norm (RMS) of the result:
188 norm = sqrt(s/num_cells)
189 else:
190 norm = -1
191 t1 = time.time()
192 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
193
194 # if ns.save is True, then u_hist stores the history of u as a list
195 # If the partition scheme is Nx1, then u can be reconstructed via 'gather':
196 if ns.save and partition[-1] == 1:
197 import pylab
198 rc[:].execute('u_last=u_hist[-1]')
199 u_last = rc[:].gather('u_last', block=True)
200 pylab.pcolor(u_last)
201 pylab.show()
\ No newline at end of file
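In this 0MQ variant every engine connects its south/west sockets to the north/east sockets of the engines listed in `lower_neighbors` (an ID of -1 means the subdomain touches the global boundary and has no neighbor on that side). The wiring for a 2x2 partition can be inspected serially with the partitioner class itself; this sketch assumes Python 2 (the modules use print statements) and RectPartitioner.py on the path:

```python
from RectPartitioner import RectPartitioner2D

for my_id in range(4):
    p = RectPartitioner2D(my_id=my_id, num_procs=4,
                          global_num_cells=[200, 200], num_parts=[2, 2])
    p.prepare_communication()        # also prints its own diagnostics
    west_id, south_id = p.lower_neighbors[:2]
    print("engine %i: connect west->%i, south->%i (-1 = no neighbor)"
          % (my_id, west_id, south_id))
```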
@@ -0,0 +1,269 @@ wavesolver.py (new file)
1 #!/usr/bin/env python
2 """
3 A simple WaveSolver class for evolving the wave equation in 2D.
4 This works in parallel by using a RectPartitioner object.
5
6 Authors
7 -------
8
9 * Xing Cai
10 * Min Ragan-Kelley
11
12 """
13 import time
14
15 from mpi4py import MPI
16 mpi = MPI.COMM_WORLD
17 from numpy import exp, zeros, newaxis, sqrt, arange
18
19 def iseq(start=0, stop=None, inc=1):
20 """
21 Generate integers from start to (and including!) stop,
22 with increment of inc. Alternative to range/xrange.
23 """
24 if stop is None: # allow iseq(3) to be 0, 1, 2, 3
25 # take 1st arg as stop, start as 0, and inc=1
26 stop = start; start = 0; inc = 1
27 return arange(start, stop+inc, inc)
28
29 class WaveSolver(object):
30 """
31 Solve the 2D wave equation u_tt = u_xx + u_yy + f(x,y,t) with
32 u = bc(x,y,t) on the boundary and initial condition du/dt = 0.
33
34 Parallelization by using a RectPartitioner object 'partitioner'
35
36 nx and ny are the total number of global grid cells in the x and y
37 directions. The global grid points are numbered as (0,0), (1,0), (2,0),
38 ..., (nx,0), (0,1), (1,1), ..., (nx, ny).
39
40 dt is the time step. If dt<=0, an optimal time step is used.
41
42 tstop is the stop time for the simulation.
43
44 I, f are functions: I(x,y), f(x,y,t)
45
46 user_action: function of (u, x, y, t) called at each time
47 level (x and y are one-dimensional coordinate vectors).
48 This function allows the calling code to plot the solution,
49 compute errors, etc.
50
51 implementation: a dictionary specifying how the initial
52 condition ('ic'), the scheme over inner points ('inner'),
53 and the boundary conditions ('bc') are to be implemented.
54 Legal values are 'scalar' or 'vectorized'.
55 'scalar' means straight loops over grid points, while
56 'vectorized' means special NumPy vectorized operations.
57
58 If a key in the implementation dictionary is missing, it
59 defaults in this function to 'scalar' (the safest strategy).
60 Note that if 'vectorized' is specified, the functions I, f,
61 and bc must work in vectorized mode. It is always recommended
62 to first run the 'scalar' mode and then compare 'vectorized'
63 results with the 'scalar' results to check that I, f, and bc
64 work.
65
66 verbose: true if a message at each time step is written,
67 false implies no output during the simulation.
68
69 final_test: true means the discrete L2-norm of the final solution is
70 to be computed.
71 """
72
73 def __init__(self, I, f, c, bc, Lx, Ly, partitioner=None, dt=-1,
74 user_action=None,
75 implementation={'ic': 'vectorized', # or 'scalar'
76 'inner': 'vectorized',
77 'bc': 'vectorized'}):
78
79 nx = partitioner.global_num_cells[0] # number of global cells in x dir
80 ny = partitioner.global_num_cells[1] # number of global cells in y dir
81 dx = Lx/float(nx)
82 dy = Ly/float(ny)
83 loc_nx, loc_ny = partitioner.get_num_loc_cells()
84 nx = loc_nx; ny = loc_ny # now use loc_nx and loc_ny instead
85 lo_ix0 = partitioner.subd_lo_ix[0]
86 lo_ix1 = partitioner.subd_lo_ix[1]
87 hi_ix0 = partitioner.subd_hi_ix[0]
88 hi_ix1 = partitioner.subd_hi_ix[1]
89 x = iseq(dx*lo_ix0, dx*hi_ix0, dx) # local grid points in x dir
90 y = iseq(dy*lo_ix1, dy*hi_ix1, dy) # local grid points in y dir
91 self.x = x
92 self.y = y
93 xv = x[:,newaxis] # for vectorized expressions with f(xv,yv)
94 yv = y[newaxis,:] # -- " --
95 if dt <= 0:
96 dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2)) # max time step
97 Cx2 = (c*dt/dx)**2; Cy2 = (c*dt/dy)**2; dt2 = dt**2 # help variables
98
99 u = zeros((nx+1,ny+1)) # solution array
100 u_1 = u.copy() # solution at t-dt
101 u_2 = u.copy() # solution at t-2*dt
102
103 # preserve for self.solve
104 implementation=dict(implementation) # copy
105
106 if 'ic' not in implementation:
107 implementation['ic'] = 'scalar'
108 if 'bc' not in implementation:
109 implementation['bc'] = 'scalar'
110 if 'inner' not in implementation:
111 implementation['inner'] = 'scalar'
112
113 self.implementation = implementation
114 self.Lx = Lx
115 self.Ly = Ly
116 self.I=I
117 self.f=f
118 self.c=c
119 self.bc=bc
120 self.user_action = user_action
121 self.partitioner=partitioner
122
123 # set initial condition (pointwise - allows straight if-tests in I(x,y)):
124 t=0.0
125 if implementation['ic'] == 'scalar':
126 for i in xrange(0,nx+1):
127 for j in xrange(0,ny+1):
128 u_1[i,j] = I(x[i], y[j])
129
130 for i in xrange(1,nx):
131 for j in xrange(1,ny):
132 u_2[i,j] = u_1[i,j] + \
133 0.5*Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \
134 0.5*Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \
135 dt2*f(x[i], y[j], 0.0)
136
137 # boundary values of u_2 (equals u(t=dt) due to du/dt=0)
138 i = 0
139 for j in xrange(0,ny+1):
140 u_2[i,j] = bc(x[i], y[j], t+dt)
141 j = 0
142 for i in xrange(0,nx+1):
143 u_2[i,j] = bc(x[i], y[j], t+dt)
144 i = nx
145 for j in xrange(0,ny+1):
146 u_2[i,j] = bc(x[i], y[j], t+dt)
147 j = ny
148 for i in xrange(0,nx+1):
149 u_2[i,j] = bc(x[i], y[j], t+dt)
150
151 elif implementation['ic'] == 'vectorized':
152 u_1 = I(xv,yv)
153 u_2[1:nx,1:ny] = u_1[1:nx,1:ny] + \
154 0.5*Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \
155 0.5*Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \
156 dt2*(f(xv[1:nx,1:ny], yv[1:nx,1:ny], 0.0))
157 # boundary values (t=dt):
158 i = 0; u_2[i,:] = bc(x[i], y, t+dt)
159 j = 0; u_2[:,j] = bc(x, y[j], t+dt)
160 i = nx; u_2[i,:] = bc(x[i], y, t+dt)
161 j = ny; u_2[:,j] = bc(x, y[j], t+dt)
162
163 if user_action is not None:
164 user_action(u_1, x, y, t) # allow user to plot etc.
165 # print list(self.us[2][2])
166 self.us = (u,u_1,u_2)
167
168
169 def solve(self, tstop, dt=-1, user_action=None, verbose=False, final_test=False):
170 t0=time.time()
171 f=self.f
172 c=self.c
173 bc=self.bc
174 partitioner = self.partitioner
175 implementation = self.implementation
176 nx = partitioner.global_num_cells[0] # number of global cells in x dir
177 ny = partitioner.global_num_cells[1] # number of global cells in y dir
178 dx = self.Lx/float(nx)
179 dy = self.Ly/float(ny)
180 loc_nx, loc_ny = partitioner.get_num_loc_cells()
181 nx = loc_nx; ny = loc_ny # now use loc_nx and loc_ny instead
182 x = self.x
183 y = self.y
184 xv = x[:,newaxis] # for vectorized expressions with f(xv,yv)
185 yv = y[newaxis,:] # -- " --
186 if dt <= 0:
187 dt = (1/float(c))*(1/sqrt(1/dx**2 + 1/dy**2)) # max time step
188 Cx2 = (c*dt/dx)**2; Cy2 = (c*dt/dy)**2; dt2 = dt**2 # help variables
189 # id for the four possible neighbor subdomains
190 lower_x_neigh = partitioner.lower_neighbors[0]
191 upper_x_neigh = partitioner.upper_neighbors[0]
192 lower_y_neigh = partitioner.lower_neighbors[1]
193 upper_y_neigh = partitioner.upper_neighbors[1]
194 u,u_1,u_2 = self.us
195 # u_1 = self.u_1
196
197 t = 0.0
198 while t <= tstop:
199 t_old = t; t += dt
200 if verbose:
201 print 'solving (%s version) at t=%g' % \
202 (implementation['inner'], t)
203 # update all inner points:
204 if implementation['inner'] == 'scalar':
205 for i in xrange(1, nx):
206 for j in xrange(1, ny):
207 u[i,j] = - u_2[i,j] + 2*u_1[i,j] + \
208 Cx2*(u_1[i-1,j] - 2*u_1[i,j] + u_1[i+1,j]) + \
209 Cy2*(u_1[i,j-1] - 2*u_1[i,j] + u_1[i,j+1]) + \
210 dt2*f(x[i], y[j], t_old)
211 elif implementation['inner'] == 'vectorized':
212 u[1:nx,1:ny] = - u_2[1:nx,1:ny] + 2*u_1[1:nx,1:ny] + \
213 Cx2*(u_1[0:nx-1,1:ny] - 2*u_1[1:nx,1:ny] + u_1[2:nx+1,1:ny]) + \
214 Cy2*(u_1[1:nx,0:ny-1] - 2*u_1[1:nx,1:ny] + u_1[1:nx,2:ny+1]) + \
215 dt2*f(xv[1:nx,1:ny], yv[1:nx,1:ny], t_old)
216
217 # insert boundary conditions (if there's no neighbor):
218 if lower_x_neigh < 0:
219 if implementation['bc'] == 'scalar':
220 i = 0
221 for j in xrange(0, ny+1):
222 u[i,j] = bc(x[i], y[j], t)
223 elif implementation['bc'] == 'vectorized':
224 u[0,:] = bc(x[0], y, t)
225 if upper_x_neigh < 0:
226 if implementation['bc'] == 'scalar':
227 i = nx
228 for j in xrange(0, ny+1):
229 u[i,j] = bc(x[i], y[j], t)
230 elif implementation['bc'] == 'vectorized':
231 u[nx,:] = bc(x[nx], y, t)
232 if lower_y_neigh < 0:
233 if implementation['bc'] == 'scalar':
234 j = 0
235 for i in xrange(0, nx+1):
236 u[i,j] = bc(x[i], y[j], t)
237 elif implementation['bc'] == 'vectorized':
238 u[:,0] = bc(x, y[0], t)
239 if upper_y_neigh < 0:
240 if implementation['bc'] == 'scalar':
241 j = ny
242 for i in xrange(0, nx+1):
243 u[i,j] = bc(x[i], y[j], t)
244 elif implementation['bc'] == 'vectorized':
245 u[:,ny] = bc(x, y[ny], t)
246
247 # communication
248 partitioner.update_internal_boundary (u)
249
250 if user_action is not None:
251 user_action(u, x, y, t)
252 # update data structures for next step
253 u_2, u_1, u = u_1, u, u_2
254
255 t1 = time.time()
256 print 'my_id=%2d, dt=%g, %s version, slice_copy=%s, net Wtime=%g'\
257 %(partitioner.my_id,dt,implementation['inner'],\
258 partitioner.slice_copy,t1-t0)
259 # save the us
260 self.us = u,u_1,u_2
261 # check final results; compute discrete L2-norm of the solution
262 if final_test:
263 loc_res = 0.0
264 for i in iseq(start=1, stop=nx-1):
265 for j in iseq(start=1, stop=ny-1):
266 loc_res += u_1[i,j]**2
267 return loc_res
268 return dt
269
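The core of `solve` above is the explicit leapfrog stencil u_new = 2*u - u_old + Cx2*d2x(u) + Cy2*d2y(u) + dt^2*f, with the time step set to dt = (1/c)/sqrt(1/dx^2 + 1/dy^2) when a non-positive dt is passed. A serial, single-step sketch of the 'vectorized' branch with f = 0 and zero boundary values, just to show the stencil without the partitioner or MPI:

```python
from numpy import zeros, sqrt

nx = ny = 50
dx = dy = 1.0 / nx
c = 1.0
dt = (1 / c) * (1 / sqrt(1 / dx**2 + 1 / dy**2))   # the "max time step" used above
Cx2, Cy2 = (c * dt / dx)**2, (c * dt / dy)**2

u_2 = zeros((nx + 1, ny + 1))    # solution at t - 2*dt
u_1 = zeros((nx + 1, ny + 1))    # solution at t - dt
u_1[nx // 2, ny // 2] = 1.0      # a single initial bump instead of I(x, y)
u = zeros((nx + 1, ny + 1))      # solution at t

u[1:nx, 1:ny] = -u_2[1:nx, 1:ny] + 2 * u_1[1:nx, 1:ny] + \
    Cx2 * (u_1[0:nx-1, 1:ny] - 2 * u_1[1:nx, 1:ny] + u_1[2:nx+1, 1:ny]) + \
    Cy2 * (u_1[1:nx, 0:ny-1] - 2 * u_1[1:nx, 1:ny] + u_1[1:nx, 2:ny+1])
# boundaries stay at bc = 0; to march in time, rotate (u_2, u_1, u) <- (u_1, u, u_2) and repeat
```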