update a few parallel examples...
MinRK
@@ -1,73 +1,80 @@
 #!/usr/bin/env python
 """Parallel word frequency counter.

 This only works for a local cluster, because the filenames are local paths.
 """


 import os
+import time
 import urllib

 from itertools import repeat

 from wordfreq import print_wordfreq, wordfreq

 from IPython.parallel import Client, Reference

 davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"

 def pwordfreq(view, fnames):
     """Parallel word frequency counter.

     view - An IPython DirectView
     fnames - The filenames containing the split data.
     """
     assert len(fnames) == len(view.targets)
     view.scatter('fname', fnames, flatten=True)
     ar = view.apply(wordfreq, Reference('fname'))
     freqs_list = ar.get()
     word_set = set()
     for f in freqs_list:
         word_set.update(f.keys())
     freqs = dict(zip(word_set, repeat(0)))
     for f in freqs_list:
         for word, count in f.iteritems():
             freqs[word] += count
     return freqs

 if __name__ == '__main__':
     # Create a Client and View
     rc = Client()

     view = rc[:]

     if not os.path.exists('davinci.txt'):
         # download from project gutenberg
         print "Downloading Da Vinci's notebooks from Project Gutenberg"
         urllib.urlretrieve(davinci_url, 'davinci.txt')

     # Run the serial version
     print "Serial word frequency count:"
     text = open('davinci.txt').read()
+    tic = time.time()
     freqs = wordfreq(text)
+    toc = time.time()
     print_wordfreq(freqs, 10)
+    print "Took %.3f s to calculate"%(toc-tic)


     # The parallel version
     print "\nParallel word frequency count:"
     # split davinci.txt into one file per engine:
     lines = text.splitlines()
     nlines = len(lines)
     n = len(rc)
     block = nlines/n
     for i in range(n):
         chunk = lines[i*block:(i+1)*block]
         with open('davinci%i.txt'%i, 'w') as f:
             f.write('\n'.join(chunk))

     cwd = os.path.abspath(os.getcwd())
     fnames = [ os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n) ]
+    tic = time.time()
     pfreqs = pwordfreq(view, fnames)
+    toc = time.time()
     print_wordfreq(pfreqs, 10)
+    print "Took %.3f s to calculate on %i engines"%(toc-tic, len(view.targets))
     # cleanup split files
     map(os.remove, fnames)
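
The pattern at the heart of pwordfreq is scatter plus Reference: view.scatter('fname', fnames, flatten=True) binds one filename per engine to the remote name 'fname', and Reference('fname') makes view.apply substitute each engine's local value at call time. A minimal sketch of the same pattern, assuming a running local cluster (names here are illustrative)::

    from IPython.parallel import Client, Reference

    rc = Client()
    view = rc[:]
    # one element per engine, bound to the remote name 'x'
    view.scatter('x', range(len(view.targets)), flatten=True)
    # each engine calls the function with its own local value of 'x'
    ar = view.apply(lambda x: x*10, Reference('x'))
    print ar.get()   # e.g. [0, 10, 20, 30] with four engines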
@@ -1,37 +1,36 @@
 from IPython.parallel import *

 client = Client()
-view = client[:]
+view = client.load_balanced_view()

 @view.remote(block=True)
 def square(a):
     """return square of a number"""
     return a*a

 squares = map(square, range(42))

 # but that blocked between each result; not exactly useful

 square.block = False

 arlist = map(square, range(42))
 # submitted very fast

 # wait for the results:
 squares2 = [ r.get() for r in arlist ]

 # now the more convenient @parallel decorator, which has a map method:

-@view.parallel(block=False)
+view2 = client[:]
+@view2.parallel(block=False)
 def psquare(a):
     """return square of a number"""
     return a*a

 # this chunks the data into nengines jobs, not 42 jobs:
 ar = psquare.map(range(42))

 # wait for the results to be done:
 squares3 = ar.get()
-
 print squares == squares2, squares3==squares
 # True
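
A side note on the waiting pattern: [ r.get() for r in arlist ] blocks on each AsyncResult in submission order. A sketch of an equivalent single call, assuming Client.wait accepts a list of AsyncResults (an assumption worth checking against the installed version)::

    client.wait(arlist)                      # block once on the whole batch
    squares2 = [ r.get() for r in arlist ]   # each get() now returns immediately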
@@ -1,86 +1,64 @@
 import time
 import numpy as np
 from IPython import parallel

 nlist = map(int, np.logspace(2,9,16,base=2))
 nlist2 = map(int, np.logspace(2,8,15,base=2))
 tlist = map(int, np.logspace(7,22,16,base=2))
 nt = 16
 def wait(t=0):
     import time
     time.sleep(t)

 def echo(s=''):
     return s

 def time_throughput(nmessages, t=0, f=wait):
     client = parallel.Client()
-    view = client[None]
+    view = client.load_balanced_view()
     # do one ping before starting timing
     if f is echo:
         t = np.random.random(t/8)
     view.apply_sync(echo, '')
     client.spin()
     tic = time.time()
     for i in xrange(nmessages):
         view.apply(f, t)
     lap = time.time()
-    client.barrier()
+    client.wait()
     toc = time.time()
     return lap-tic, toc-tic

-def time_twisted(nmessages, t=0, f=wait):
-    from IPython.kernel import client as kc
-    client = kc.TaskClient()
-    if f is wait:
-        s = "import time; time.sleep(%f)"%t
-        task = kc.StringTask(s)
-    elif f is echo:
-        t = np.random.random(t/8)
-        s = "s=t"
-        task = kc.StringTask(s, push=dict(t=t), pull=['s'])
-    else:
-        raise
-    # do one ping before starting timing
-    client.barrier(client.run(task))
-    tic = time.time()
-    tids = []
-    for i in xrange(nmessages):
-        tids.append(client.run(task))
-    lap = time.time()
-    client.barrier(tids)
-    toc = time.time()
-    return lap-tic, toc-tic
-
 def do_runs(nlist, t=0, f=wait, trials=2, runner=time_throughput):
     A = np.zeros((len(nlist),2))
     for i,n in enumerate(nlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print n,A[i]
     return A

 def do_echo(n, tlist=[0], f=echo, trials=2, runner=time_throughput):
     A = np.zeros((len(tlist),2))
     for i,t in enumerate(tlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print t,A[i]
     return A
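
A typical benchmarking session with these helpers, assuming a cluster is already up (the concrete arguments are illustrative)::

    # throughput for no-op tasks: each row of A is (submitted/s, completed/s)
    A = do_runs(nlist, t=0, f=wait)
    # sensitivity to message size: tlist entries are payload sizes in bytes
    B = do_echo(512, tlist=tlist, f=echo)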
@@ -1,17 +1,17 @@
 from IPython.parallel import Client

 rc = Client()
 view = rc[:]
 result = view.map_sync(lambda x: 2*x, range(10))
 print "Simple, default map: ", result

 ar = view.map_async(lambda x: 2*x, range(10))
 print "Submitted map, got AsyncResult: ", ar
 result = ar.r
 print "Using map_async: ", result

 @view.parallel(block=True)
 def f(x): return 2*x

-result = f(range(10))
+result = f.map(range(10))
 print "Using a parallel function: ", result
@@ -1,40 +1,40 @@
 """Parallel histogram function"""
 import numpy
-from IPython.utils.pickleutil import Reference
+from IPython.parallel import Reference

 def phistogram(view, a, bins=10, rng=None, normed=False):
     """Compute the histogram of a remote array a.

     Parameters
     ----------
     view
         IPython DirectView instance
     a : str
         String name of the remote array
     bins : int
         Number of histogram bins
     rng : (float, float)
         Tuple of min, max of the range to histogram
     normed : boolean
         Should the histogram counts be normalized to 1
     """
     nengines = len(view.targets)

     # view.push(dict(bins=bins, rng=rng))
     with view.sync_imports():
         import numpy
     rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a, b, rng), Reference(a), bins, rng)
     hists = [ r[0] for r in rets ]
     lower_edges = [ r[1] for r in rets ]
     # view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
     # with a fixed rng, every engine computes the same bin edges
     lower_edges = lower_edges[0]
     hist_array = numpy.array(hists).reshape(nengines, -1)
     # hist_array.shape = (nengines,-1)
     total_hist = numpy.sum(hist_array, 0)
     if normed:
         total_hist = total_hist/numpy.sum(total_hist, dtype=float)
     return total_hist, lower_edges
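
Because each engine histograms only its own chunk, the per-engine bin edges only coincide when rng is passed explicitly; otherwise every engine picks its own min/max and the summed counts are meaningless. A usage sketch with phistogram as defined above (array name and range are illustrative)::

    import numpy
    from IPython.parallel import Client

    rc = Client()
    view = rc[:]
    # distribute a sample array under the remote name 'data'
    view.scatter('data', numpy.random.random(100000))
    # a fixed rng keeps bin edges identical on every engine
    hist, edges = phistogram(view, 'data', bins=20, rng=(0, 1))
    print hist.sum()   # 100000: every sample lands in some bin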
@@ -1,70 +1,71 @@
 #!/usr/bin/env python
 """Test the performance of the task farming system.

 This script submits a set of tasks via a LoadBalancedView. The tasks
 are basically just a time.sleep(t), where t is a random number between
 two limits that can be configured at the command line. To run
 the script there must first be an IPython controller and engines running::

     ipclusterz start -n 16

 A good test to run with 16 engines is::

     python task_profiler.py -n 128 -t 0.01 -T 1.0

 This should show a speedup of 13-14x. The limitation here is that the
 overhead of a single task is about 0.001-0.01 seconds.
 """
 import random, sys
 from optparse import OptionParser

 from IPython.utils.timing import time
 from IPython.parallel import Client

 def main():
     parser = OptionParser()
     parser.set_defaults(n=100)
-    parser.set_defaults(tmin=1)
-    parser.set_defaults(tmax=60)
+    parser.set_defaults(tmin=1e-3)
+    parser.set_defaults(tmax=1)
     parser.set_defaults(profile='default')

     parser.add_option("-n", type='int', dest='n',
         help='the number of tasks to run')
     parser.add_option("-t", type='float', dest='tmin',
         help='the minimum task length in seconds')
     parser.add_option("-T", type='float', dest='tmax',
         help='the maximum task length in seconds')
     parser.add_option("-p", '--profile', type='str', dest='profile',
         help="the cluster profile [default: 'default']")

     (opts, args) = parser.parse_args()
     assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"

     rc = Client()
     view = rc.load_balanced_view()
     print view
     rc.block=True
     nengines = len(rc.ids)
-    rc[:].execute('from IPython.utils.timing import time')
+    with rc[:].sync_imports():
+        from IPython.utils.timing import time

     # the jobs should take a random time within a range
     times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
     stime = sum(times)

     print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
     time.sleep(1)
     start = time.time()
     amr = view.map(time.sleep, times)
     amr.get()
     stop = time.time()

     ptime = stop-start
     scale = stime/ptime

     print "executed %.1f secs in %.1f secs"%(stime, ptime)
     print "%.3fx parallel performance on %i engines"%(scale, nengines)
     print "%.1f%% of theoretical max"%(100*scale/nengines)


 if __name__ == '__main__':
     main()
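
The docstring's 13-14x claim checks out on the back of an envelope, using its own numbers (a per-task overhead of 0.001-0.01 s; the midpoint here is an assumption)::

    # expected speedup for n=128 tasks, t uniform in [0.01, 1.0], 16 engines
    n, tmin, tmax, nengines = 128, 0.01, 1.0, 16
    stime = n * (tmin + tmax) / 2.           # ~64.6 s of total sleep
    overhead = 0.005                         # assume ~5 ms per task
    ptime = stime / nengines + n * overhead  # ideal schedule plus overhead
    print "expect ~%.1fx of a theoretical %ix" % (stime / ptime, nengines)
    # -> expect ~13.8x of a theoretical 16x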