@@ -1,73 +1,80 @@
 #!/usr/bin/env python
 """Parallel word frequency counter.

 This only works for a local cluster, because the filenames are local paths.
 """


 import os
+import time
 import urllib

 from itertools import repeat

 from wordfreq import print_wordfreq, wordfreq

 from IPython.parallel import Client, Reference

 davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"

 def pwordfreq(view, fnames):
     """Parallel word frequency counter.

     view - An IPython DirectView
     fnames - The filenames containing the split data.
     """
     assert len(fnames) == len(view.targets)
     view.scatter('fname', fnames, flatten=True)
     ar = view.apply(wordfreq, Reference('fname'))
     freqs_list = ar.get()
     word_set = set()
     for f in freqs_list:
         word_set.update(f.keys())
     freqs = dict(zip(word_set, repeat(0)))
     for f in freqs_list:
         for word, count in f.iteritems():
             freqs[word] += count
     return freqs

 if __name__ == '__main__':
     # Create a Client and View
     rc = Client()

     view = rc[:]

     if not os.path.exists('davinci.txt'):
         # download from project gutenberg
         print "Downloading Da Vinci's notebooks from Project Gutenberg"
         urllib.urlretrieve(davinci_url, 'davinci.txt')

     # Run the serial version
     print "Serial word frequency count:"
     text = open('davinci.txt').read()
+    tic = time.time()
     freqs = wordfreq(text)
+    toc = time.time()
     print_wordfreq(freqs, 10)
+    print "Took %.3f s to calculate" % (toc-tic)


     # The parallel version
     print "\nParallel word frequency count:"
     # split davinci.txt into one file per engine:
     lines = text.splitlines()
     nlines = len(lines)
     n = len(rc)
     block = nlines/n
     for i in range(n):
         chunk = lines[i*block:(i+1)*block]
         with open('davinci%i.txt' % i, 'w') as f:
             f.write('\n'.join(chunk))

     cwd = os.path.abspath(os.getcwd())
     fnames = [os.path.join(cwd, 'davinci%i.txt' % i) for i in range(n)]
+    tic = time.time()
     pfreqs = pwordfreq(view, fnames)
+    toc = time.time()
     print_wordfreq(pfreqs, 10)
+    print "Took %.3f s to calculate on %i engines" % (toc-tic, len(view.targets))
     # cleanup split files
     map(os.remove, fnames)
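The pattern this example relies on is scattering one argument per engine and then applying a function to a Reference to the scattered name, which each engine resolves in its own namespace. A minimal sketch of just that pattern, assuming a running local cluster and four hypothetical per-engine files part0.txt .. part3.txt::

    from IPython.parallel import Client, Reference

    rc = Client()
    view = rc[:]

    def count_words(fname):
        # toy stand-in for wordfreq: count words in a single file
        counts = {}
        for word in open(fname).read().split():
            counts[word] = counts.get(word, 0) + 1
        return counts

    # flatten=True assigns each engine one filename (a string), not a one-element list
    view.scatter('fname', ['part%i.txt' % i for i in range(4)], flatten=True)
    ar = view.apply(count_words, Reference('fname'))  # 'fname' resolves per engine
    freqs_list = ar.get()  # one dict per engine, merged by pwordfreq above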
@@ -1,37 +1,36 @@
 from IPython.parallel import *

 client = Client()
-view = client[:]
+view = client.load_balanced_view()

 @view.remote(block=True)
 def square(a):
     """return square of a number"""
     return a*a

 squares = map(square, range(42))

 # but that blocked between each result; not exactly useful

 square.block = False

 arlist = map(square, range(42))
 # submitted very fast

 # wait for the results:
 squares2 = [r.get() for r in arlist]

 # now the more convenient @parallel decorator, which has a map method:
-
-@view.parallel(block=False)
+view2 = client[:]
+@view2.parallel(block=False)
 def psquare(a):
     """return square of a number"""
     return a*a

 # this chunks the data into n-engines jobs, not 42 jobs:
 ar = psquare.map(range(42))

 # wait for the results to be done:
 squares3 = ar.get()
-
 print squares == squares2, squares3 == squares
 # True
\ No newline at end of file
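The view swap above is deliberate: @remote submits one task per call, so it pairs naturally with a load-balanced view, while @parallel chunks a whole sequence across the engines of a direct view. A sketch of the two side by side, assuming a running cluster (double and pdouble are illustrative names)::

    from IPython.parallel import Client

    client = Client()
    lview = client.load_balanced_view()
    dview = client[:]

    @lview.remote(block=True)
    def double(x):
        return 2*x

    @dview.parallel(block=True)
    def pdouble(x):
        return 2*x

    print double(21)              # one task, routed to whichever engine is free
    print pdouble.map(range(10))  # split into one chunk of work per engine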
@@ -1,86 +1,64 @@
 import time
 import numpy as np
 from IPython import parallel

 nlist = map(int, np.logspace(2,9,16,base=2))
 nlist2 = map(int, np.logspace(2,8,15,base=2))
 tlist = map(int, np.logspace(7,22,16,base=2))
 nt = 16
 def wait(t=0):
     import time
     time.sleep(t)

 def echo(s=''):
     return s

 def time_throughput(nmessages, t=0, f=wait):
     client = parallel.Client()
-    view = client
+    view = client.load_balanced_view()
     # do one ping before starting timing
     if f is echo:
         t = np.random.random(t/8)
     view.apply_sync(echo, '')
     client.spin()
     tic = time.time()
     for i in xrange(nmessages):
         view.apply(f, t)
     lap = time.time()
-    client.
+    client.wait()
     toc = time.time()
     return lap-tic, toc-tic

-def time_twisted(nmessages, t=0, f=wait):
-    from IPython.kernel import client as kc
-    client = kc.TaskClient()
-    if f is wait:
-        s = "import time; time.sleep(%f)"%t
-        task = kc.StringTask(s)
-    elif f is echo:
-        t = np.random.random(t/8)
-        s = "s=t"
-        task = kc.StringTask(s, push=dict(t=t), pull=['s'])
-    else:
-        raise
-    # do one ping before starting timing
-    client.barrier(client.run(task))
-    tic = time.time()
-    tids = []
-    for i in xrange(nmessages):
-        tids.append(client.run(task))
-    lap = time.time()
-    client.barrier(tids)
-    toc = time.time()
-    return lap-tic, toc-tic

 def do_runs(nlist, t=0, f=wait, trials=2, runner=time_throughput):
     A = np.zeros((len(nlist),2))
     for i,n in enumerate(nlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print n,A[i]
     return A

 def do_echo(n, tlist=[0], f=echo, trials=2, runner=time_throughput):
     A = np.zeros((len(tlist),2))
     for i,t in enumerate(tlist):
         t1 = t2 = 0
         for _ in range(trials):
             time.sleep(.25)
             ts = runner(n,t,f)
             t1 += ts[0]
             t2 += ts[1]
         t1 /= trials
         t2 /= trials
         A[i] = (t1,t2)
         A[i] = n/A[i]
         print t,A[i]
     return A

\ No newline at end of file
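The timing idiom that survives the rewrite is: submit asynchronously in a loop, record the time when the last submit returns (submission throughput), then block until all tasks complete (round-trip throughput). A condensed sketch of that idiom, assuming a running cluster::

    import time
    from IPython import parallel

    client = parallel.Client()
    view = client.load_balanced_view()

    def noop():
        pass

    tic = time.time()
    for i in range(100):
        view.apply(noop)   # non-blocking submit
    lap = time.time()      # all 100 tasks submitted
    client.wait()          # block until every outstanding task is done
    toc = time.time()
    print "submit: %.3fs, round-trip: %.3fs" % (lap-tic, toc-tic)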
@@ -1,17 +1,17 @@
 from IPython.parallel import Client

 rc = Client()
 view = rc[:]
 result = view.map_sync(lambda x: 2*x, range(10))
 print "Simple, default map: ", result

 ar = view.map_async(lambda x: 2*x, range(10))
 print "Submitted map, got AsyncResult: ", ar
 result = ar.r
 print "Using map_async: ", result

 @view.parallel(block=True)
 def f(x): return 2*x

-result = f(range(10))
+result = f.map(range(10))
 print "Using a parallel function: ", result
\ No newline at end of file
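The change from f(range(10)) to f.map(range(10)) is more than cosmetic: calling a @parallel-decorated function directly hands each engine a chunk of the sequence, so the body sees a list and 2*x becomes list repetition rather than arithmetic, while .map applies the body element-wise. A sketch of the difference, under the same cluster assumptions::

    @view.parallel(block=True)
    def f(x): return 2*x

    f.map(range(10))  # element-wise: [0, 2, 4, ..., 18]
    f(range(10))      # chunk-wise: an engine holding [0, 1, 2] computes
                      # 2*[0, 1, 2] == [0, 1, 2, 0, 1, 2]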
@@ -1,40 +1,40 @@
 """Parallel histogram function"""
 import numpy
-from IPython.
+from IPython.parallel import Reference

 def phistogram(view, a, bins=10, rng=None, normed=False):
     """Compute the histogram of a remote array a.

     Parameters
     ----------
     view
         IPython DirectView instance
     a : str
         String name of the remote array
     bins : int
         Number of histogram bins
     rng : (float, float)
         Tuple of min, max of the range to histogram
     normed : boolean
         Should the histogram counts be normalized to 1
     """
     nengines = len(view.targets)

     # view.push(dict(bins=bins, rng=rng))
     with view.sync_imports():
         import numpy
     rets = view.apply_sync(lambda a, b, rng: numpy.histogram(a, b, rng), Reference(a), bins, rng)
     hists = [r[0] for r in rets]
     lower_edges = [r[1] for r in rets]
     # view.execute('hist, lower_edges = numpy.histogram(%s, bins, rng)' % a)
     lower_edges = lower_edges[0]  # every engine returns the same edges when rng is fixed
     hist_array = numpy.array(hists).reshape(nengines, -1)
     # hist_array.shape = (nengines,-1)
     total_hist = numpy.sum(hist_array, 0)
     if normed:
         total_hist = total_hist/numpy.sum(total_hist, dtype=float)
     return total_hist, lower_edges



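Usage sketch for phistogram: the per-engine counts can only be summed because every engine bins over the same explicit range, so the bin edges line up. Assuming a running cluster and that the local array is scattered under the (hypothetical) name 'data'::

    import numpy
    from IPython.parallel import Client

    rc = Client()
    view = rc[:]

    view.scatter('data', numpy.random.random(100000))  # each engine gets a slice

    # a fixed rng keeps the edges identical everywhere, which is what
    # makes summing the per-engine counts valid
    hist, edges = phistogram(view, 'data', bins=10, rng=(0.0, 1.0))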
@@ -1,70 +1,71 @@
 #!/usr/bin/env python
 """Test the performance of the task farming system.

 This script submits a set of tasks via a LoadBalancedView. The tasks
 are basically just a time.sleep(t), where t is a random number between
 two limits that can be configured at the command line. To run
 the script there must first be an IPython controller and engines running::

     ipclusterz start -n 16

 A good test to run with 16 engines is::

     python task_profiler.py -n 128 -t 0.01 -T 1.0

 This should show a speedup of 13-14x. The limitation here is that the
 overhead of a single task is about 0.001-0.01 seconds.
 """
 import random, sys
 from optparse import OptionParser

 from IPython.utils.timing import time
 from IPython.parallel import Client

 def main():
     parser = OptionParser()
     parser.set_defaults(n=100)
-    parser.set_defaults(tmin=1)
-    parser.set_defaults(tmax=
+    parser.set_defaults(tmin=1e-3)
+    parser.set_defaults(tmax=1)
     parser.set_defaults(profile='default')

     parser.add_option("-n", type='int', dest='n',
         help='the number of tasks to run')
     parser.add_option("-t", type='float', dest='tmin',
         help='the minimum task length in seconds')
     parser.add_option("-T", type='float', dest='tmax',
         help='the maximum task length in seconds')
     parser.add_option("-p", '--profile', type='str', dest='profile',
         help="the cluster profile [default: 'default']")

     (opts, args) = parser.parse_args()
     assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"

     rc = Client()
     view = rc.load_balanced_view()
     print view
     rc.block = True
     nengines = len(rc.ids)
-    rc[:].execute('from IPython.utils.timing import time')
+    with rc[:].sync_imports():
+        from IPython.utils.timing import time

     # the jobs should take a random time within a range
     times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
     stime = sum(times)

     print "executing %i tasks, totalling %.1f secs on %i engines" % (opts.n, stime, nengines)
     time.sleep(1)
     start = time.time()
     amr = view.map(time.sleep, times)
     amr.get()
     stop = time.time()

     ptime = stop-start
     scale = stime/ptime

     print "executed %.1f secs in %.1f secs" % (stime, ptime)
     print "%.3fx parallel performance on %i engines" % (scale, nengines)
     print "%.1f%% of theoretical max" % (100*scale/nengines)


 if __name__ == '__main__':
     main()
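As a sanity check on the docstring's 13-14x claim, the script's own formulas give the expected numbers; a back-of-the-envelope sketch (average task length is the midpoint of the uniform range)::

    n, tmin, tmax, nengines = 128, 0.01, 1.0, 16
    stime = n * (tmin + tmax) / 2.0   # ~64.6 s of total sleep on average
    ideal = stime / nengines          # ~4.0 s if scheduling were free
    # an observed 13-14x speedup is therefore 81-88% of the 16x theoretical
    # max; the gap is the 0.001-0.01 s per-task overhead noted above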