"""
Standalone script that builds a Whoosh full-text index over the files of
scanned mercurial repositories, using one indexing process per CPU.
"""
from multiprocessing import Process, Queue, Lock, cpu_count, current_process
import os
import sys
from os.path import dirname as dn
from shutil import rmtree

sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))

from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in

root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
idx_location = os.path.join(root, 'data', 'index')
root_path = '/home/marcink/python_workspace_dirty/*'

#binary file extensions whose content we don't read and index
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']

my_analyzer = RegexTokenizer() | LowercaseFilter()


def scan_paths(root_location):
    """Scan the given location for mercurial repositories"""
    return HgModel.repo_scan('/', root_location, None, True)


def index_paths(root_dir):
    """Collect all file paths under root_dir, skipping .hg directories"""
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        if path.find('.hg') == -1:
            for f in files:
                index_paths_.add(os.path.join(path, f))
    return index_paths_


def get_schema():
    return Schema(owner=TEXT(),
                  repository=TEXT(stored=True),
                  path=ID(stored=True, unique=True),
                  content=TEXT(stored=True, analyzer=my_analyzer),
                  modtime=STORED())


def add_doc(writer, path, repo_name, contact):
    """
    Add a single document to the given writer

    @param writer: whoosh index writer
    @param path: path of the file to index
    @param repo_name: name of the repository the file belongs to
    @param contact: contact (owner) of that repository
    """
    #we don't want to read the content of files with excluded extensions,
    #we just index them with empty content
    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
        fobj = open(path, 'rb')
        content = fobj.read()
        fobj.close()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            #in case of a decode error just represent it as a byte string
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''

    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))


class MultiProcessIndexer(object):
    """ multiprocessing whoosh indexer """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        q = Queue()
        l = Lock()

        for q_task in work_set or set():
            q.put(q_task)

        q.put('COMMIT')

        #to stop all processes we have to put one STOP token per process
        #into the queue, each worker breaks its loop on the first one it gets
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target=self.work_func, args=(q, l, idx))
            p.start()

    def work_func(self, q, l, idx):
        """ worker function invoked in each indexing process """

        #each process opens its own writer, whoosh writers must not be
        #shared between processes
        writer = idx.writer()
        while True:
            q_task = q.get()
            proc = current_process()

            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc.name)
                break

            if q_task != 'COMMIT':
                l.acquire()
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc.name)
                l.release()
                add_doc(writer, q_task[0], q_task[1], q_task[2])

        #commit everything this worker added once it has been told to stop
        l.acquire()
        writer.commit(merge=True)
        l.release()
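#NOTE: hypothetical helper, not part of the original script, a minimal
#single-process sketch of the same indexing loop, useful for debugging
#since it avoids the queue and lock machinery entirely
def index_sequentially(idx, work_set):
    """Index the given (path, repo_name, contact) tuples in this process"""
    writer = idx.writer()
    for path, repo_name, contact in work_set:
        add_doc(writer, path, repo_name, contact)
    writer.commit(merge=True)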
if __name__ == "__main__":
    #any command line argument triggers (re)building the index,
    #no arguments means just dump the paths that are already indexed
    do = len(sys.argv) > 1
    q_tasks = []

    #always start from a fresh index location
    if os.path.exists(idx_location):
        rmtree(idx_location)
    os.mkdir(idx_location)

    idx = create_in(idx_location, get_schema(), indexname='HG_INDEX')

    if do:
        #build the queue of (path, repo_name, contact) tasks
        sys.stdout.write('Building queue...')
        for cnt, repo in enumerate(scan_paths(root_path).values()):
            if repo.name != 'evoice_py':
                continue
            q_tasks.extend([(idx_path, repo.name, repo.contact)
                            for idx_path in index_paths(repo.path)])
            if cnt == 4:
                break

        sys.stdout.write('done\n')

        mpi = MultiProcessIndexer(idx, q_tasks)
    else:
        print 'checking index'
        reader = idx.reader()
        all_fields = reader.all_stored_fields()
        for fields in all_fields:
            print fields['path']
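#Example of querying the finished index with whoosh's QueryParser, run
#from a separate session once indexing is done (a sketch, assuming the
#index at idx_location was built with the schema above):
#
#   from whoosh.index import open_dir
#   from whoosh.qparser import QueryParser
#
#   idx = open_dir(idx_location, indexname='HG_INDEX')
#   with idx.searcher() as searcher:
#       query = QueryParser('content', idx.schema).parse(u'def main')
#       for hit in searcher.search(query):
#           print hit['path']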