#!/usr/bin/env python
"""Parallel word frequency counter.

This only works for a local cluster, because the filenames are local paths.
"""
import os
import time
import urllib
from itertools import repeat

from wordfreq import print_wordfreq, wordfreq
from IPython.parallel import Client, Reference
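
# wordfreq() and print_wordfreq() are the serial implementations, assumed to
# be importable from the companion wordfreq.py next to this script.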

davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"

def pwordfreq(view, fnames):
    """Parallel word frequency counter.

    view - An IPython DirectView
    fnames - The filenames containing the split data.
    """
    assert len(fnames) == len(view.targets)
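    # Push one filename to each engine as the engine-local variable 'fname',
    # then have every engine run wordfreq() on its own file: Reference('fname')
    # resolves to that engine-local variable when the call executes remotely.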
    view.scatter('fname', fnames, flatten=True)
    ar = view.apply(wordfreq, Reference('fname'))
    freqs_list = ar.get()
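    # Merge the per-engine results: collect the union of all words seen,
    # initialize every count to zero, then sum the partial counts.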
    word_set = set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs = dict(zip(word_set, repeat(0)))
    for f in freqs_list:
        for word, count in f.iteritems():
            freqs[word] += count
    return freqs

if __name__ == '__main__':
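    # A local cluster must already be running, e.g. one started with
    # `ipcluster start -n 4`.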
    # Create a Client and a DirectView spanning all engines
    rc = Client()
    view = rc[:]

    if not os.path.exists('davinci.txt'):
        # download the text from Project Gutenberg
        print "Downloading Da Vinci's notebooks from Project Gutenberg"
        urllib.urlretrieve(davinci_url, 'davinci.txt')

    # Run the serial version
    print "Serial word frequency count:"
    text = open('davinci.txt').read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print "Took %.3f s to calculate"%(toc-tic)

    # The parallel version
    print "\nParallel word frequency count:"
    # split davinci.txt into one file per engine:
    lines = text.splitlines()
    nlines = len(lines)
    n = len(rc)
    block = nlines // n
    for i in range(n):
        # each engine gets `block` lines; the last one also takes the remainder
        end = (i + 1) * block if i < n - 1 else nlines
        chunk = lines[i*block:end]
        with open('davinci%i.txt'%i, 'w') as f:
            f.write('\n'.join(chunk))
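    # Build absolute paths: the engines' working directories need not match ours.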
    cwd = os.path.abspath(os.getcwd())
    fnames = [os.path.join(cwd, 'davinci%i.txt'%i) for i in range(n)]

    tic = time.time()
    pfreqs = pwordfreq(view, fnames)
    toc = time.time()
    print_wordfreq(pfreqs, 10)
    print "Took %.3f s to calculate on %i engines"%(toc-tic, len(view.targets))

    # clean up the split files
    for fname in fnames:
        os.remove(fname)