update a few parallel examples...
MinRK
@@ -1,73 +1,80 @@
 #!/usr/bin/env python
 """Parallel word frequency counter.
 
 This only works for a local cluster, because the filenames are local paths.
 """
 
 
 import os
+import time
 import urllib
 
 from itertools import repeat
 
 from wordfreq import print_wordfreq, wordfreq
 
 from IPython.parallel import Client, Reference
 
 davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"
 
 def pwordfreq(view, fnames):
     """Parallel word frequency counter.
 
     view - An IPython DirectView
     fnames - The filenames containing the split data.
     """
     assert len(fnames) == len(view.targets)
     view.scatter('fname', fnames, flatten=True)
     ar = view.apply(wordfreq, Reference('fname'))
     freqs_list = ar.get()
     word_set = set()
     for f in freqs_list:
         word_set.update(f.keys())
     freqs = dict(zip(word_set, repeat(0)))
     for f in freqs_list:
         for word, count in f.iteritems():
             freqs[word] += count
     return freqs
 
 if __name__ == '__main__':
     # Create a Client and View
     rc = Client()
 
     view = rc[:]
 
     if not os.path.exists('davinci.txt'):
         # download from project gutenberg
         print "Downloading Da Vinci's notebooks from Project Gutenberg"
         urllib.urlretrieve(davinci_url, 'davinci.txt')
 
     # Run the serial version
     print "Serial word frequency count:"
     text = open('davinci.txt').read()
+    tic = time.time()
     freqs = wordfreq(text)
+    toc = time.time()
     print_wordfreq(freqs, 10)
+    print "Took %.3f s to calculate"%(toc-tic)
 
 
     # The parallel version
     print "\nParallel word frequency count:"
     # split davinci.txt into one file per engine:
     lines = text.splitlines()
     nlines = len(lines)
     n = len(rc)
     block = nlines/n
     for i in range(n):
         chunk = lines[i*block:(i+1)*block]
         with open('davinci%i.txt'%i, 'w') as f:
             f.write('\n'.join(chunk))
 
     cwd = os.path.abspath(os.getcwd())
     fnames = [ os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]
+    tic = time.time()
     pfreqs = pwordfreq(view,fnames)
+    toc = time.time()
     print_wordfreq(pfreqs)
+    print "Took %.3f s to calculate on %i engines"%(toc-tic, len(view.targets))
     # cleanup split files
     map(os.remove, fnames)
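
The scatter-plus-Reference pattern above is the heart of this example: view.scatter puts one filename on each engine under the name 'fname', and Reference('fname') tells apply to resolve that engine-local variable at call time, so wordfreq runs against a different file everywhere. A minimal sketch of the same idiom, assuming a running local cluster; the variable name 'item' is illustrative, not part of the example:

    from IPython.parallel import Client, Reference

    rc = Client()
    view = rc[:]
    # one element of the list lands on each engine as the variable 'item'
    view.scatter('item', range(len(view.targets)), flatten=True)
    # Reference('item') is resolved to each engine's local value at call time
    ar = view.apply(lambda x: x*2, Reference('item'))
    print ar.get()    # one result per engine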
@@ -1,37 +1,36 @@
 from IPython.parallel import *
 
 client = Client()
-view = client[:]
+view = client.load_balanced_view()
 
 @view.remote(block=True)
 def square(a):
     """return square of a number"""
     return a*a
 
 squares = map(square, range(42))
 
 # but that blocked between each result; not exactly useful
 
 square.block = False
 
 arlist = map(square, range(42))
 # submitted very fast
 
 # wait for the results:
 squares2 = [ r.get() for r in arlist ]
 
 # now the more convenient @parallel decorator, which has a map method:
-
-@view.parallel(block=False)
+view2 = client[:]
+@view2.parallel(block=False)
 def psquare(a):
     """return square of a number"""
     return a*a
 
 # this chunks the data into n-engines jobs, not 42 jobs:
 ar = psquare.map(range(42))
 
 # wait for the results to be done:
 squares3 = ar.get()
-
 print squares == squares2, squares3==squares
 # True
\ No newline at end of file
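
The switch here gives each decorator the view it actually wants: @remote submissions now go through the load-balanced scheduler one task at a time, while @parallel keeps a DirectView (view2) because its map splits the input into one chunk per engine. A small sketch contrasting the two view types, assuming a running cluster; the function names are illustrative:

    from IPython.parallel import Client

    client = Client()
    lview = client.load_balanced_view()   # each call becomes one scheduled task
    dview = client[:]                     # work is partitioned across all engines

    @lview.remote(block=True)
    def triple(x):
        return 3*x

    @dview.parallel(block=True)
    def ptriple(x):
        return 3*x

    print triple(14)              # one task on whichever engine is free
    print ptriple.map(range(6))   # range(6) split into len(dview) chunks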
@@ -1,86 +1,64 @@
 import time
 import numpy as np
 from IPython import parallel
 
 nlist = map(int, np.logspace(2,9,16,base=2))
 nlist2 = map(int, np.logspace(2,8,15,base=2))
 tlist = map(int, np.logspace(7,22,16,base=2))
 nt = 16
 def wait(t=0):
     import time
     time.sleep(t)
 
 def echo(s=''):
     return s
 
 def time_throughput(nmessages, t=0, f=wait):
     client = parallel.Client()
-    view = client[None]
+    view = client.load_balanced_view()
     # do one ping before starting timing
     if f is echo:
         t = np.random.random(t/8)
     view.apply_sync(echo, '')
     client.spin()
     tic = time.time()
     for i in xrange(nmessages):
         view.apply(f, t)
     lap = time.time()
-    client.barrier()
+    client.wait()
     toc = time.time()
     return lap-tic, toc-tic
 
-def time_twisted(nmessages, t=0, f=wait):
-    from IPython.kernel import client as kc
-    client = kc.TaskClient()
-    if f is wait:
-        s = "import time; time.sleep(%f)"%t
-        task = kc.StringTask(s)
-    elif f is echo:
-        t = np.random.random(t/8)
-        s = "s=t"
-        task = kc.StringTask(s, push=dict(t=t), pull=['s'])
-    else:
-        raise
-    # do one ping before starting timing
-    client.barrier(client.run(task))
-    tic = time.time()
-    tids = []
-    for i in xrange(nmessages):
-        tids.append(client.run(task))
-    lap = time.time()
-    client.barrier(tids)
-    toc = time.time()
-    return lap-tic, toc-tic
-
 def do_runs(nlist,t=0,f=wait, trials=2, runner=time_throughput):
     A = np.zeros((len(nlist),2))
     for i,n in enumerate(nlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print n,A[i]
     return A
 
 def do_echo(n,tlist=[0],f=echo, trials=2, runner=time_throughput):
     A = np.zeros((len(tlist),2))
     for i,t in enumerate(tlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print t,A[i]
     return A
\ No newline at end of file
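
With the Twisted-era time_twisted runner removed, time_throughput is the only benchmark left. The two numbers it returns separate submission time (lap-tic, how fast the apply calls go out) from completion time (toc-tic, until client.wait() has seen every reply). A hedged usage sketch built on the do_runs helper above; the throughput figures depend entirely on the cluster:

    # assuming an ipcluster is already running
    import numpy as np
    sizes = map(int, np.logspace(2, 6, 5, base=2))   # 4 ... 64 messages
    A = do_runs(sizes, t=0, f=wait, trials=2, runner=time_throughput)
    # A[:,0] ~ messages/sec submitted, A[:,1] ~ messages/sec completed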
@@ -1,17 +1,17 @@
 from IPython.parallel import Client
 
 rc = Client()
 view = rc[:]
 result = view.map_sync(lambda x: 2*x, range(10))
 print "Simple, default map: ", result
 
 ar = view.map_async(lambda x: 2*x, range(10))
 print "Submitted map, got AsyncResult: ", ar
 result = ar.r
 print "Using map_async: ", result
 
 @view.parallel(block=True)
 def f(x): return 2*x
 
-result = f(range(10))
+result = f.map(range(10))
 print "Using a parallel function: ", result
\ No newline at end of file
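
The one-line fix at the bottom matters more than it looks: calling a @parallel function directly hands each engine a chunk of the sequence, so 2*x becomes Python list repetition instead of doubling numbers, while .map restores element-wise behavior. A minimal illustration, assuming a cluster with more than one engine:

    from IPython.parallel import Client

    rc = Client()
    view = rc[:]

    @view.parallel(block=True)
    def double(x):
        return 2*x

    print double(range(4))      # chunk-wise: each engine returns chunk*2, i.e. list repetition
    print double.map(range(4))  # element-wise: [0, 2, 4, 6]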
@@ -1,40 +1,40 @@
 """Parallel histogram function"""
 import numpy
-from IPython.utils.pickleutil import Reference
+from IPython.parallel import Reference
 
 def phistogram(view, a, bins=10, rng=None, normed=False):
     """Compute the histogram of a remote array a.
 
     Parameters
     ----------
     view
         IPython DirectView instance
     a : str
         String name of the remote array
     bins : int
         Number of histogram bins
     rng : (float, float)
         Tuple of min, max of the range to histogram
     normed : boolean
         Should the histogram counts be normalized to 1
     """
     nengines = len(view.targets)
 
     # view.push(dict(bins=bins, rng=rng))
     with view.sync_imports():
         import numpy
     rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a,b,rng), Reference(a), bins, rng)
     hists = [ r[0] for r in rets ]
     lower_edges = [ r[1] for r in rets ]
     # view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
     lower_edges = lower_edges[0]  # every engine used the same bins, so take the first
     hist_array = numpy.array(hists).reshape(nengines, -1)
     # hist_array.shape = (nengines,-1)
     total_hist = numpy.sum(hist_array, 0)
     if normed:
         total_hist = total_hist/numpy.sum(total_hist,dtype=float)
     return total_hist, lower_edges
 
 
 
 
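
The import fix here reflects that Reference is public API in IPython.parallel rather than an internal of pickleutil. A hedged usage sketch for phistogram, assuming engines that each hold a piece of the data; the scatter call and the variable name 'data' are illustrative:

    import numpy
    from IPython.parallel import Client

    rc = Client()
    view = rc[:]
    # give each engine a local array named 'data'
    view.scatter('data', numpy.random.randn(10000))
    hist, edges = phistogram(view, 'data', bins=20, rng=(-3, 3))
    print hist.sum()   # total number of samples that fell inside rng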
@@ -1,70 +1,71 @@
 #!/usr/bin/env python
 """Test the performance of the task farming system.
 
 This script submits a set of tasks via a LoadBalancedView. The tasks
 are basically just a time.sleep(t), where t is a random number between
 two limits that can be configured at the command line. To run
 the script there must first be an IPython controller and engines running::
 
     ipclusterz start -n 16
 
 A good test to run with 16 engines is::
 
     python task_profiler.py -n 128 -t 0.01 -T 1.0
 
 This should show a speedup of 13-14x. The limitation here is that the
 overhead of a single task is about 0.001-0.01 seconds.
 """
 import random, sys
 from optparse import OptionParser
 
 from IPython.utils.timing import time
 from IPython.parallel import Client
 
 def main():
     parser = OptionParser()
     parser.set_defaults(n=100)
-    parser.set_defaults(tmin=1)
-    parser.set_defaults(tmax=60)
+    parser.set_defaults(tmin=1e-3)
+    parser.set_defaults(tmax=1)
     parser.set_defaults(profile='default')
 
     parser.add_option("-n", type='int', dest='n',
         help='the number of tasks to run')
     parser.add_option("-t", type='float', dest='tmin',
         help='the minimum task length in seconds')
     parser.add_option("-T", type='float', dest='tmax',
         help='the maximum task length in seconds')
     parser.add_option("-p", '--profile', type='str', dest='profile',
         help="the cluster profile [default: 'default']")
 
     (opts, args) = parser.parse_args()
     assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
 
     rc = Client()
     view = rc.load_balanced_view()
     print view
     rc.block=True
     nengines = len(rc.ids)
-    rc[:].execute('from IPython.utils.timing import time')
+    with rc[:].sync_imports():
+        from IPython.utils.timing import time
 
     # the jobs should take a random time within a range
     times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
     stime = sum(times)
 
     print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
     time.sleep(1)
     start = time.time()
     amr = view.map(time.sleep, times)
     amr.get()
     stop = time.time()
 
     ptime = stop-start
     scale = stime/ptime
 
     print "executed %.1f secs in %.1f secs"%(stime, ptime)
     print "%.3fx parallel performance on %i engines"%(scale, nengines)
     print "%.1f%% of theoretical max"%(100*scale/nengines)
 
 
 if __name__ == '__main__':
     main()
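
Two of the changes in this file are worth noting: the default task lengths drop from minutes to milliseconds, so a bare run finishes in seconds, and the string-based execute is replaced by the sync_imports context manager, which performs the import both locally and on every engine in the view. A minimal sketch of that idiom, assuming a running cluster:

    from IPython.parallel import Client

    rc = Client()
    dview = rc[:]
    with dview.sync_imports():
        # runs 'import time' both here and on every engine in the view
        import time
    # the engines can now use the module by name
    print dview.apply_sync(lambda : time.time())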