##// END OF EJS Templates
Update print syntax in parallel examples.
Thomas Kluyver -
Show More
@@ -22,13 +22,13 b' v = rc.load_balanced_view()'
22
22
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
23 # scatter 'id', so id=0,1,2 on engines 0,1,2
24 dv.scatter('id', rc.ids, flatten=True)
24 dv.scatter('id', rc.ids, flatten=True)
25 print dv['id']
25 print(dv['id'])
26
26
27
27
28 def sleep_here(count, t):
28 def sleep_here(count, t):
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
29 """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
30 import time,sys
30 import time,sys
31 print "hi from engine %i" % id
31 print("hi from engine %i" % id)
32 sys.stdout.flush()
32 sys.stdout.flush()
33 time.sleep(t)
33 time.sleep(t)
34 return count,t
34 return count,t
@@ -49,13 +49,13 b' while pending:'
49 for msg_id in finished:
49 for msg_id in finished:
50 # we know these are done, so don't worry about blocking
50 # we know these are done, so don't worry about blocking
51 ar = rc.get_result(msg_id)
51 ar = rc.get_result(msg_id)
52 print "job id %s finished on engine %i" % (msg_id, ar.engine_id)
52 print("job id %s finished on engine %i" % (msg_id, ar.engine_id))
53 print "with stdout:"
53 print("with stdout:")
54 print ' ' + ar.stdout.replace('\n', '\n ').rstrip()
54 print(' ' + ar.stdout.replace('\n', '\n ').rstrip())
55 print "and results:"
55 print("and results:")
56
56
57 # note that each job in a map always returns a list of length chunksize
57 # note that each job in a map always returns a list of length chunksize
58 # even if chunksize == 1
58 # even if chunksize == 1
59 for (count,t) in ar.result:
59 for (count,t) in ar.result:
60 print " item %i: slept for %.2fs" % (count, t)
60 print(" item %i: slept for %.2fs" % (count, t))
61
61
@@ -79,7 +79,7 b' def main(nodes, edges):'
79 from matplotlib import pyplot as plt
79 from matplotlib import pyplot as plt
80 from matplotlib.dates import date2num
80 from matplotlib.dates import date2num
81 from matplotlib.cm import gist_rainbow
81 from matplotlib.cm import gist_rainbow
82 print "building DAG"
82 print("building DAG")
83 G = random_dag(nodes, edges)
83 G = random_dag(nodes, edges)
84 jobs = {}
84 jobs = {}
85 pos = {}
85 pos = {}
@@ -89,11 +89,11 b' def main(nodes, edges):'
89
89
90 client = parallel.Client()
90 client = parallel.Client()
91 view = client.load_balanced_view()
91 view = client.load_balanced_view()
92 print "submitting %i tasks with %i dependencies"%(nodes,edges)
92 print("submitting %i tasks with %i dependencies"%(nodes,edges))
93 results = submit_jobs(view, G, jobs)
93 results = submit_jobs(view, G, jobs)
94 print "waiting for results"
94 print("waiting for results")
95 view.wait()
95 view.wait()
96 print "done"
96 print("done")
97 for node in G:
97 for node in G:
98 md = results[node].metadata
98 md = results[node].metadata
99 start = date2num(md.started)
99 start = date2num(md.started)
@@ -117,4 +117,4 b" if __name__ == '__main__':"
117 # main(5,10)
117 # main(5,10)
118 main(32,96)
118 main(32,96)
119 plt.show()
119 plt.show()
120 No newline at end of file
120
@@ -44,21 +44,21 b" if __name__ == '__main__':"
44
44
45 if not os.path.exists('davinci.txt'):
45 if not os.path.exists('davinci.txt'):
46 # download from project gutenberg
46 # download from project gutenberg
47 print "Downloading Da Vinci's notebooks from Project Gutenberg"
47 print("Downloading Da Vinci's notebooks from Project Gutenberg")
48 urllib.urlretrieve(davinci_url, 'davinci.txt')
48 urllib.urlretrieve(davinci_url, 'davinci.txt')
49
49
50 # Run the serial version
50 # Run the serial version
51 print "Serial word frequency count:"
51 print("Serial word frequency count:")
52 text = open('davinci.txt').read()
52 text = open('davinci.txt').read()
53 tic = time.time()
53 tic = time.time()
54 freqs = wordfreq(text)
54 freqs = wordfreq(text)
55 toc = time.time()
55 toc = time.time()
56 print_wordfreq(freqs, 10)
56 print_wordfreq(freqs, 10)
57 print "Took %.3f s to calcluate"%(toc-tic)
57 print("Took %.3f s to calcluate"%(toc-tic))
58
58
59
59
60 # The parallel version
60 # The parallel version
61 print "\nParallel word frequency count:"
61 print("\nParallel word frequency count:")
62 # split the davinci.txt into one file per engine:
62 # split the davinci.txt into one file per engine:
63 lines = text.splitlines()
63 lines = text.splitlines()
64 nlines = len(lines)
64 nlines = len(lines)
@@ -75,6 +75,6 b" if __name__ == '__main__':"
75 pfreqs = pwordfreq(view,fnames)
75 pfreqs = pwordfreq(view,fnames)
76 toc = time.time()
76 toc = time.time()
77 print_wordfreq(freqs)
77 print_wordfreq(freqs)
78 print "Took %.3f s to calcluate on %i engines"%(toc-tic, len(view.targets))
78 print("Took %.3f s to calcluate on %i engines"%(toc-tic, len(view.targets)))
79 # cleanup split files
79 # cleanup split files
80 map(os.remove, fnames)
80 map(os.remove, fnames)
@@ -1,6 +1,7 b''
1 """Count the frequencies of words in a string"""
1 """Count the frequencies of words in a string"""
2
2
3 from __future__ import division
3 from __future__ import division
4 from __future__ import print_function
4
5
5 import cmath as math
6 import cmath as math
6
7
@@ -24,7 +25,7 b' def print_wordfreq(freqs, n=10):'
24 items = zip(counts, words)
25 items = zip(counts, words)
25 items.sort(reverse=True)
26 items.sort(reverse=True)
26 for (count, word) in items[:n]:
27 for (count, word) in items[:n]:
27 print word, count
28 print(word, count)
28
29
29
30
30 def wordfreq_to_weightsize(worddict, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
31 def wordfreq_to_weightsize(worddict, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0):
@@ -65,4 +66,4 b' def tagcloud(worddict, n=10, minsize=25, maxsize=50, minalpha=0.5, maxalpha=1.0)'
65 ax.autoscale_view()
66 ax.autoscale_view()
66 return ax
67 return ax
67
68
68 No newline at end of file
69
@@ -34,12 +34,12 b' view.block=True'
34
34
35 # will run on anything:
35 # will run on anything:
36 pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
36 pids1 = [ view.apply(getpid) for i in range(len(client.ids)) ]
37 print pids1
37 print(pids1)
38 # will only run on e0:
38 # will only run on e0:
39 pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
39 pids2 = [ view.apply(getpid2) for i in range(len(client.ids)) ]
40 print pids2
40 print(pids2)
41
41
42 print "now test some dependency behaviors"
42 print("now test some dependency behaviors")
43
43
44 def wait(t):
44 def wait(t):
45 import time
45 import time
@@ -98,31 +98,31 b' def should_fail(f):'
98 except error.KernelError:
98 except error.KernelError:
99 pass
99 pass
100 else:
100 else:
101 print 'should have raised'
101 print('should have raised')
102 # raise Exception("should have raised")
102 # raise Exception("should have raised")
103
103
104 # print r1a.msg_ids
104 # print(r1a.msg_ids)
105 r1a.get()
105 r1a.get()
106 # print r1b.msg_ids
106 # print(r1b.msg_ids)
107 r1b.get()
107 r1b.get()
108 # print r2a.msg_ids
108 # print(r2a.msg_ids)
109 should_fail(r2a.get)
109 should_fail(r2a.get)
110 # print r2b.msg_ids
110 # print(r2b.msg_ids)
111 should_fail(r2b.get)
111 should_fail(r2b.get)
112 # print r3.msg_ids
112 # print(r3.msg_ids)
113 should_fail(r3.get)
113 should_fail(r3.get)
114 # print r4a.msg_ids
114 # print(r4a.msg_ids)
115 r4a.get()
115 r4a.get()
116 # print r4b.msg_ids
116 # print(r4b.msg_ids)
117 r4b.get()
117 r4b.get()
118 # print r4c.msg_ids
118 # print(r4c.msg_ids)
119 should_fail(r4c.get)
119 should_fail(r4c.get)
120 # print r5.msg_ids
120 # print(r5.msg_ids)
121 r5.get()
121 r5.get()
122 # print r5b.msg_ids
122 # print(r5b.msg_ids)
123 should_fail(r5b.get)
123 should_fail(r5b.get)
124 # print r6.msg_ids
124 # print(r6.msg_ids)
125 should_fail(r6.get) # assuming > 1 engine
125 should_fail(r6.get) # assuming > 1 engine
126 # print r6b.msg_ids
126 # print(r6b.msg_ids)
127 should_fail(r6b.get)
127 should_fail(r6b.get)
128 print 'done'
128 print('done')
@@ -1,3 +1,5 b''
1 from __future__ import print_function
2
1 from IPython.parallel import *
3 from IPython.parallel import *
2
4
3 client = Client()
5 client = Client()
@@ -32,5 +34,5 b' ar = psquare.map(range(42))'
32
34
33 # wait for the results to be done:
35 # wait for the results to be done:
34 squares3 = ar.get()
36 squares3 = ar.get()
35 print squares == squares2, squares3==squares
37 print(squares == squares2, squares3==squares)
36 # True No newline at end of file
38 # True
@@ -30,14 +30,14 b' B1 = numpy.frombuffer(msg1.buffer, dtype=A.dtype).reshape(A.shape)'
30 msg2 = s2.recv(copy=False)
30 msg2 = s2.recv(copy=False)
31 B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype).reshape(A.shape)
31 B2 = numpy.frombuffer(buffer(msg2.bytes), dtype=A.dtype).reshape(A.shape)
32
32
33 print (B1==B2).all()
33 print((B1==B2).all())
34 print (B1==A).all()
34 print((B1==A).all())
35 A[0][0] += 10
35 A[0][0] += 10
36 print "~"
36 print("~")
37 # after changing A in-place, B1 changes too, proving non-copying sends
37 # after changing A in-place, B1 changes too, proving non-copying sends
38 print (B1==A).all()
38 print((B1==A).all())
39 # but B2 is fixed, since it called the msg.bytes attr, which copies
39 # but B2 is fixed, since it called the msg.bytes attr, which copies
40 print (B1==B2).all()
40 print((B1==B2).all())
41
41
42
42
43
43
@@ -1,3 +1,5 b''
1 from __future__ import print_function
2
1 import time
3 import time
2 import numpy as np
4 import numpy as np
3 from IPython import parallel
5 from IPython import parallel
@@ -43,7 +45,7 b' def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):'
43 t2 /= trials
45 t2 /= trials
44 A[i] = (t1,t2)
46 A[i] = (t1,t2)
45 A[i] = n/A[i]
47 A[i] = n/A[i]
46 print n,A[i]
48 print(n,A[i])
47 return A
49 return A
48
50
49 def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
51 def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
@@ -59,6 +61,6 b' def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):'
59 t2 /= trials
61 t2 /= trials
60 A[i] = (t1,t2)
62 A[i] = (t1,t2)
61 A[i] = n/A[i]
63 A[i] = n/A[i]
62 print t,A[i]
64 print(t,A[i])
63 return A
65 return A
64 No newline at end of file
66
@@ -8,8 +8,8 b' for id in client.ids:'
8 v = client[0]
8 v = client[0]
9 v['a'] = 5
9 v['a'] = 5
10
10
11 print v['a']
11 print(v['a'])
12
12
13 remotes = client[:]
13 remotes = client[:]
14
14
15 print remotes['ids'] No newline at end of file
15 print(remotes['ids'])
@@ -8,8 +8,10 b' This module gives an example of how the task interface to the'
8 IPython controller works. Before running this script start the IPython controller
8 IPython controller works. Before running this script start the IPython controller
9 and some engines using something like::
9 and some engines using something like::
10
10
11 ipclusterz start -n 4
11 ipcluster start -n 4
12 """
12 """
13 from __future__ import print_function
14
13 import sys
15 import sys
14 from IPython.parallel import Client, error
16 from IPython.parallel import Client, error
15 import time
17 import time
@@ -53,11 +55,11 b' class DistributedSpider(object):'
53 if url not in self.allLinks:
55 if url not in self.allLinks:
54 self.allLinks.append(url)
56 self.allLinks.append(url)
55 if url.startswith(self.site):
57 if url.startswith(self.site):
56 print ' ', url
58 print(' ', url)
57 self.linksWorking[url] = self.view.apply(fetchAndParse, url)
59 self.linksWorking[url] = self.view.apply(fetchAndParse, url)
58
60
59 def onVisitDone(self, links, url):
61 def onVisitDone(self, links, url):
60 print url, ':'
62 print(url, ':')
61 self.linksDone[url] = None
63 self.linksDone[url] = None
62 del self.linksWorking[url]
64 del self.linksWorking[url]
63 for link in links:
65 for link in links:
@@ -66,7 +68,7 b' class DistributedSpider(object):'
66 def run(self):
68 def run(self):
67 self.visitLink(self.site)
69 self.visitLink(self.site)
68 while self.linksWorking:
70 while self.linksWorking:
69 print len(self.linksWorking), 'pending...'
71 print(len(self.linksWorking), 'pending...')
70 self.synchronize()
72 self.synchronize()
71 time.sleep(self.pollingDelay)
73 time.sleep(self.pollingDelay)
72
74
@@ -81,7 +83,7 b' class DistributedSpider(object):'
81 except Exception as e:
83 except Exception as e:
82 self.linksDone[url] = None
84 self.linksDone[url] = None
83 del self.linksWorking[url]
85 del self.linksWorking[url]
84 print url, ':', e.traceback
86 print(url, ':', e.traceback)
85 else:
87 else:
86 self.onVisitDone(links, url)
88 self.onVisitDone(links, url)
87
89
@@ -7,6 +7,7 b''
7 # Originally by Ken Kinder (ken at kenkinder dom com)
7 # Originally by Ken Kinder (ken at kenkinder dom com)
8
8
9 # <codecell>
9 # <codecell>
10 from __future__ import print_function
10
11
11 from IPython.parallel import Client
12 from IPython.parallel import Client
12
13
@@ -29,6 +30,6 b" hello = view.apply_async(sleep_and_echo, 2, 'Hello')"
29
30
30 # <codecell>
31 # <codecell>
31
32
32 print "Submitted tasks:", hello.msg_ids, world.msg_ids
33 print("Submitted tasks:", hello.msg_ids, world.msg_ids)
33 print hello.get(), world.get()
34 print(hello.get(), world.get())
34
35
@@ -15,7 +15,7 b' class EngineCommunicator(object):'
15
15
16 # configure sockets
16 # configure sockets
17 self.identity = identity or bytes(uuid.uuid4())
17 self.identity = identity or bytes(uuid.uuid4())
18 print self.identity
18 print(self.identity)
19 self.socket.setsockopt(zmq.IDENTITY, self.identity)
19 self.socket.setsockopt(zmq.IDENTITY, self.identity)
20 self.sub.setsockopt(zmq.SUBSCRIBE, b'')
20 self.sub.setsockopt(zmq.SUBSCRIBE, b'')
21
21
@@ -65,14 +65,14 b' def main(connection_file):'
65 # stdout/stderr
65 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
66 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
67 # them differently
68 print "%s: %s" % (topic, msg['content']['data'])
68 print("%s: %s" % (topic, msg['content']['data']))
69 elif msg['msg_type'] == 'pyerr':
69 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
70 # Python traceback
71 c = msg['content']
71 c = msg['content']
72 print topic + ':'
72 print(topic + ':')
73 for line in c['traceback']:
73 for line in c['traceback']:
74 # indent lines
74 # indent lines
75 print ' ' + line
75 print(' ' + line)
76
76
77 if __name__ == '__main__':
77 if __name__ == '__main__':
78 if len(sys.argv) > 1:
78 if len(sys.argv) > 1:
@@ -80,4 +80,4 b" if __name__ == '__main__':"
80 else:
80 else:
81 # This gets the security file for the default profile:
81 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
82 cf = get_security_file('ipcontroller-client.json')
83 main(cf) No newline at end of file
83 main(cf)
@@ -17,6 +17,8 b' Authors'
17 -------
17 -------
18 * MinRK
18 * MinRK
19 """
19 """
20 from __future__ import print_function
21
20 import time
22 import time
21
23
22 from IPython import parallel
24 from IPython import parallel
@@ -28,15 +30,15 b' v = rc.load_balanced_view()'
28
30
29 # scatter 'id', so id=0,1,2 on engines 0,1,2
31 # scatter 'id', so id=0,1,2 on engines 0,1,2
30 dv.scatter('id', rc.ids, flatten=True)
32 dv.scatter('id', rc.ids, flatten=True)
31 print "Engine IDs: ", dv['id']
33 print("Engine IDs: ", dv['id'])
32
34
33 # create a Reference to `id`. This will be a different value on each engine
35 # create a Reference to `id`. This will be a different value on each engine
34 ref = parallel.Reference('id')
36 ref = parallel.Reference('id')
35 print "sleeping for `id` seconds on each engine"
37 print("sleeping for `id` seconds on each engine")
36 tic = time.time()
38 tic = time.time()
37 ar = dv.apply(time.sleep, ref)
39 ar = dv.apply(time.sleep, ref)
38 for i,r in enumerate(ar):
40 for i,r in enumerate(ar):
39 print "%i: %.3f"%(i, time.time()-tic)
41 print("%i: %.3f"%(i, time.time()-tic))
40
42
41 def sleep_here(t):
43 def sleep_here(t):
42 import time
44 import time
@@ -44,22 +46,22 b' def sleep_here(t):'
44 return id,t
46 return id,t
45
47
46 # one call per task
48 # one call per task
47 print "running with one call per task"
49 print("running with one call per task")
48 amr = v.map(sleep_here, [.01*t for t in range(100)])
50 amr = v.map(sleep_here, [.01*t for t in range(100)])
49 tic = time.time()
51 tic = time.time()
50 for i,r in enumerate(amr):
52 for i,r in enumerate(amr):
51 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
53 print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))
52
54
53 print "running with four calls per task"
55 print("running with four calls per task")
54 # with chunksize, we can have four calls per task
56 # with chunksize, we can have four calls per task
55 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
57 amr = v.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
56 tic = time.time()
58 tic = time.time()
57 for i,r in enumerate(amr):
59 for i,r in enumerate(amr):
58 print "task %i on engine %i: %.3f" % (i, r[0], time.time()-tic)
60 print("task %i on engine %i: %.3f" % (i, r[0], time.time()-tic))
59
61
60 print "running with two calls per task, with unordered results"
62 print("running with two calls per task, with unordered results")
61 # We can even iterate through faster results first, with ordered=False
63 # We can even iterate through faster results first, with ordered=False
62 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
64 amr = v.map(sleep_here, [.01*t for t in range(100,0,-1)], ordered=False, chunksize=2)
63 tic = time.time()
65 tic = time.time()
64 for i,r in enumerate(amr):
66 for i,r in enumerate(amr):
65 print "slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic)
67 print("slept %.2fs on engine %i: %.3f" % (r[1], r[0], time.time()-tic))
@@ -1,6 +1,7 b''
1 #-------------------------------------------------------------------------------
1 #-------------------------------------------------------------------------------
2 # Imports
2 # Imports
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 from __future__ import print_function
4
5
5 import time
6 import time
6
7
@@ -20,9 +21,9 b' ar1 = mux.apply(time.sleep, 5)'
20 ar2 = mux.push(dict(a=10,b=30,c=range(20000),d='The dog went swimming.'))
21 ar2 = mux.push(dict(a=10,b=30,c=range(20000),d='The dog went swimming.'))
21 ar3 = mux.pull(('a','b','d'), block=False)
22 ar3 = mux.pull(('a','b','d'), block=False)
22
23
23 print "Try a non-blocking get_result"
24 print("Try a non-blocking get_result")
24 ar4 = mux.get_result()
25 ar4 = mux.get_result()
25
26
26 print "Now wait for all the results"
27 print("Now wait for all the results")
27 mux.wait([ar1,ar2,ar3,ar4])
28 mux.wait([ar1,ar2,ar3,ar4])
28 print "The last pull got:", ar4.r
29 print("The last pull got:", ar4.r)
@@ -1,17 +1,19 b''
1 from __future__ import print_function
2
1 from IPython.parallel import Client
3 from IPython.parallel import Client
2
4
3 rc = Client()
5 rc = Client()
4 view = rc[:]
6 view = rc[:]
5 result = view.map_sync(lambda x: 2*x, range(10))
7 result = view.map_sync(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
8 print("Simple, default map: ", result)
7
9
8 ar = view.map_async(lambda x: 2*x, range(10))
10 ar = view.map_async(lambda x: 2*x, range(10))
9 print "Submitted map, got AsyncResult: ", ar
11 print("Submitted map, got AsyncResult: ", ar)
10 result = ar.r
12 result = ar.r
11 print "Using map_async: ", result
13 print("Using map_async: ", result)
12
14
13 @view.parallel(block=True)
15 @view.parallel(block=True)
14 def f(x): return 2*x
16 def f(x): return 2*x
15
17
16 result = f.map(range(10))
18 result = f.map(range(10))
17 print "Using a parallel function: ", result No newline at end of file
19 print("Using a parallel function: ", result)
@@ -2,6 +2,7 b''
2 """
2 """
3 # Slightly modified version of:
3 # Slightly modified version of:
4 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
4 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
5 from __future__ import print_function
5
6
6 import heapq
7 import heapq
7 from IPython.parallel.error import RemoteError
8 from IPython.parallel.error import RemoteError
@@ -82,7 +83,7 b' def remote_iterator(view,name):'
82 try:
83 try:
83 result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
84 result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
84 # This causes the StopIteration exception to be raised.
85 # This causes the StopIteration exception to be raised.
85 except RemoteError, e:
86 except RemoteError as e:
86 if e.ename == 'StopIteration':
87 if e.ename == 'StopIteration':
87 raise StopIteration
88 raise StopIteration
88 else:
89 else:
@@ -96,7 +97,7 b" if __name__ == '__main__':"
96 from IPython.parallel import Client, Reference
97 from IPython.parallel import Client, Reference
97 rc = Client()
98 rc = Client()
98 view = rc[:]
99 view = rc[:]
99 print 'Engine IDs:', rc.ids
100 print('Engine IDs:', rc.ids)
100
101
101 # Make a set of 'sorted datasets'
102 # Make a set of 'sorted datasets'
102 a0 = range(5,20)
103 a0 = range(5,20)
@@ -116,8 +117,8 b" if __name__ == '__main__':"
116 aa2 = remote_iterator(rc[2],'a')
117 aa2 = remote_iterator(rc[2],'a')
117
118
118 # Let's merge them, both locally and remotely:
119 # Let's merge them, both locally and remotely:
119 print 'Merge the local datasets:'
120 print('Merge the local datasets:')
120 print list(mergesort([a0,a1,a2]))
121 print(list(mergesort([a0,a1,a2])))
121
122
122 print 'Locally merge the remote sets:'
123 print('Locally merge the remote sets:')
123 print list(mergesort([aa0,aa1,aa2]))
124 print(list(mergesort([aa0,aa1,aa2])))
@@ -9,6 +9,7 b''
9 # ## Problem setup
9 # ## Problem setup
10
10
11 # <codecell>
11 # <codecell>
12 from __future__ import print_function
12
13
13 import sys
14 import sys
14 import time
15 import time
@@ -59,8 +60,8 b' view = c.load_balanced_view()'
59
60
60 # <codecell>
61 # <codecell>
61
62
62 print "Strike prices: ", strike_vals
63 print("Strike prices: ", strike_vals)
63 print "Volatilities: ", sigma_vals
64 print("Volatilities: ", sigma_vals)
64
65
65 # <markdowncell>
66 # <markdowncell>
66
67
@@ -77,7 +78,7 b' for strike in strike_vals:'
77
78
78 # <codecell>
79 # <codecell>
79
80
80 print "Submitted tasks: ", len(async_results)
81 print("Submitted tasks: ", len(async_results))
81
82
82 # <markdowncell>
83 # <markdowncell>
83
84
@@ -89,7 +90,7 b' c.wait(async_results)'
89 t2 = time.time()
90 t2 = time.time()
90 t = t2-t1
91 t = t2-t1
91
92
92 print "Parallel calculation completed, time = %s s" % t
93 print("Parallel calculation completed, time = %s s" % t)
93
94
94 # <markdowncell>
95 # <markdowncell>
95
96
@@ -15,6 +15,7 b' ftp://pi.super-computing.org/.2/pi200m/'
15 and the files used will be downloaded if they are not in the working directory
15 and the files used will be downloaded if they are not in the working directory
16 of the IPython engines.
16 of the IPython engines.
17 """
17 """
18 from __future__ import print_function
18
19
19 from IPython.parallel import Client
20 from IPython.parallel import Client
20 from matplotlib import pyplot as plt
21 from matplotlib import pyplot as plt
@@ -36,16 +37,16 b' id0 = c.ids[0]'
36 v = c[:]
37 v = c[:]
37 v.block=True
38 v.block=True
38 # fetch the pi-files
39 # fetch the pi-files
39 print "downloading %i files of pi"%n
40 print("downloading %i files of pi"%n)
40 v.map(fetch_pi_file, files[:n])
41 v.map(fetch_pi_file, files[:n])
41 print "done"
42 print("done")
42
43
43 # Run 10m digits on 1 engine
44 # Run 10m digits on 1 engine
44 t1 = clock()
45 t1 = clock()
45 freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
46 freqs10m = c[id0].apply_sync(compute_two_digit_freqs, files[0])
46 t2 = clock()
47 t2 = clock()
47 digits_per_second1 = 10.0e6/(t2-t1)
48 digits_per_second1 = 10.0e6/(t2-t1)
48 print "Digits per second (1 core, 10m digits): ", digits_per_second1
49 print("Digits per second (1 core, 10m digits): ", digits_per_second1)
49
50
50
51
51 # Run n*10m digits on all engines
52 # Run n*10m digits on all engines
@@ -54,9 +55,9 b' freqs_all = v.map(compute_two_digit_freqs, files[:n])'
54 freqs150m = reduce_freqs(freqs_all)
55 freqs150m = reduce_freqs(freqs_all)
55 t2 = clock()
56 t2 = clock()
56 digits_per_second8 = n*10.0e6/(t2-t1)
57 digits_per_second8 = n*10.0e6/(t2-t1)
57 print "Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_second8
58 print("Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_second8)
58
59
59 print "Speedup: ", digits_per_second8/digits_per_second1
60 print("Speedup: ", digits_per_second8/digits_per_second1)
60
61
61 plot_two_digit_freqs(freqs150m)
62 plot_two_digit_freqs(freqs150m)
62 plt.title("2 digit sequences in %i0m digits of pi"%n)
63 plt.title("2 digit sequences in %i0m digits of pi"%n)
@@ -17,6 +17,7 b' When used with IPython.parallel, this code is run on the engines. Because this'
17 code doesn't make any plots, the engines don't have to have any plotting
17 code doesn't make any plots, the engines don't have to have any plotting
18 packages installed.
18 packages installed.
19 """
19 """
20 from __future__ import print_function
20
21
21 # Imports
22 # Imports
22 import numpy as N
23 import numpy as N
@@ -51,7 +52,7 b' downy = downsample(x, d_number)'
51 downpx = downsample(px, d_number)
52 downpx = downsample(px, d_number)
52 downpy = downsample(py, d_number)
53 downpy = downsample(py, d_number)
53
54
54 print "downx: ", downx[:10]
55 print("downx: ", downx[:10])
55 print "downy: ", downy[:10]
56 print("downy: ", downy[:10])
56 print "downpx: ", downpx[:10]
57 print("downpx: ", downpx[:10])
57 print "downpy: ", downpy[:10] No newline at end of file
58 print("downpy: ", downpy[:10])
@@ -15,6 +15,7 b' Then a simple "run plotting_frontend.py" in IPython will run the'
15 example. When this is done, all the variables (such as number, downx, etc.)
15 example. When this is done, all the variables (such as number, downx, etc.)
16 are available in IPython, so for example you can make additional plots.
16 are available in IPython, so for example you can make additional plots.
17 """
17 """
18 from __future__ import print_function
18
19
19 import numpy as N
20 import numpy as N
20 from pylab import *
21 from pylab import *
@@ -36,8 +37,8 b" downpx = view.gather('downpx')"
36 downpy = view.gather('downpy')
37 downpy = view.gather('downpy')
37
38
38 # but we can still iterate through AsyncResults before they are done
39 # but we can still iterate through AsyncResults before they are done
39 print "number: ", sum(number)
40 print("number: ", sum(number))
40 print "downsampled number: ", sum(d_number)
41 print("downsampled number: ", sum(d_number))
41
42
42
43
43 # Make a scatter plot of the gathered data
44 # Make a scatter plot of the gathered data
@@ -57,4 +58,4 b' downpy = downpy.get()'
57 scatter(downpx, downpy)
58 scatter(downpx, downpy)
58 xlabel('px')
59 xlabel('px')
59 ylabel('py')
60 ylabel('py')
60 show() No newline at end of file
61 show()
@@ -5,6 +5,7 b''
5 # # Simple task farming example
5 # # Simple task farming example
6
6
7 # <codecell>
7 # <codecell>
8 from __future__ import print_function
8
9
9 from IPython.parallel import Client
10 from IPython.parallel import Client
10
11
@@ -48,5 +49,5 b' ar = v.apply(task, 5)'
48
49
49 # <codecell>
50 # <codecell>
50
51
51 print "a, b, c: ", ar.get()
52 print("a, b, c: ", ar.get())
52
53
@@ -16,8 +16,8 b' for i in range(24):'
16
16
17 for i in range(6):
17 for i in range(6):
18 time.sleep(1.0)
18 time.sleep(1.0)
19 print "Queue status (vebose=False)"
19 print("Queue status (vebose=False)")
20 print v.queue_status(verbose=False)
20 print(v.queue_status(verbose=False))
21 flush()
21 flush()
22
22
23 for i in range(24):
23 for i in range(24):
@@ -25,15 +25,15 b' for i in range(24):'
25
25
26 for i in range(6):
26 for i in range(6):
27 time.sleep(1.0)
27 time.sleep(1.0)
28 print "Queue status (vebose=True)"
28 print("Queue status (vebose=True)")
29 print v.queue_status(verbose=True)
29 print(v.queue_status(verbose=True))
30 flush()
30 flush()
31
31
32 for i in range(12):
32 for i in range(12):
33 v.apply(time.sleep, 2)
33 v.apply(time.sleep, 2)
34
34
35 print "Queue status (vebose=True)"
35 print("Queue status (vebose=True)")
36 print v.queue_status(verbose=True)
36 print(v.queue_status(verbose=True))
37 flush()
37 flush()
38
38
39 # qs = v.queue_status(verbose=True)
39 # qs = v.queue_status(verbose=True)
@@ -44,7 +44,7 b' for msg_id in v.history[-4:]:'
44
44
45 for i in range(6):
45 for i in range(6):
46 time.sleep(1.0)
46 time.sleep(1.0)
47 print "Queue status (vebose=True)"
47 print("Queue status (vebose=True)")
48 print v.queue_status(verbose=True)
48 print(v.queue_status(verbose=True))
49 flush()
49 flush()
50
50
@@ -42,7 +42,7 b' def main():'
42
42
43 rc = Client()
43 rc = Client()
44 view = rc.load_balanced_view()
44 view = rc.load_balanced_view()
45 print view
45 print(view)
46 rc.block=True
46 rc.block=True
47 nengines = len(rc.ids)
47 nengines = len(rc.ids)
48 with rc[:].sync_imports():
48 with rc[:].sync_imports():
@@ -52,7 +52,7 b' def main():'
52 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
52 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
53 stime = sum(times)
53 stime = sum(times)
54
54
55 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
55 print("executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines))
56 time.sleep(1)
56 time.sleep(1)
57 start = time.time()
57 start = time.time()
58 amr = view.map(time.sleep, times)
58 amr = view.map(time.sleep, times)
@@ -62,9 +62,9 b' def main():'
62 ptime = stop-start
62 ptime = stop-start
63 scale = stime/ptime
63 scale = stime/ptime
64
64
65 print "executed %.1f secs in %.1f secs"%(stime, ptime)
65 print("executed %.1f secs in %.1f secs"%(stime, ptime))
66 print "%.3fx parallel performance on %i engines"%(scale, nengines)
66 print("%.3fx parallel performance on %i engines"%(scale, nengines))
67 print "%.1f%% of theoretical max"%(100*scale/nengines)
67 print("%.1f%% of theoretical max"%(100*scale/nengines))
68
68
69
69
70 if __name__ == '__main__':
70 if __name__ == '__main__':
@@ -5,6 +5,7 b''
5 # # Load balanced map and parallel function decorator
5 # # Load balanced map and parallel function decorator
6
6
7 # <codecell>
7 # <codecell>
8 from __future__ import print_function
8
9
9 from IPython.parallel import Client
10 from IPython.parallel import Client
10
11
@@ -16,14 +17,14 b' v = rc.load_balanced_view()'
16 # <codecell>
17 # <codecell>
17
18
18 result = v.map(lambda x: 2*x, range(10))
19 result = v.map(lambda x: 2*x, range(10))
19 print "Simple, default map: ", list(result)
20 print("Simple, default map: ", list(result))
20
21
21 # <codecell>
22 # <codecell>
22
23
23 ar = v.map_async(lambda x: 2*x, range(10))
24 ar = v.map_async(lambda x: 2*x, range(10))
24 print "Submitted tasks, got ids: ", ar.msg_ids
25 print("Submitted tasks, got ids: ", ar.msg_ids)
25 result = ar.get()
26 result = ar.get()
26 print "Using a mapper: ", result
27 print("Using a mapper: ", result)
27
28
28 # <codecell>
29 # <codecell>
29
30
@@ -31,5 +32,5 b' print "Using a mapper: ", result'
31 def f(x): return 2*x
32 def f(x): return 2*x
32
33
33 result = f.map(range(10))
34 result = f.map(range(10))
34 print "Using a parallel function: ", result
35 print("Using a parallel function: ", result)
35
36
@@ -15,6 +15,7 b' Authors'
15 * Min Ragan-Kelley
15 * Min Ragan-Kelley
16
16
17 """
17 """
18 from __future__ import print_function
18 import time
19 import time
19
20
20 from numpy import zeros, ascontiguousarray, frombuffer
21 from numpy import zeros, ascontiguousarray, frombuffer
@@ -43,10 +44,10 b' class RectPartitioner:'
43
44
44 def redim (self, global_num_cells, num_parts):
45 def redim (self, global_num_cells, num_parts):
45 nsd_ = len(global_num_cells)
46 nsd_ = len(global_num_cells)
46 # print "Inside the redim function, nsd=%d" %nsd_
47 # print("Inside the redim function, nsd=%d" %nsd_)
47
48
48 if nsd_<1 | nsd_>3 | nsd_!=len(num_parts):
49 if nsd_<1 | nsd_>3 | nsd_!=len(num_parts):
49 print 'The input global_num_cells is not ok!'
50 print('The input global_num_cells is not ok!')
50 return
51 return
51
52
52 self.nsd = nsd_
53 self.nsd = nsd_
@@ -61,7 +62,7 b' class RectPartitioner:'
61
62
62 nsd_ = self.nsd
63 nsd_ = self.nsd
63 if nsd_<1:
64 if nsd_<1:
64 print 'Number of space dimensions is %d, nothing to do' %nsd_
65 print('Number of space dimensions is %d, nothing to do' %nsd_)
65 return
66 return
66
67
67 self.subd_rank = [-1,-1,-1]
68 self.subd_rank = [-1,-1,-1]
@@ -77,7 +78,7 b' class RectPartitioner:'
77 for i in range(nsd_):
78 for i in range(nsd_):
78 num_subds = num_subds*self.num_parts[i]
79 num_subds = num_subds*self.num_parts[i]
79 if my_id==0:
80 if my_id==0:
80 print "# subds=", num_subds
81 print("# subds=", num_subds)
81 # should check num_subds againt num_procs
82 # should check num_subds againt num_procs
82
83
83 offsets = [1, 0, 0]
84 offsets = [1, 0, 0]
@@ -93,9 +94,9 b' class RectPartitioner:'
93 self.subd_rank[1] = (my_id%offsets[2])/self.num_parts[0]
94 self.subd_rank[1] = (my_id%offsets[2])/self.num_parts[0]
94 self.subd_rank[2] = my_id/offsets[2]
95 self.subd_rank[2] = my_id/offsets[2]
95
96
96 print "my_id=%d, subd_rank: "%my_id, self.subd_rank
97 print("my_id=%d, subd_rank: "%my_id, self.subd_rank)
97 if my_id==0:
98 if my_id==0:
98 print "offsets=", offsets
99 print("offsets=", offsets)
99
100
100 # find the neighbor ids
101 # find the neighbor ids
101 for i in range(nsd_):
102 for i in range(nsd_):
@@ -118,11 +119,11 b' class RectPartitioner:'
118 ix = ix+1 # one cell of overlap
119 ix = ix+1 # one cell of overlap
119 self.subd_hi_ix[i] = ix
120 self.subd_hi_ix[i] = ix
120
121
121 print "subd_rank:",self.subd_rank,\
122 print("subd_rank:",self.subd_rank,\
122 "lower_neig:", self.lower_neighbors, \
123 "lower_neig:", self.lower_neighbors, \
123 "upper_neig:", self.upper_neighbors
124 "upper_neig:", self.upper_neighbors)
124 print "subd_rank:",self.subd_rank,"subd_lo_ix:", self.subd_lo_ix, \
125 print("subd_rank:",self.subd_rank,"subd_lo_ix:", self.subd_lo_ix, \
125 "subd_hi_ix:", self.subd_hi_ix
126 "subd_hi_ix:", self.subd_hi_ix)
126
127
127
128
128 class RectPartitioner1D(RectPartitioner):
129 class RectPartitioner1D(RectPartitioner):
@@ -199,10 +200,10 b' class MPIRectPartitioner2D(RectPartitioner2D):'
199 def update_internal_boundary (self, solution_array):
200 def update_internal_boundary (self, solution_array):
200 nsd_ = self.nsd
201 nsd_ = self.nsd
201 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
202 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
202 print "Buffers for communicating with lower neighbors not ready"
203 print("Buffers for communicating with lower neighbors not ready")
203 return
204 return
204 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
205 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
205 print "Buffers for communicating with upper neighbors not ready"
206 print("Buffers for communicating with upper neighbors not ready")
206 return
207 return
207
208
208 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
209 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
@@ -299,10 +300,10 b' class ZMQRectPartitioner2D(RectPartitioner2D):'
299 nsd_ = self.nsd
300 nsd_ = self.nsd
300 dtype = solution_array.dtype
301 dtype = solution_array.dtype
301 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
302 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
302 print "Buffers for communicating with lower neighbors not ready"
303 print("Buffers for communicating with lower neighbors not ready")
303 return
304 return
304 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
305 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
305 print "Buffers for communicating with upper neighbors not ready"
306 print("Buffers for communicating with upper neighbors not ready")
306 return
307 return
307
308
308 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
309 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
@@ -390,10 +391,10 b' class ZMQRectPartitioner2D(RectPartitioner2D):'
390 nsd_ = self.nsd
391 nsd_ = self.nsd
391 dtype = solution_array.dtype
392 dtype = solution_array.dtype
392 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
393 if nsd_!=len(self.in_lower_buffers) | nsd_!=len(self.out_lower_buffers):
393 print "Buffers for communicating with lower neighbors not ready"
394 print("Buffers for communicating with lower neighbors not ready")
394 return
395 return
395 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
396 if nsd_!=len(self.in_upper_buffers) | nsd_!=len(self.out_upper_buffers):
396 print "Buffers for communicating with upper neighbors not ready"
397 print("Buffers for communicating with upper neighbors not ready")
397 return
398 return
398
399
399 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
400 loc_nx = self.subd_hi_ix[0]-self.subd_lo_ix[0]
@@ -114,7 +114,7 b" if __name__ == '__main__':"
114
114
115 # construct the View:
115 # construct the View:
116 view = rc[:num_procs]
116 view = rc[:num_procs]
117 print "Running %s system on %s processes until %f"%(grid, partition, tstop)
117 print("Running %s system on %s processes until %f"%(grid, partition, tstop))
118
118
119 # functions defining initial/boundary/source conditions
119 # functions defining initial/boundary/source conditions
120 def I(x,y):
120 def I(x,y):
@@ -179,7 +179,7 b" if __name__ == '__main__':"
179 else:
179 else:
180 norm = -1
180 norm = -1
181 t1 = time.time()
181 t1 = time.time()
182 print 'scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
182 print('scalar inner-version, Wtime=%g, norm=%g'%(t1-t0, norm))
183
183
184 # run again with faster numpy-vectorized inner implementation:
184 # run again with faster numpy-vectorized inner implementation:
185 impl['inner'] = 'vectorized'
185 impl['inner'] = 'vectorized'
@@ -197,7 +197,7 b" if __name__ == '__main__':"
197 else:
197 else:
198 norm = -1
198 norm = -1
199 t1 = time.time()
199 t1 = time.time()
200 print 'vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm)
200 print('vector inner-version, Wtime=%g, norm=%g'%(t1-t0, norm))
201
201
202 # if ns.save is True, then u_hist stores the history of u as a list
202 # if ns.save is True, then u_hist stores the history of u as a list
203 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
203 # If the partion scheme is Nx1, then u can be reconstructed via 'gather':
@@ -206,4 +206,4 b" if __name__ == '__main__':"
206 view.execute('u_last=u_hist[-1]')
206 view.execute('u_last=u_hist[-1]')
207 u_last = view.gather('u_last', block=True)
207 u_last = view.gather('u_last', block=True)
208 pylab.pcolor(u_last)
208 pylab.pcolor(u_last)
209 pylab.show() No newline at end of file
209 pylab.show()
@@ -160,7 +160,7 b' class WaveSolver(object):'
160
160
161 if user_action is not None:
161 if user_action is not None:
162 user_action(u_1, x, y, t) # allow user to plot etc.
162 user_action(u_1, x, y, t) # allow user to plot etc.
163 # print list(self.us[2][2])
163 # print(list(self.us[2][2]))
164 self.us = (u,u_1,u_2)
164 self.us = (u,u_1,u_2)
165
165
166
166
@@ -196,8 +196,8 b' class WaveSolver(object):'
196 while t <= tstop:
196 while t <= tstop:
197 t_old = t; t += dt
197 t_old = t; t += dt
198 if verbose:
198 if verbose:
199 print 'solving (%s version) at t=%g' % \
199 print('solving (%s version) at t=%g' % \
200 (implementation['inner'], t)
200 (implementation['inner'], t))
201 # update all inner points:
201 # update all inner points:
202 if implementation['inner'] == 'scalar':
202 if implementation['inner'] == 'scalar':
203 for i in xrange(1, nx):
203 for i in xrange(1, nx):
@@ -251,9 +251,9 b' class WaveSolver(object):'
251 u_2, u_1, u = u_1, u, u_2
251 u_2, u_1, u = u_1, u, u_2
252
252
253 t1 = time.time()
253 t1 = time.time()
254 print 'my_id=%2d, dt=%g, %s version, slice_copy=%s, net Wtime=%g'\
254 print('my_id=%2d, dt=%g, %s version, slice_copy=%s, net Wtime=%g'\
255 %(partitioner.my_id,dt,implementation['inner'],\
255 %(partitioner.my_id,dt,implementation['inner'],\
256 partitioner.slice_copy,t1-t0)
256 partitioner.slice_copy,t1-t0))
257 # save the us
257 # save the us
258 self.us = u,u_1,u_2
258 self.us = u,u_1,u_2
259 # check final results; compute discrete L2-norm of the solution
259 # check final results; compute discrete L2-norm of the solution
@@ -25,7 +25,7 b' def gilsleep(t):'
25 ])
25 ])
26 while True:
26 while True:
27 inline(code, quiet=True, t=t)
27 inline(code, quiet=True, t=t)
28 print time.time()
28 print(time.time())
29 sys.stdout.flush() # this is important
29 sys.stdout.flush() # this is important
30
30
31 gilsleep(5)
31 gilsleep(5)
General Comments 0
You need to be logged in to leave comments. Login now