##// END OF EJS Templates
Implemented locking for tasks to prevent the same task from running concurrently,...
Implemented locking for tasks to prevent the same task from running concurrently; moved out the pidlock library. Added dirsize display.

File last commit:

r406:b153a51b default
r497:fb0c3af6 celery
Show More
multiprocessing_indexer.py
176 lines | 5.3 KiB | text/x-python | PythonLexer
/ pylons_app / lib / indexers / multiprocessing_indexer.py
from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys
import time
import os
import sys
from os.path import dirname as dn
from multiprocessing.dummy import current_process
from shutil import rmtree
sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in, open_dir
from datetime import datetime
from multiprocessing.process import current_process
from multiprocessing import Array, Value
# The pylons_app package directory: two levels above the directory that
# contains this file (pylons_app/lib/indexers/<this file> -> pylons_app).
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
# The whoosh index is created under <root>/data/index.
idx_location = os.path.join(root, 'data', 'index')
# NOTE(review): developer-specific glob of repositories to scan; presumably
# this should come from configuration.
root_path = '/home/marcink/python_workspace_dirty/*'
# add_doc indexes files with these extensions by path/metadata only and never
# reads their content. Each extension is listed once.
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']
# Analyzer for the ``content`` field (used in get_schema): whoosh's
# RegexTokenizer with its default pattern, followed by lowercasing.
# StopFilter is imported but not applied here, so stop words stay indexed.
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
    """Return whatever ``HgModel.repo_scan`` finds under ``root_location``.

    Thin wrapper: the fixed '/', None and True arguments are passed through
    unchanged. The ``__main__`` block treats the result as a mapping and
    reads ``.name``, ``.path`` and ``.contact`` from its values.
    """
    found_repos = HgModel.repo_scan('/', root_location, None, True)
    return found_repos
def index_paths(root_dir):
    """Return the set of all file paths below ``root_dir``.

    Mercurial metadata directories (any directory named exactly ``.hg``) are
    skipped together with everything inside them. Other paths are kept even
    when they merely contain the substring ".hg" (e.g. ``proj.hgrepo`` or a
    ``.hgignore`` file).

    @param root_dir: directory to walk
    @return: set of paths built with ``os.path.join``
    """
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        # Prune in place so os.walk never descends into the metadata dir.
        dirs[:] = [d for d in dirs if d != '.hg']
        for f in files:
            index_paths_.add(os.path.join(path, f))
    return index_paths_
def get_schema():
    """Build the whoosh schema for the repository file index.

    Fields (as filled in by add_doc):
      owner      -- repository contact; TEXT, not stored
      repository -- repository name; TEXT, stored
      path       -- file path; stored ID, unique per document
      content    -- file text analyzed with ``my_analyzer``; stored
      modtime    -- file modification time; STORED only
    """
    fields = dict(
        owner=TEXT(),
        repository=TEXT(stored=True),
        path=ID(stored=True, unique=True),
        content=TEXT(stored=True, analyzer=my_analyzer),
        modtime=STORED(),
    )
    return Schema(**fields)
def add_doc(writer, path, repo_name, contact):
    """
    Add a single file as one document to ``writer``.

    Files whose extension is in ``exclude_extensions`` are added with empty
    content (only path and metadata are recorded). Every other file is read
    and its content indexed as text.

    @param writer: whoosh index writer the document is added to
    @param path: filesystem path of the file; stored as the unique ``path``
    @param repo_name: repository name, stored in the ``repository`` field
    @param contact: repository contact, indexed in the ``owner`` field
    """
    # splitext is path-separator aware; [1:] drops the leading dot.
    extension = os.path.splitext(path)[1][1:].lower()
    if extension not in exclude_extensions:
        # `with` closes the file even if the read fails.
        with open(path, 'rb') as fobj:
            content = fobj.read()
        try:
            # UTF-8 is a superset of ASCII, so non-ASCII source files are
            # indexed as real text instead of escaped byte strings.
            u_content = content.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8 (e.g. binary data): index a backslash-escaped
            # representation of the raw bytes instead.
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''
    # NOTE(review): in Python 2, unicode(contact) and u"%s" % <byte str> use
    # the ASCII codec and raise for non-ASCII byte strings -- confirm the
    # types of repo_name/contact/path that callers pass.
    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))
