##// END OF EJS Templates
removed pidlock from whoosh and added it as locked_task decorator
removed pidlock from whoosh and added it as locked_task decorator

File last commit:

r497:fb0c3af6 celery
r504:d280aa1c default
Show More
daemon.py
238 lines | 8.0 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
# encoding: utf-8
# whoosh indexer daemon for hg-app
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your option) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""
Created on Jan 26, 2010
@author: marcink
A daemon will read from task table and run tasks
"""
import sys
import os
from os.path import dirname as dn
from os.path import join as jn
#to get the pylons_app import
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
sys.path.append(project_path)
from pylons_app.lib.pidlock import LockHeld, DaemonLock
from pylons_app.model.hg_model import HgModel
from pylons_app.lib.helpers import safe_unicode
from whoosh.index import create_in, open_dir
from shutil import rmtree
from pylons_app.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
import logging
# Dedicated logger for the indexer: everything from DEBUG up is emitted and
# nothing is forwarded to ancestor loggers.
log = logging.getLogger('whooshIndexer')
log.setLevel(logging.DEBUG)
log.propagate = False

# Layout shared by every record: timestamp - logger name - level - message.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Stream handler (no explicit stream given) that also lets DEBUG through.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# Attach the handler so the indexer's records are actually written out.
log.addHandler(ch)
def scan_paths(root_location):
    """Scan ``root_location`` for repositories using ``HgModel.repo_scan``.

    The result of ``HgModel.repo_scan`` is returned untouched. Callers in this
    module call ``.values()`` on it and read ``path``, ``name`` and
    ``contact`` from every value.

    :param root_location: location forwarded to ``HgModel.repo_scan``
    """
    # Fixed positional arguments of the scan; only the location varies.
    scan_args = ('/', root_location, None, True)
    return HgModel.repo_scan(*scan_args)
class WhooshIndexingDaemon(object):
    """Daemon that builds and incrementally updates the whoosh index.

    The index is stored in ``IDX_LOCATION``. The repositories whose files get
    indexed are discovered with ``scan_paths(self.repo_location)``.
    """

    def __init__(self, indexname='HG_INDEX', repo_location=None):
        """Store the settings and make sure the index directory exists.

        When ``IDX_LOCATION`` is not a directory yet it is created and
        ``self.initial`` is set, which makes ``run`` do a full build.

        :param indexname: index name used by ``update_index`` to open the index
        :param repo_location: location passed to ``scan_paths``
        """
        self.indexname = indexname
        self.repo_location = repo_location
        self.initial = False

        if not os.path.isdir(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
            self.initial = True

    def get_paths(self, root_dir):
        """Return the set of all file paths below ``root_dir``.

        Directories named exactly ``.hg`` are skipped together with everything
        below them. Paths that merely contain the text ``.hg`` (for example a
        repository stored in ``project.hg/``, or a ``.hgignore`` file) are
        kept.

        :param root_dir: directory to walk recursively
        :returns: set of paths built by joining each directory and file name
        """
        index_paths_ = set()
        for path, dirs, files in os.walk(root_dir):
            # Pruning ``dirs`` in place stops os.walk from descending into the
            # Mercurial metadata directory at all.
            if '.hg' in dirs:
                dirs.remove('.hg')
            for f in files:
                index_paths_.add(jn(path, f))
        return index_paths_

    def add_doc(self, writer, path, repo):
        """Add the file at ``path`` to ``writer`` as one document.

        The file content is indexed only when the lower-cased text after the
        last ``.`` of the file name is in ``INDEX_EXTENSIONS``; otherwise the
        document is stored with empty content. A path that no longer exists
        (or is a broken symlink) is logged and skipped.

        :param writer: index writer; ``add_document`` is called on it
        :param path: path of the file to index
        :param repo: repository object; ``contact`` and ``name`` are read
        :raises IOError, OSError: for any file error other than ``ENOENT``
        """
        import errno

        ext = unicode(path.split('/')[-1].split('.')[-1].lower())
        # we just index the content of chosen files
        with_content = ext in INDEX_EXTENSIONS
        if with_content:
            log.debug(' >> %s [WITH CONTENT]' % path)
        else:
            log.debug(' >> %s' % path)

        try:
            # Every filesystem access sits inside this ``try`` so that a file
            # vanishing at any point is handled the same way.
            modtime = os.path.getmtime(path)
            if with_content:
                fobj = open(path, 'rb')
                try:
                    content = fobj.read()
                finally:
                    # close the handle even when the read fails
                    fobj.close()
                u_content = safe_unicode(content)
            else:
                # just index the file name without its content
                u_content = u''
        except (IOError, OSError):
            # ``open`` reports a missing file as IOError, ``os`` calls as
            # OSError. sys.exc_info() is used instead of ``except ... as`` to
            # stay valid on old Python 2 interpreters.
            err = sys.exc_info()[1]
            if err.errno == errno.ENOENT:
                log.debug('path %s does not exist or is a broken symlink' % path)
                return
            # bare raise keeps the original traceback
            raise

        writer.add_document(owner=unicode(repo.contact),
                            repository=u"%s" % repo.name,
                            path=u"%s" % path,
                            content=u_content,
                            modtime=modtime,
                            extension=ext)

    def build_index(self):
        """Throw away any existing index and build a new one from scratch."""
        if os.path.exists(IDX_LOCATION):
            log.debug('removing previos index')
            rmtree(IDX_LOCATION)

        if not os.path.exists(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)

        # NOTE(review): the index is created under IDX_NAME while
        # update_index opens it under self.indexname; these only agree when
        # both hold the same value -- confirm.
        idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for repo in scan_paths(self.repo_location).values():
            log.debug('building index @ %s' % repo.path)
            for idx_path in self.get_paths(repo.path):
                self.add_doc(writer, idx_path, repo)

        writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_index(self):
        """Bring the existing index in line with the files on disk.

        Documents whose file is gone are deleted, documents whose file has a
        newer modification time than the stored ``modtime`` are re-indexed,
        and files that are not in the index yet are added.
        """
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(IDX_LOCATION, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            try:
                # A single stat answers both "does it still exist" and "when
                # was it modified", so the file cannot vanish between two
                # separate checks.
                mtime = os.path.getmtime(indexed_path)
            except OSError:
                # This file was deleted since it was indexed
                log.debug('removing from index %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)
                continue

            if mtime > fields['modtime']:
                # The file has changed, delete it and add it to the list of
                # files to reindex
                log.debug('adding to reindex list %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)
                to_index.add(indexed_path)

        # Loop over the files currently present in the repositories
        for repo in scan_paths(self.repo_location).values():
            for path in self.get_paths(repo.path):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo)
                    log.debug('reindexing %s' % path)

        writer.commit(merge=True)
        log.debug('>>> FINISHED <<<')

    def run(self, full_index=False):
        """Run the daemon once.

        :param full_index: when true (or when the index directory had to be
            created by ``__init__``) the index is rebuilt completely,
            otherwise only an incremental update is done
        """
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()
if __name__ == "__main__":
    # Command line entry point:
    #     <script> full|incremental <path to repositories>
    # Exit status: 2 for bad arguments, 1 when LockHeld is raised.
    arg = sys.argv[1:]
    if len(arg) != 2:
        sys.stderr.write('Please specify indexing type [full|incremental] '
                         'and path to repositories as script args \n')
        # a usage error must not look like success to the calling shell
        sys.exit(2)

    if arg[0] == 'full':
        full_index = True
    elif arg[0] == 'incremental':
        # False means looking just for changes
        full_index = False
    else:
        # error messages go to stderr like the other usage errors here
        sys.stderr.write('Please use [full|incremental]'
                         ' as script first arg \n')
        sys.exit(2)

    if not os.path.isdir(arg[1]):
        sys.stderr.write('%s is not a valid path \n' % arg[1])
        sys.exit(2)
    else:
        # the location handed to the daemon always ends with "/*"
        if arg[1].endswith('/'):
            repo_location = arg[1] + '*'
        else:
            repo_location = arg[1] + '/*'

    try:
        lock = DaemonLock()
        try:
            WhooshIndexingDaemon(repo_location=repo_location)\
                .run(full_index=full_index)
        finally:
            # release the lock even when indexing blows up, so a failed run
            # does not leave the lock held
            lock.release()
        reload(logging)
    except LockHeld:
        sys.exit(1)