daemon.py
390 lines
| 15.0 KiB
| text/x-python
|
PythonLexer
r885 | # -*- coding: utf-8 -*- | |||
""" | ||||
rhodecode.lib.indexers.daemon | ||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
r1377 | A daemon will read from task table and run tasks | |||
r947 | ||||
r885 | :created_on: Jan 26, 2010 | |||
:author: marcink | ||||
r1824 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> | |||
r885 | :license: GPLv3, see COPYING for more details. | |||
""" | ||||
r1206 | # This program is free software: you can redistribute it and/or modify | |||
# it under the terms of the GNU General Public License as published by | ||||
# the Free Software Foundation, either version 3 of the License, or | ||||
# (at your option) any later version. | ||||
r947 | # | |||
r547 | # This program is distributed in the hope that it will be useful, | |||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
r947 | # | |||
r547 | # You should have received a copy of the GNU General Public License | |||
r1206 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
Indra Talip
|
r2641 | from __future__ import with_statement | ||
r547 | ||||
r1154 | import os | |||
r547 | import sys | |||
r1154 | import logging | |||
r885 | import traceback | |||
r1154 | ||||
from shutil import rmtree | ||||
from time import mktime | ||||
r547 | from os.path import dirname as dn | |||
from os.path import join as jn | ||||
#to get the rhodecode import | ||||
project_path = dn(dn(dn(dn(os.path.realpath(__file__))))) | ||||
sys.path.append(project_path) | ||||
r2109 | from rhodecode.config.conf import INDEX_EXTENSIONS | |||
r691 | from rhodecode.model.scm import ScmModel | |||
r2109 | from rhodecode.lib.utils2 import safe_unicode | |||
Indra Talip
|
r2640 | from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, CHGSET_IDX_NAME | ||
r547 | ||||
r2007 | from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \ | |||
r1711 | NodeDoesNotExistError | |||
r560 | ||||
Indra Talip
|
r2640 | from whoosh.index import create_in, open_dir, exists_in | ||
from whoosh.query import * | ||||
from whoosh.qparser import QueryParser | ||||
r1154 | ||||
r2101 | log = logging.getLogger('whoosh_indexer') | |||
r547 | ||||
r1995 | ||||
class WhooshIndexingDaemon(object):
    """
    Daemon for atomic indexing jobs
    """

    def __init__(self, indexname=IDX_NAME, index_location=None,
                 repo_location=None, sa=None, repo_list=None,
                 repo_update_list=None):
        """
        :param indexname: name of the whoosh file-content index
        :param index_location: directory that holds (or will hold) the indexes
        :param repo_location: root directory scanned for repositories
        :param sa: optional SQLAlchemy session handed to ScmModel
        :param repo_list: if given, restrict indexing to these repo names
        :param repo_update_list: if given, restrict incremental updates to
            these repo names
        :raises Exception: when index or repository location is missing
        """
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        # filter repo list
        if repo_list:
            self.filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    self.filtered_repo_paths[repo_name] = repo
            self.repo_paths = self.filtered_repo_paths

        # filter update repo list (fix: the dict was initialized twice in the
        # original; it is always defined because update_file_index reads it)
        self.filtered_repo_update_paths = {}
        if repo_update_list:
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_update_list:
                    self.filtered_repo_update_paths[repo_name] = repo
            self.repo_paths = self.filtered_repo_update_paths

        # decide whether a full build is needed: missing directory or either
        # of the two whoosh indexes forces a full (re)build in run()
        self.initial = True
        if not os.path.isdir(self.index_location):
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
        elif not exists_in(self.index_location, IDX_NAME):
            log.info('Running full index build as the file content'
                     ' index does not exist')
        elif not exists_in(self.index_location, CHGSET_IDX_NAME):
            log.info('Running full index build as the changeset'
                     ' index does not exist')
        else:
            self.initial = False