class MultiProcessIndexer(object):
    """Index files into a whoosh index using several worker processes.

    The constructor fills a shared queue with ``(path, repo_name, contact)``
    tasks, followed by a 'COMMIT' marker and one 'STOP' sentinel per worker.
    It then starts ``nr_processes`` processes running ``work_func``. It does
    not wait for them; the started processes are kept in ``self.processes``
    so callers can ``join()`` them.

    NOTE(review): every worker opens its own writer via ``idx.writer()``.
    Whoosh normally allows only one writer on an index at a time, so with
    more than one process the extra writers may fail to acquire the index
    lock -- confirm against the installed Whoosh version.
    """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        """
        @param idx: whoosh index the workers write to
        @param work_set: iterable of ``(path, repo_name, contact)`` tuples;
            None means no files
        @param nr_processes: number of worker processes to start
        """
        if work_set is None:
            work_set = ()
        q = Queue()
        l = Lock()
        for q_task in work_set:
            q.put(q_task)
        # Legacy marker; work_func currently treats it as a no-op.
        q.put('COMMIT')
        # One STOP per worker: each worker exits after consuming exactly one.
        for _ in range(nr_processes):
            q.put('STOP')
        processes = []
        for _ in range(nr_processes):
            # work_func ignores its writer argument (each worker opens its
            # own writer), so None is passed explicitly.
            p = Process(target=self.work_func, args=(q, l, idx, None))
            p.start()
            processes.append(p)
        # Assigned only after all workers started, so `self` carries no
        # Process objects while the workers are being created.
        self.processes = processes

    def work_func(self, q, l, idx, writer):
        """Worker loop executed in each child process.

        Takes tasks from ``q`` until a 'STOP' sentinel arrives. It indexes
        each ``(path, repo_name, contact)`` task with ``add_doc`` and commits
        the writer once at the end.

        @param q: task queue shared with the parent
        @param l: lock shared by all workers; serializes progress output and
            the final commit
        @param idx: whoosh index on which this worker opens its own writer
        @param writer: ignored; a fresh writer is always taken from ``idx``
        """
        writer = idx.writer()
        proc_name = current_process().name
        while True:
            q_task = q.get()
            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc_name)
                break
            if q_task == 'COMMIT':
                continue
            # `with` releases the lock even if writing to stdout raises;
            # otherwise all other workers would block forever.
            with l:
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc_name)
            try:
                add_doc(writer, q_task[0], q_task[1], q_task[2])
            except (IOError, OSError) as e:
                # One unreadable or vanished file must not kill the worker
                # and discard every document it has indexed so far.
                with l:
                    sys.stdout.write('%s failed to index %s: %s\n'
                                     % (proc_name, q_task[0], e))
        with l:
            writer.commit(merge=True)
if __name__ == "__main__":
    # Usage: run with any argument to rebuild the index from scratch and
    # queue the selected repositories for indexing; run with no arguments to
    # list the paths stored in the existing index.
    rebuild = len(sys.argv) > 1
    if rebuild:
        # Start from an empty index directory so no stale documents survive.
        if os.path.exists(idx_location):
            rmtree(idx_location)
        os.makedirs(idx_location)
        idx = create_in(idx_location, get_schema(), indexname='HG_INDEX')

        sys.stdout.write('Building queue...')
        q_tasks = []
        for cnt, repo in enumerate(scan_paths(root_path).values()):
            # NOTE(review): development-time restriction -- only the
            # 'evoice_py' repository is queued, and the `cnt == 4` break is
            # only reachable for that repository.
            if repo.name != 'evoice_py':
                continue
            q_tasks.extend([(idx_path, repo.name, repo.contact)
                            for idx_path in index_paths(repo.path)])
            if cnt == 4:
                break
        sys.stdout.write('done\n')
        MultiProcessIndexer(idx, q_tasks)
    else:
        # Read-only check: open the existing index instead of wiping it;
        # otherwise there would never be anything to list.
        print('checking index')
        idx = open_dir(idx_location, indexname='HG_INDEX')
        reader = idx.reader()
        for fields in reader.all_stored_fields():
            print(fields['path'])