multiprocessing_indexer.py
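"""
Standalone multiprocessing whoosh indexer: scans mercurial repositories via
pylons_app's HgModel, walks each repository's working directory and indexes
file contents into a whoosh index, using one worker process per CPU fed from
a shared task queue.
"""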
from multiprocessing import Process, Queue, cpu_count, Lock
import socket
import sys
import time
import os
from os.path import dirname as dn
from shutil import rmtree

sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))

from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in, open_dir
from datetime import datetime
from multiprocessing.process import current_process
from multiprocessing import Array, Value

root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
idx_location = os.path.join(root, 'data', 'index')
root_path = '/home/marcink/python_workspace_dirty/*'

exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']

#whoosh analyzers compose with the | operator: tokenize on word characters,
#then lowercase every token
my_analyzer = RegexTokenizer() | LowercaseFilter()


def scan_paths(root_location):
    return HgModel.repo_scan('/', root_location, None, True)


def index_paths(root_dir):
    """Return the set of all file paths under root_dir, skipping '.hg' dirs."""
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        if path.find('.hg') == -1:
            #if path.find('.hg') == -1 and path.find('bel-epa') != -1:
            for f in files:
                index_paths_.add(os.path.join(path, f))
    return index_paths_


def get_schema():
    return Schema(owner=TEXT(),
                  repository=TEXT(stored=True),
                  path=ID(stored=True, unique=True),
                  content=TEXT(stored=True, analyzer=my_analyzer),
                  modtime=STORED())


def add_doc(writer, path, repo_name, contact):
    """
    Add a single file to the index

    @param writer: whoosh index writer
    @param path: full path of the file to index
    @param repo_name: name of the repository the file belongs to
    @param contact: repository owner/contact
    """
    #we don't want to read excluded file extensions, just index their paths
    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
        fobj = open(path, 'rb')
        content = fobj.read()
        fobj.close()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            #in case we have a decode error just represent it as a byte string
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''

    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))


class MultiProcessIndexer(object):
    """ multiprocessing whoosh indexer """

    def __init__(self, idx, work_set=set(), nr_processes=cpu_count()):
        q = Queue()
        l = Lock()
        writer = None
        #writer = idx.writer()

        #enqueue one task per file, followed by a single COMMIT marker
        for q_task in work_set:
            q.put(q_task)

        q.put('COMMIT')

        #to stop all processes we have to put STOP to queue and
        #break the loop for each process
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target=self.work_func, args=(q, l, idx, writer))
            p.start()
    def work_func(self, q, l, idx, writer):
        """ worker function invoked by each process """
        writer = idx.writer()
        while True:
            q_task = q.get()
            proc = current_process()

            # if q_task == 'COMMIT':
            #     l.acquire()
            #     sys.stdout.write('%s commiting and STOP\n' % proc._name)
            #     writer.commit(merge=False)
            #     l.release()
            #     break
            # l.acquire()
            # writer = idx.writer()
            # l.release()

            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc.name)
                break

            if q_task != 'COMMIT':
                l.acquire()
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc.name)
                l.release()
                add_doc(writer, q_task[0], q_task[1], q_task[2])

        #serialize the final commit across workers with the shared lock
        l.acquire()
        writer.commit(merge=True)
        l.release()
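

#A minimal usage sketch (illustrative, not part of the original script): the
#indexer expects an iterable of (path, repo_name, contact) tuples, e.g.
#
#   idx = create_in(idx_location, get_schema(), indexname='HG_INDEX')
#   MultiProcessIndexer(idx, set([('/tmp/repo/setup.py', 'repo', 'admin@example.com')]))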


if __name__ == "__main__":
    #build queue
    do = len(sys.argv) > 1
    q_tasks = []

    #always start from a fresh index directory
    if os.path.exists(idx_location):
        rmtree(idx_location)
    if not os.path.exists(idx_location):
        os.mkdir(idx_location)

    idx = create_in(idx_location, get_schema(), indexname='HG_INDEX')

    if do:
        sys.stdout.write('Building queue...')
        for cnt, repo in enumerate(scan_paths(root_path).values()):
            if repo.name != 'evoice_py':
                continue
            q_tasks.extend([(idx_path, repo.name, repo.contact)
                            for idx_path in index_paths(repo.path)])
            if cnt == 4:
                break

        sys.stdout.write('done\n')

        mpi = MultiProcessIndexer(idx, q_tasks)

    else:
        print 'checking index'
        reader = idx.reader()
        all_fields = reader.all_stored_fields()
        #print all_fields
        for fields in all_fields:
            print fields['path']
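

#A minimal sketch of querying the index built above (not part of the original
#script, assuming whoosh's QueryParser; the search term is just an example):
#
#   from whoosh.qparser import QueryParser
#   idx = open_dir(idx_location, indexname='HG_INDEX')
#   searcher = idx.searcher()
#   query = QueryParser('content', schema=idx.schema).parse(u'def main')
#   for hit in searcher.search(query):
#       print hit['path']
#   searcher.close()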