Show More
@@ -1,80 +1,89 b'' | |||||
#!/usr/bin/env python
"""Parallel word frequency counter.

This only works for a local cluster, because the filenames are local paths.
"""
# NOTE: the __future__ import must come before any other statement
# (only the docstring may precede it), or the module is a SyntaxError.
from __future__ import division

import os
import time
import urllib

from itertools import repeat

from wordfreq import print_wordfreq, wordfreq

from IPython.parallel import Client, Reference
18 | davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt" |
|
20 | davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt" | |
19 |
|
21 | |||
def pwordfreq(view, fnames):
    """Parallel word frequency counter.

    Scatter one filename to each engine, run ``wordfreq`` on every engine
    against its local file, then merge the per-engine count dicts into one.

    view   - An IPython DirectView; must have exactly one target per filename.
    fnames - The filenames containing the split data (local paths, one per
             engine -- this assumes a local cluster).

    Returns a dict mapping each word to its total count across all files.
    Raises ValueError if the number of files does not match the number of
    engines.
    """
    # Explicit validation instead of `assert`, which is stripped under -O.
    if len(fnames) != len(view.targets):
        raise ValueError("need exactly one filename per engine")
    # Give each engine its own filename as the remote variable `fname`.
    view.scatter('fname', fnames, flatten=True)
    # Run wordfreq remotely; Reference('fname') resolves per-engine.
    ar = view.apply(wordfreq, Reference('fname'))
    freqs_list = ar.get()
    # Merge step: collect every word seen on any engine, start all
    # counts at zero, then accumulate each engine's counts.
    word_set = set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs = dict.fromkeys(word_set, 0)
    for f in freqs_list:
        for word, count in f.items():
            freqs[word] += count
    return freqs
38 |
|
40 | |||
if __name__ == '__main__':
    # Create a Client and a DirectView over all engines
    rc = Client()

    view = rc[:]

    if not os.path.exists('davinci.txt'):
        # download from project gutenberg
        print("Downloading Da Vinci's notebooks from Project Gutenberg")
        try:  # Python 2
            urllib.urlretrieve(davinci_url, 'davinci.txt')
        except AttributeError:  # Python 3: urlretrieve moved to urllib.request
            import urllib.request
            urllib.request.urlretrieve(davinci_url, 'davinci.txt')

    # Run the serial version
    print("Serial word frequency count:")
    with open('davinci.txt') as f:
        text = f.read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate" % (toc - tic))

    # The parallel version
    print("\nParallel word frequency count:")
    # split davinci.txt into one file per engine:
    lines = text.splitlines()
    nlines = len(lines)
    n = len(rc)
    block = nlines // n
    for i in range(n):
        # Fix: the original slice `lines[i*block:i*(block+1)]` was wrong
        # (overlapping/shrinking chunks). Give the last engine the
        # remainder so no trailing lines are dropped when nlines % n != 0.
        start = i * block
        end = nlines if i == n - 1 else (i + 1) * block
        chunk = lines[start:end]
        with open('davinci%i.txt' % i, 'w') as f:
            f.write('\n'.join(chunk))

    try:  # Python 2: getcwdu returns unicode
        cwd = os.path.abspath(os.getcwdu())
    except AttributeError:  # Python 3: getcwd is already unicode
        cwd = os.path.abspath(os.getcwd())
    fnames = [os.path.join(cwd, 'davinci%i.txt' % i) for i in range(n)]
    tic = time.time()
    pfreqs = pwordfreq(view, fnames)
    toc = time.time()
    # Fix: report the parallel result (the original printed the serial
    # `freqs` here, so the parallel output was never shown).
    print_wordfreq(pfreqs, 10)
    print("Took %.3f s to calculate on %i engines" % (toc - tic, len(view.targets)))
    # cleanup split files: explicit loop, because `map` is lazy on
    # Python 3 and `map(os.remove, fnames)` would never remove anything.
    for fname in fnames:
        os.remove(fname)
General Comments 0
You need to be logged in to leave comments.
Login now