@@ -1,89 +1,90 @@
 1 |  1 | #!/usr/bin/env python
 2 |  2 | """Parallel word frequency counter.
 3 |  3 |
 4 |  4 | This only works for a local cluster, because the filenames are local paths.
 5 |  5 | """
 6 |  6 |
 7 |  7 | from __future__ import division
 8 |  8 | import os
 9 |  9 | import time
10 | 10 | import urllib
11 | 11 |
12 | 12 | from itertools import repeat
13 | 13 |
14 | 14 | from wordfreq import print_wordfreq, wordfreq
15 | 15 |
16 | 16 | from IPython.parallel import Client, Reference
17 | 17 |
18 | 18 |
19 | 19 |
   | 20 | try: #python2
   | 21 |     from urllib import urlretrieve
   | 22 | except ImportError: #python3
   | 23 |     from urllib.request import urlretrieve
   | 24 |
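
The module-level fallback added above is the usual feature-detection idiom for straddling Python 2 and 3: try the old import location first, catch ImportError, and fall back to the new one, so every later call site can use the single name urlretrieve. A minimal standalone sketch of the same idiom (the target filename here is just an illustration):

    try:  # Python 2 location
        from urllib import urlretrieve
    except ImportError:  # Python 3 moved it into urllib.request
        from urllib.request import urlretrieve

    # one version-agnostic call site, no try/except at the point of use
    urlretrieve("http://www.gutenberg.org/cache/epub/5000/pg5000.txt",
                "davinci-example.txt")

This is what lets the per-call try/except further down (old lines 50-54) collapse to a single line.
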
20 | 25 | davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"
21 | 26 |
22 | 27 | def pwordfreq(view, fnames):
23 | 28 |     """Parallel word frequency counter.
24 | 29 |
25 | 30 |     view - An IPython DirectView
26 | 31 |     fnames - The filenames containing the split data.
27 | 32 |     """
28 | 33 |     assert len(fnames) == len(view.targets)
29 | 34 |     view.scatter('fname', fnames, flatten=True)
30 | 35 |     ar = view.apply(wordfreq, Reference('fname'))
31 | 36 |     freqs_list = ar.get()
32 | 37 |     word_set = set()
33 | 38 |     for f in freqs_list:
34 | 39 |         word_set.update(f.keys())
35 | 40 |     freqs = dict(zip(word_set, repeat(0)))
36 | 41 |     for f in freqs_list:
37 | 42 |         for word, count in f.items():
38 | 43 |             freqs[word] += count
39 | 44 |     return freqs
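
A note on the machinery in pwordfreq, since it is easy to misread: view.scatter('fname', fnames, flatten=True) assigns exactly one filename string to the variable fname in each engine's namespace (flatten=True unwraps the length-1 sublists), and Reference('fname') makes view.apply resolve that per-engine variable as the argument when wordfreq runs remotely. A rough driver sketch, assuming the definitions above are in scope, a cluster is already running, and the split files exist on the engines' local filesystem:

    from IPython.parallel import Client

    rc = Client()                    # connect to the running cluster
    view = rc[:]                     # a DirectView over every engine
    fnames = ['davinci0.txt', 'davinci1.txt']  # hypothetical pre-split inputs
    freqs = pwordfreq(view, fnames)  # the assert requires one file per engine

The zero-initialised dict plus accumulation loop at the end is equivalent to summing collections.Counter objects; the explicit version only assumes each per-engine result behaves like a mapping.
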
40 | 45 |
41 | 46 | if __name__ == '__main__':
42 | 47 |     # Create a Client and View
43 | 48 |     rc = Client()
44 | 49 |
45 | 50 |     view = rc[:]
46 | 51 |
47 | 52 |     if not os.path.exists('davinci.txt'):
48 | 53 |         # download from project gutenberg
49 | 54 |         print("Downloading Da Vinci's notebooks from Project Gutenberg")
50 |    |         try : #python2
51 |    |             urllib.urlretrieve(davinci_url, 'davinci.txt')
52 |    |         except : #python3
53 |    |             import urllib.request
54 |    |             urllib.request.urlretrieve(davinci_url, 'davinci.txt')
   | 55 |         urlretrieve(davinci_url, 'davinci.txt')
55 | 56 |
56 | 57 |     # Run the serial version
57 | 58 |     print("Serial word frequency count:")
58 | 59 |     text = open('davinci.txt').read()
59 | 60 |     tic = time.time()
60 | 61 |     freqs = wordfreq(text)
61 | 62 |     toc = time.time()
62 | 63 |     print_wordfreq(freqs, 10)
63 |    |     print("Took %.3f s to calc"%(toc-tic))
   | 64 |     print("Took %.3f s to calculate"%(toc-tic))
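
The wordfreq and print_wordfreq helpers come from the neighbouring wordfreq module and are not part of this diff. Judging only from the call sites here (a text string in, a word-to-count mapping out, printed with an optional top-N), a minimal sketch compatible with this script could look as follows; the real module may well differ:

    def wordfreq(text):
        """Return a {word: count} mapping for the whitespace-split words of text."""
        freqs = {}
        for word in text.split():
            freqs[word] = freqs.get(word, 0) + 1
        return freqs

    def print_wordfreq(freqs, n=10):
        """Print the n most frequent words, most frequent first."""
        for word in sorted(freqs, key=freqs.get, reverse=True)[:n]:
            print("%s: %i" % (word, freqs[word]))
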
64 | 65 |
65 | 66 |
66 | 67 |     # The parallel version
67 | 68 |     print("\nParallel word frequency count:")
68 | 69 |     # split davinci.txt into one file per engine:
69 | 70 |     lines = text.splitlines()
70 | 71 |     nlines = len(lines)
71 | 72 |     n = len(rc)
72 | 73 |     block = nlines//n
73 | 74 |     for i in range(n):
74 | 75 |         chunk = lines[i*block:(i+1)*block]
75 | 76 |         with open('davinci%i.txt'%i, 'w') as f:
76 | 77 |             f.write('\n'.join(chunk))
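
Because block = nlines//n floors, the per-engine slice above drops up to n-1 trailing lines of the text. That is harmless for a timing demo, but if the tail matters, a remainder-aware split costs one extra line; a self-contained sketch:

    def partition(lines, n):
        """Split lines into n chunks; the last chunk absorbs the remainder."""
        block = len(lines) // n
        chunks = [lines[i*block:(i+1)*block] for i in range(n)]
        chunks[-1].extend(lines[n*block:])  # hand leftover lines to the last chunk
        return chunks

    assert sum(len(c) for c in partition(list(range(10)), 3)) == 10
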
77 | 78 |
78 | 79 |     try: #python2
79 | 80 |         cwd = os.path.abspath(os.getcwdu())
80 | 81 |     except AttributeError: #python3
81 | 82 |         cwd = os.path.abspath(os.getcwd())
82 | 83 |     fnames = [os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]
83 | 84 |     tic = time.time()
84 | 85 |     pfreqs = pwordfreq(view, fnames)
85 | 86 |     toc = time.time()
86 | 87 |     print_wordfreq(pfreqs)
87 |    |     print("Took %.3f s to calc on %i engines"%(toc-tic, len(view.targets)))
   | 88 |     print("Took %.3f s to calculate on %i engines"%(toc-tic, len(view.targets)))
88 | 89 |     # cleanup split files
89 | 90 |     for fname in fnames: os.remove(fname)
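
The cleanup needs the explicit loop (or an equivalent list(map(...))) because on Python 3 a bare map(os.remove, fnames) builds a lazy iterator that nobody consumes, so no file would ever be deleted; on Python 2 map is eager and the same call works. A tiny demonstration of the trap:

    import os

    open('leftover.txt', 'w').close()      # create a hypothetical stray file
    m = map(os.remove, ['leftover.txt'])
    print(os.path.exists('leftover.txt'))  # True on Python 3: nothing has run yet
    list(m)                                # consuming the iterator does the removal
    print(os.path.exists('leftover.txt'))  # False

To actually run the example, the cluster has to be up before Client() connects, e.g. ipcluster start -n 4 in one terminal, then python pwordfreq.py in another.
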