##// END OF EJS Templates
Removed config names from whoosh and celery,...
Removed config names from whoosh and celery. Celery is now configured based on the config name it's using in celeryconfig, and whoosh uses its own logger configured just for whoosh. Tests now create a fresh whoosh index for more accurate checks; fixed tests for searching.

File last commit:

r406:b153a51b default
r483:a9e50dce celery
Show More
multiprocessing_indexer.py
176 lines | 5.3 KiB | text/x-python | PythonLexer
/ pylons_app / lib / indexers / multiprocessing_indexer.py
Implemented search using whoosh. Still as experimental option.
r406 from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys
import time
import os
import sys
from os.path import dirname as dn
from multiprocessing.dummy import current_process
from shutil import rmtree
sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in, open_dir
from datetime import datetime
from multiprocessing.process import current_process
from multiprocessing import Array, Value
# Project root: the directory three levels above this file.
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))

# Directory that holds the whoosh index (<root>/data/index).
idx_location = os.path.join(root, 'data', 'index')

# Location handed to scan_paths() by the __main__ block.
# NOTE(review): developer-specific absolute path -- confirm before reuse.
root_path = '/home/marcink/python_workspace_dirty/*'

# Lower-case file extensions whose content add_doc() does not read; such
# files are indexed with empty content. ('dll' used to be listed twice.)
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']

# Analyzer used for the ``content`` field in get_schema(): a regex
# tokenizer whose output is piped through a lower-casing filter.
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
    """Scan ``root_location`` for repositories.

    Thin wrapper that forwards to ``HgModel.repo_scan`` with the fixed
    arguments ``'/'``, ``None`` and ``True`` around ``root_location``.

    @param root_location: location passed through to ``HgModel.repo_scan``
    @return: whatever ``HgModel.repo_scan`` returns; the ``__main__`` block
        calls ``.values()`` on it and reads ``name``, ``contact`` and
        ``path`` from each value
    """
    scan_args = ('/', root_location, None, True)
    return HgModel.repo_scan(*scan_args)
def index_paths(root_dir):
    """Collect the paths of all files below ``root_dir``, skipping ``.hg``.

    Directories named exactly ``.hg`` are pruned from the walk, so their
    contents are neither visited nor returned. Earlier the check was a
    substring test on the whole directory path, which also discarded
    unrelated directories whose path merely contained ``.hg`` and still
    walked through every ``.hg`` tree.

    @param root_dir: directory to walk
    @return: set of file paths, each built as
        ``os.path.join(<directory>, <file name>)``; empty when ``root_dir``
        does not exist
    """
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        # In-place assignment: os.walk (top-down) only descends into the
        # directories still present in ``dirs``.
        dirs[:] = [d for d in dirs if d != '.hg']
        index_paths_.update(os.path.join(path, f) for f in files)
    return index_paths_
def get_schema():
    """Build the whoosh ``Schema`` used for the index.

    Fields:
      * ``owner``      - TEXT
      * ``repository`` - TEXT, stored
      * ``path``       - ID, stored and unique
      * ``content``    - TEXT, stored, analyzed with ``my_analyzer``
      * ``modtime``    - STORED

    @return: a new ``Schema`` instance
    """
    fields = {
        'owner': TEXT(),
        'repository': TEXT(stored=True),
        'path': ID(stored=True, unique=True),
        'content': TEXT(stored=True, analyzer=my_analyzer),
        'modtime': STORED(),
    }
    return Schema(**fields)
def add_doc(writer, path, repo_name, contact):
    """Add the file at ``path`` to ``writer`` as one document.

    Files whose extension is listed in ``exclude_extensions`` are not read:
    they are indexed with empty content. For every other file the raw bytes
    are converted to unicode; if that fails, the escaped byte string is
    stored instead.

    @param writer: object providing ``add_document(**fields)``
    @param path: filesystem path of the file to index
    @param repo_name: value for the ``repository`` field
    @param contact: value for the ``owner`` field (converted to unicode)
    """
    # splitext only looks at the last path component, so a dot in a
    # directory name or an extension-less file called e.g. "pdf" is not
    # mistaken for an extension.
    extension = os.path.splitext(path)[1][1:].lower()

    # we don't want to read excluded file extensions, just index them
    if extension not in exclude_extensions:
        # ``with`` closes the handle even when read() raises
        with open(path, 'rb') as fobj:
            content = fobj.read()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            # in case of a decode error just represent it as a byte string
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''

    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))
