pwordfreq.py
#!/usr/bin/env python
"""Parallel word frequency counter.

This only works for a local cluster, because the filenames are local paths.
"""
from __future__ import division

import os
import time

from itertools import repeat

from wordfreq import print_wordfreq, wordfreq

from IPython.parallel import Client, Reference

try:  # Python 2
    from urllib import urlretrieve
except ImportError:  # Python 3
    from urllib.request import urlretrieve
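
# NOTE: an IPython cluster must already be running before this script is
# launched; one can be started locally with e.g. `ipcluster start -n 4`,
# and Client() below will connect to it.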

davinci_url = "http://www.gutenberg.org/cache/epub/5000/pg5000.txt"


def pwordfreq(view, fnames):
    """Parallel word frequency counter.

    view - An IPython DirectView
    fnames - The filenames containing the split data.
    """
    assert len(fnames) == len(view.targets)
    # push one filename to each engine as the remote variable 'fname'
    view.scatter('fname', fnames, flatten=True)
    # run wordfreq(fname) on every engine; the Reference is resolved remotely
    ar = view.apply(wordfreq, Reference('fname'))
    freqs_list = ar.get()
    # merge the per-engine counts into a single dict
    word_set = set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs = dict(zip(word_set, repeat(0)))
    for f in freqs_list:
        for word, count in f.items():
            freqs[word] += count
    return freqs
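
# A minimal usage sketch (the filenames here are hypothetical; pwordfreq
# expects exactly one split file per connected engine, as the assert above
# enforces):
#
#     rc = Client()
#     freqs = pwordfreq(rc[:], ['davinci0.txt', 'davinci1.txt'])
#     print_wordfreq(freqs, 10)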

if __name__ == '__main__':
    # Create a Client and View
    rc = Client()
    view = rc[:]

    if not os.path.exists('davinci.txt'):
        # download from project gutenberg
        print("Downloading Da Vinci's notebooks from Project Gutenberg")
        urlretrieve(davinci_url, 'davinci.txt')

    # Run the serial version
    print("Serial word frequency count:")
    with open('davinci.txt') as f:
        text = f.read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate" % (toc - tic))
# The parallel version | ||||
Thomas Kluyver
|
r6455 | print("\nParallel word frequency count:") | ||
MinRK
|
r3670 | # split the davinci.txt into one file per engine: | ||
lines = text.splitlines() | ||||
nlines = len(lines) | ||||
n = len(rc) | ||||
stonebig <stonebig>
|
r14201 | block = nlines//n | ||
MinRK
|
r3670 | for i in range(n): | ||
chunk = lines[i*block:i*(block+1)] | ||||
with open('davinci%i.txt'%i, 'w') as f: | ||||
f.write('\n'.join(chunk)) | ||||
stonebig <stonebig>
|

    try:  # Python 2
        cwd = os.path.abspath(os.getcwdu())
    except AttributeError:  # Python 3
        cwd = os.path.abspath(os.getcwd())
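
    # use absolute paths: each engine opens its file itself, which is why
    # this example only works when the engines run on the local machine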
    fnames = [os.path.join(cwd, 'davinci%i.txt' % i) for i in range(n)]
    tic = time.time()
    pfreqs = pwordfreq(view, fnames)
    toc = time.time()
    print_wordfreq(pfreqs, 10)
    print("Took %.3f s to calculate on %i engines" % (toc - tic, len(view.targets)))

    # cleanup split files; map() would be lazy under Python 3, so loop explicitly
    for fname in fnames:
        os.remove(fname)