r561 | def get_paths(self, repo): | |||
r2101 | """ | |||
recursive walk in root dir and return a set of all path in that dir | ||||
r560 | based on repository walk function | |||
""" | ||||
r547 | index_paths_ = set() | |||
r567 | try: | |||
r947 | tip = repo.get_changeset('tip') | |||
for topnode, dirs, files in tip.walk('/'): | ||||
r547 | for f in files: | |||
r561 | index_paths_.add(jn(repo.path, f.path)) | |||
r631 | ||||
r885 | except RepositoryError, e: | |||
log.debug(traceback.format_exc()) | ||||
r567 | pass | |||
r631 | return index_paths_ | |||
r561 | def get_node(self, repo, path): | |||
n_path = path[len(repo.path) + 1:] | ||||
node = repo.get_changeset().get_node(n_path) | ||||
return node | ||||
r631 | ||||
r561 | def get_node_mtime(self, node): | |||
return mktime(node.last_changeset.date.timetuple()) | ||||
r631 | ||||
r1171 | def add_doc(self, writer, path, repo, repo_name): | |||
r2101 | """ | |||
Adding doc to writer this function itself fetches data from | ||||
the instance of vcs backend | ||||
""" | ||||
r561 | node = self.get_node(repo, path) | |||
r2109 | indexed = indexed_w_content = 0 | |||
r2101 | # we just index the content of chosen files, and skip binary files | |||
r886 | if node.extension in INDEX_EXTENSIONS and not node.is_binary: | |||
r560 | u_content = node.content | |||
r885 | if not isinstance(u_content, unicode): | |||
log.warning(' >> %s Could not get this content as unicode ' | ||||
r2101 | 'replacing with empty content' % path) | |||
r885 | u_content = u'' | |||
else: | ||||
log.debug(' >> %s [WITH CONTENT]' % path) | ||||
r2109 | indexed_w_content += 1 | |||
r885 | ||||
r547 | else: | |||
log.debug(' >> %s' % path) | ||||
r2101 | # just index file name without it's content | |||
r547 | u_content = u'' | |||
r2109 | indexed += 1 | |||
r631 | ||||
r2388 | p = safe_unicode(path) | |||
r2101 | writer.add_document( | |||
r2388 | fileid=p, | |||
r2101 | owner=unicode(repo.contact), | |||
repository=safe_unicode(repo_name), | ||||
r2388 | path=p, | |||
r2101 | content=u_content, | |||
modtime=self.get_node_mtime(node), | ||||
extension=node.extension | ||||
) | ||||
r2109 | return indexed, indexed_w_content | |||
r631 | ||||
Indra Talip
|
r2643 | def index_changesets(self, writer, repo_name, repo, start_rev=None): | ||
Indra Talip
|
r2640 | """ | ||
Add all changeset in the vcs repo starting at start_rev | ||||
to the index writer | ||||
Indra Talip
|
r2643 | |||
:param writer: the whoosh index writer to add to | ||||
:param repo_name: name of the repository from whence the | ||||
changeset originates including the repository group | ||||
:param repo: the vcs repository instance to index changesets for, | ||||
the presumption is the repo has changesets to index | ||||
:param start_rev=None: the full sha id to start indexing from | ||||
if start_rev is None then index from the first changeset in | ||||
the repo | ||||
Indra Talip
|
r2640 | """ | ||
Indra Talip
|
r2643 | if start_rev is None: | ||
start_rev = repo[0].raw_id | ||||
log.debug('indexing changesets in %s starting at rev: %s' % (repo_name, start_rev)) | ||||
r631 | ||||
Indra Talip
|
r2640 | indexed=0 | ||
Indra Talip
|
r2643 | for cs in repo.get_changesets(start=start_rev): | ||
Indra Talip
|
r2640 | writer.add_document( | ||
Indra Talip
|
r2642 | raw_id=unicode(cs.raw_id), | ||
Indra Talip
|
r2640 | owner=unicode(repo.contact), | ||
repository=safe_unicode(repo_name), | ||||
author=cs.author, | ||||
message=cs.message, | ||||
last=cs.last, | ||||
added=u' '.join([node.path for node in cs.added]).lower(), | ||||
removed=u' '.join([node.path for node in cs.removed]).lower(), | ||||
changed=u' '.join([node.path for node in cs.changed]).lower(), | ||||
parents=u' '.join([cs.raw_id for cs in cs.parents]), | ||||
) | ||||
indexed += 1 | ||||
r631 | ||||
Indra Talip
|
r2640 | log.debug('indexed %d changesets for repo %s' % (indexed, repo_name)) | ||
def index_files(self, file_idx_writer, repo_name, repo): | ||||
i_cnt = iwc_cnt = 0 | ||||
log.debug('building index for [%s]' % repo.path) | ||||
for idx_path in self.get_paths(repo): | ||||
i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name) | ||||
i_cnt += i | ||||
iwc_cnt += iwc | ||||
log.debug('added %s files %s with content for repo %s' % (i_cnt + iwc_cnt, iwc_cnt, repo.path)) | ||||
def update_changeset_index(self): | ||||
idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME) | ||||
r2569 | ||||
Indra Talip
|
r2640 | with idx.searcher() as searcher: | ||
writer = idx.writer() | ||||
writer_is_dirty = False | ||||
try: | ||||
for repo_name, repo in self.repo_paths.items(): | ||||
# skip indexing if there aren't any revs in the repo | ||||
Indra Talip
|
r2643 | num_of_revs = len(repo) | ||
if num_of_revs < 1: | ||||
Indra Talip
|
r2640 | continue | ||
qp = QueryParser('repository', schema=CHGSETS_SCHEMA) | ||||
q = qp.parse(u"last:t AND %s" % repo_name) | ||||
Indra Talip
|
r2643 | results = searcher.search(q) | ||
Indra Talip
|
r2640 | |||
Indra Talip
|
r2643 | # default to scanning the entire repo | ||
Indra Talip
|
r2640 | last_rev = 0 | ||
Indra Talip
|
r2643 | start_id = None | ||
Indra Talip
|
r2640 | if len(results) > 0: | ||
Indra Talip
|
r2643 | # assuming that there is only one result, if not this | ||
# may require a full re-index. | ||||
start_id = results[0]['raw_id'] | ||||
last_rev = repo.get_changeset(revision=start_id).revision | ||||
r631 | ||||
Indra Talip
|
r2640 | # there are new changesets to index or a new repo to index | ||
Indra Talip
|
r2643 | if last_rev == 0 or num_of_revs > last_rev + 1: | ||
Indra Talip
|
r2640 | # delete the docs in the index for the previous last changeset(s) | ||
for hit in results: | ||||
Indra Talip
|
r2642 | q = qp.parse(u"last:t AND %s AND raw_id:%s" % | ||
(repo_name, hit['raw_id'])) | ||||
Indra Talip
|
r2640 | writer.delete_by_query(q) | ||
r631 | ||||
Indra Talip
|
r2640 | # index from the previous last changeset + all new ones | ||
Indra Talip
|
r2643 | self.index_changesets(writer, repo_name, repo, start_id) | ||
Indra Talip
|
r2640 | writer_is_dirty = True | ||
finally: | ||||
if writer_is_dirty: | ||||
log.debug('>> COMMITING CHANGES TO CHANGESET INDEX<<') | ||||
writer.commit(merge=True) | ||||
log.debug('>> COMMITTED CHANGES TO CHANGESET INDEX<<') | ||||
else: | ||||
writer.cancel | ||||
    def update_file_index(self):
        """
        Incrementally update the file-content index.

        Pass 1 walks every document already stored in the index: files whose
        node mtime is newer than the indexed mtime are deleted and queued for
        re-indexing; files that no longer exist in the repo are deleted.
        Pass 2 walks the filesystem paths of each repo and (re)indexes any
        path that is queued or was never indexed.
        """
        log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
                   'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        writer = idx.writer()
        writer_is_dirty = False
        try:
            with idx.reader() as reader:
                # Loop over the stored fields in the index
                for fields in reader.all_stored_fields():
                    indexed_path = fields['path']
                    indexed_repo_path = fields['repository']
                    indexed_paths.add(indexed_path)

                    # NOTE(review): only repos listed in repo_update_list are
                    # checked for stale docs here -- when that list is empty
                    # this dict is empty and pass 1 skips every stored doc;
                    # presumably intentional, but confirm against callers
                    if not indexed_repo_path in self.filtered_repo_update_paths:
                        continue

                    repo = self.repo_paths[indexed_repo_path]

                    try:
                        node = self.get_node(repo, indexed_path)
                        # Check if this file was changed since it was indexed
                        indexed_time = fields['modtime']
                        mtime = self.get_node_mtime(node)
                        if mtime > indexed_time:
                            # The file has changed, delete it and add it to
                            # the list of files to reindex
                            log.debug('adding to reindex list %s mtime: %s vs %s' % (
                                indexed_path, mtime, indexed_time)
                            )
                            writer.delete_by_term('fileid', indexed_path)
                            writer_is_dirty = True

                            to_index.add(indexed_path)
                    except (ChangesetError, NodeDoesNotExistError):
                        # This file was deleted since it was indexed
                        log.debug('removing from index %s' % indexed_path)
                        writer.delete_by_term('path', indexed_path)
                        writer_is_dirty = True

            # Loop over the files in the filesystem
            # Assume we have a function that gathers the filenames of the
            # documents to be indexed
            ri_cnt_total = 0  # indexed
            riwc_cnt_total = 0  # indexed with content
            for repo_name, repo in self.repo_paths.items():
                # skip indexing if there aren't any revisions
                if len(repo) < 1:
                    continue
                ri_cnt = 0  # indexed
                riwc_cnt = 0  # indexed with content
                for path in self.get_paths(repo):
                    path = safe_unicode(path)
                    if path in to_index or path not in indexed_paths:

                        # This is either a file that's changed, or a new file
                        # that wasn't indexed before. So index it!
                        i, iwc = self.add_doc(writer, path, repo, repo_name)
                        writer_is_dirty = True
                        log.debug('re indexing %s' % path)
                        ri_cnt += i
                        ri_cnt_total += 1
                        riwc_cnt += iwc
                        riwc_cnt_total += iwc
                log.debug('added %s files %s with content for repo %s' % (
                    ri_cnt + riwc_cnt, riwc_cnt, repo.path)
                )
            log.debug('indexed %s files in total and %s with content' % (
                ri_cnt_total, riwc_cnt_total)
            )
        finally:
            # commit only if something was deleted or added; otherwise
            # cancel the writer to release its lock without touching the index
            if writer_is_dirty:
                log.debug('>> COMMITING CHANGES <<')
                writer.commit(merge=True)
                log.debug('>>> FINISHED REBUILDING INDEX <<<')
            else:
                writer.cancel()