class MultiProcessIndexer(object):
    """Multiprocessing whoosh indexer.

    Creating an instance fills a queue with the work items and immediately
    starts ``nr_processes`` worker processes, each running ``work_func``.

    NOTE(review): every worker opens its own writer on the same index --
    confirm that the whoosh version in use allows this.
    """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        """Queue the work and start the workers.

        @param idx: index object; each worker calls ``idx.writer()`` on it
        @param work_set: iterable of ``(path, repo_name, contact)`` tuples;
            ``None`` (the default) means no work items
        @param nr_processes: number of worker processes to start; the
            default is ``cpu_count()`` evaluated when the class is defined
        """
        if work_set is None:
            work_set = ()

        q = Queue()
        l = Lock()

        for q_task in work_set:
            q.put(q_task)

        q.put('COMMIT')

        # to stop all processes we have to put one STOP per process on the
        # queue; each worker leaves its loop when it receives one
        for _ in range(nr_processes):
            q.put('STOP')

        for _ in range(nr_processes):
            # the last argument is only a placeholder for the ``writer``
            # parameter: every worker creates its own writer
            p = Process(target=self.work_func, args=(q, l, idx, None))
            p.start()

    def work_func(self, q, l, idx, writer):
        """Worker loop executed inside each process.

        Takes tasks from ``q`` until a ``'STOP'`` marker arrives, adding a
        document for every task tuple, then commits the writer.

        @param q: queue yielding task tuples and the ``'COMMIT'`` / ``'STOP'``
            marker strings
        @param l: lock serialising console output and the final commit
        @param idx: index object providing ``writer()``
        @param writer: ignored; replaced by a fresh ``idx.writer()``
        """
        writer = idx.writer()

        while True:
            q_task = q.get()
            proc = current_process()

            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc.name)
                break

            if q_task == 'COMMIT':
                # marker is consumed without action; the commit happens
                # once, after the loop
                continue

            # ``with`` releases the lock even if writing to stdout fails,
            # so the other workers cannot be left blocked
            with l:
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc.name)

            add_doc(writer, q_task[0], q_task[1], q_task[2])

        with l:
            writer.commit(merge=True)
if __name__ == "__main__":
    # Any command-line argument selects "build" mode; without arguments the
    # script only lists what is stored in the existing index.
    build = len(sys.argv) > 1

    if build:
        # always rebuild from scratch: drop the old index directory first
        if os.path.exists(idx_location):
            rmtree(idx_location)
        os.mkdir(idx_location)

        idx = create_in(idx_location, get_schema(), indexname='HG_INDEX')

        # build queue
        q_tasks = []
        sys.stdout.write('Building queue...')
        for cnt, repo in enumerate(scan_paths(root_path).values()):
            # NOTE(review): hard-coded repository filter and repository
            # cap look like debugging leftovers -- confirm before relying
            # on this script for a full index.
            if repo.name != 'evoice_py':
                continue
            q_tasks.extend((idx_path, repo.name, repo.contact)
                           for idx_path in index_paths(repo.path))
            if cnt == 4:
                break
        sys.stdout.write('done\n')

        MultiProcessIndexer(idx, q_tasks)
    else:
        print('checking index')
        # Open the existing index instead of wiping and recreating it:
        # recreating it first destroyed the index and left nothing to check.
        if not os.path.isdir(idx_location):
            sys.stdout.write('no index found in %s\n' % idx_location)
        else:
            idx = open_dir(idx_location, indexname='HG_INDEX')
            reader = idx.reader()
            try:
                for fields in reader.all_stored_fields():
                    print(fields['path'])
            finally:
                reader.close()