##// END OF EJS Templates
removed pidlock from whoosh and added it as locked_task decorator
removed pidlock from whoosh and added it as locked_task decorator

File last commit:

r406:b153a51b default
r504:d280aa1c default
Show More
multiprocessing_indexer.py
176 lines | 5.3 KiB | text/x-python | PythonLexer
/ pylons_app / lib / indexers / multiprocessing_indexer.py
from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys
import time
import os
import sys
from os.path import dirname as dn
from multiprocessing.dummy import current_process
from shutil import rmtree
sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in, open_dir
from datetime import datetime
from multiprocessing.process import current_process
from multiprocessing import Array, Value
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
idx_location = os.path.join(root, 'data', 'index')
root_path = '/home/marcink/python_workspace_dirty/*'
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp', 'dll']
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
return HgModel.repo_scan('/', root_location, None, True)
def index_paths(root_dir):
index_paths_ = set()
for path, dirs, files in os.walk(root_dir):
if path.find('.hg') == -1:
#if path.find('.hg') == -1 and path.find('bel-epa') != -1:
for f in files:
index_paths_.add(os.path.join(path, f))
return index_paths_
def get_schema():
return Schema(owner=TEXT(),
repository=TEXT(stored=True),
path=ID(stored=True, unique=True),
content=TEXT(stored=True, analyzer=my_analyzer),
modtime=STORED())
def add_doc(writer, path, repo_name, contact):
"""
Adding doc to writer
@param writer:
@param path:
@param repo:
@param fname:
"""
#we don't won't to read excluded file extensions just index them
if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
fobj = open(path, 'rb')
content = fobj.read()
fobj.close()
try:
u_content = unicode(content)
except UnicodeDecodeError:
#incase we have a decode error just represent as byte string
u_content = unicode(str(content).encode('string_escape'))
else:
u_content = u''
writer.add_document(repository=u"%s" % repo_name,
owner=unicode(contact),
path=u"%s" % path,
content=u_content,
modtime=os.path.getmtime(path))
class MultiProcessIndexer(object):
""" multiprocessing whoosh indexer """
def __init__(self, idx, work_set=set(), nr_processes=cpu_count()):
q = Queue()
l = Lock()
work_set = work_set
writer = None
#writer = idx.writer()
for q_task in work_set:
q.put(q_task)
q.put('COMMIT')
#to stop all processes we have to put STOP to queue and
#break the loop for each process
for _ in xrange(nr_processes):
q.put('STOP')
for _ in xrange(nr_processes):
p = Process(target=self.work_func, args=(q, l, idx, writer))
p.start()
def work_func(self, q, l, idx, writer):
""" worker class invoked by process """
writer = idx.writer()
while True:
q_task = q.get()
proc = current_process()
# if q_task == 'COMMIT':
# l.acquire()
# sys.stdout.write('%s commiting and STOP\n' % proc._name)
# writer.commit(merge=False)
# l.release()
# break
# l.acquire()
# writer = idx.writer()
# l.release()
if q_task == 'STOP':
sys.stdout.write('%s STOP\n' % proc._name)
break
if q_task != 'COMMIT':
l.acquire()
sys.stdout.write(' >> %s %s %s @ ' % q_task)
sys.stdout.write(' %s \n' % proc._name)
l.release()
add_doc(writer, q_task[0], q_task[1], q_task[2])
l.acquire()
writer.commit(merge=True)
l.release()
if __name__ == "__main__":
#build queue
do = True if len(sys.argv) > 1 else False
q_tasks = []
if os.path.exists(idx_location):
rmtree(idx_location)
if not os.path.exists(idx_location):
os.mkdir(idx_location)
idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')
if do:
sys.stdout.write('Building queue...')
for cnt, repo in enumerate(scan_paths(root_path).values()):
if repo.name != 'evoice_py':
continue
q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
if cnt == 4:
break
sys.stdout.write('done\n')
mpi = MultiProcessIndexer(idx, q_tasks)
else:
print 'checking index'
reader = idx.reader()
all = reader.all_stored_fields()
#print all
for fields in all:
print fields['path']