def build_indexes(self): | ||||
if os.path.exists(self.index_location): | ||||
log.debug('removing previous index') | ||||
rmtree(self.index_location) | ||||
if not os.path.exists(self.index_location): | ||||
os.mkdir(self.index_location) | ||||
chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA, indexname=CHGSET_IDX_NAME) | ||||
chgset_idx_writer = chgset_idx.writer() | ||||
file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME) | ||||
file_idx_writer = file_idx.writer() | ||||
log.debug('BUILDING INDEX FOR EXTENSIONS %s ' | ||||
'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys())) | ||||
for repo_name, repo in self.repo_paths.items(): | ||||
# skip indexing if there aren't any revisions | ||||
if len(repo) < 1: | ||||
r2373 | continue | |||
Indra Talip
|
r2640 | self.index_files(file_idx_writer, repo_name, repo) | ||
self.index_changesets(chgset_idx_writer, repo_name, repo) | ||||
r631 | ||||
Indra Talip
|
r2640 | log.debug('>> COMMITING CHANGES <<') | ||
file_idx_writer.commit(merge=True) | ||||
chgset_idx_writer.commit(merge=True) | ||||
log.debug('>>> FINISHED BUILDING INDEX <<<') | ||||
r2388 | ||||
Indra Talip
|
r2640 | def update_indexes(self): | ||
self.update_file_index() | ||||
self.update_changeset_index() | ||||
r631 | ||||
r547 | def run(self, full_index=False): | |||
"""Run daemon""" | ||||
if full_index or self.initial: | ||||
Indra Talip
|
r2640 | self.build_indexes() | ||
r547 | else: | |||
Indra Talip
|
r2640 | self.update_indexes() | ||