fixes an issue with whoosh reindexing files that were removed or renamed
marcink
r1711:b369bec5 beta
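This changeset extends the incremental indexer's error handling: when update_index() looks up a previously indexed path, a file that was removed or renamed since the last run can raise NodeDoesNotExistError in addition to ChangesetError. Both are now caught, so the stale entry is deleted from the index rather than the unhandled exception breaking the reindex run.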
--- a/rhodecode/lib/indexers/daemon.py
+++ b/rhodecode/lib/indexers/daemon.py
@@ -1,237 +1,238 @@
 # -*- coding: utf-8 -*-
 """
     rhodecode.lib.indexers.daemon
     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

     A daemon will read from task table and run tasks

     :created_on: Jan 26, 2010
     :author: marcink
     :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
     :license: GPLv3, see COPYING for more details.
 """
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

 import os
 import sys
 import logging
 import traceback

 from shutil import rmtree
 from time import mktime

 from os.path import dirname as dn
 from os.path import join as jn

 #to get the rhodecode import
 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 sys.path.append(project_path)


 from rhodecode.model.scm import ScmModel
 from rhodecode.lib import safe_unicode
 from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME

-from vcs.exceptions import ChangesetError, RepositoryError
+from vcs.exceptions import ChangesetError, RepositoryError, \
+    NodeDoesNotExistError

 from whoosh.index import create_in, open_dir



 log = logging.getLogger('whooshIndexer')
 # create logger
 log.setLevel(logging.DEBUG)
 log.propagate = False
 # create console handler and set level to debug
 ch = logging.StreamHandler()
 ch.setLevel(logging.DEBUG)

 # create formatter
 formatter = logging.Formatter("%(asctime)s - %(name)s -"
                               " %(levelname)s - %(message)s")

 # add formatter to ch
 ch.setFormatter(formatter)

 # add ch to logger
 log.addHandler(ch)

 class WhooshIndexingDaemon(object):
     """
     Daemon for atomic jobs
     """

     def __init__(self, indexname='HG_INDEX', index_location=None,
                  repo_location=None, sa=None, repo_list=None):
         self.indexname = indexname

         self.index_location = index_location
         if not index_location:
             raise Exception('You have to provide index location')

         self.repo_location = repo_location
         if not repo_location:
             raise Exception('You have to provide repositories location')

         self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

         if repo_list:
             filtered_repo_paths = {}
             for repo_name, repo in self.repo_paths.items():
                 if repo_name in repo_list:
                     filtered_repo_paths[repo_name] = repo

             self.repo_paths = filtered_repo_paths


         self.initial = False
         if not os.path.isdir(self.index_location):
             os.makedirs(self.index_location)
             log.info('Cannot run incremental index since it does not'
                      ' yet exist running full build')
             self.initial = True

     def get_paths(self, repo):
         """recursive walk in root dir and return a set of all path in that dir
         based on repository walk function
         """
         index_paths_ = set()
         try:
             tip = repo.get_changeset('tip')
             for topnode, dirs, files in tip.walk('/'):
                 for f in files:
                     index_paths_.add(jn(repo.path, f.path))

         except RepositoryError, e:
             log.debug(traceback.format_exc())
             pass
         return index_paths_

     def get_node(self, repo, path):
         n_path = path[len(repo.path) + 1:]
         node = repo.get_changeset().get_node(n_path)
         return node

     def get_node_mtime(self, node):
         return mktime(node.last_changeset.date.timetuple())

     def add_doc(self, writer, path, repo, repo_name):
         """Adding doc to writer this function itself fetches data from
         the instance of vcs backend"""
         node = self.get_node(repo, path)

         #we just index the content of chosen files, and skip binary files
         if node.extension in INDEX_EXTENSIONS and not node.is_binary:

             u_content = node.content
             if not isinstance(u_content, unicode):
                 log.warning('  >> %s Could not get this content as unicode '
                             'replacing with empty content', path)
                 u_content = u''
             else:
                 log.debug('    >> %s [WITH CONTENT]' % path)

         else:
             log.debug('    >> %s' % path)
             #just index file name without it's content
             u_content = u''

         writer.add_document(owner=unicode(repo.contact),
                             repository=safe_unicode(repo_name),
                             path=safe_unicode(path),
                             content=u_content,
                             modtime=self.get_node_mtime(node),
                             extension=node.extension)


     def build_index(self):
         if os.path.exists(self.index_location):
             log.debug('removing previous index')
             rmtree(self.index_location)

         if not os.path.exists(self.index_location):
             os.mkdir(self.index_location)

         idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
         writer = idx.writer()

         for repo_name, repo in self.repo_paths.items():
             log.debug('building index @ %s' % repo.path)

             for idx_path in self.get_paths(repo):
                 self.add_doc(writer, idx_path, repo, repo_name)

         log.debug('>> COMMITING CHANGES <<')
         writer.commit(merge=True)
         log.debug('>>> FINISHED BUILDING INDEX <<<')


     def update_index(self):
         log.debug('STARTING INCREMENTAL INDEXING UPDATE')

         idx = open_dir(self.index_location, indexname=self.indexname)
         # The set of all paths in the index
         indexed_paths = set()
         # The set of all paths we need to re-index
         to_index = set()

         reader = idx.reader()
         writer = idx.writer()

         # Loop over the stored fields in the index
         for fields in reader.all_stored_fields():
             indexed_path = fields['path']
             indexed_paths.add(indexed_path)

             repo = self.repo_paths[fields['repository']]

             try:
                 node = self.get_node(repo, indexed_path)
-            except ChangesetError:
+            except (ChangesetError, NodeDoesNotExistError):
                 # This file was deleted since it was indexed
                 log.debug('removing from index %s' % indexed_path)
                 writer.delete_by_term('path', indexed_path)

             else:
                 # Check if this file was changed since it was indexed
                 indexed_time = fields['modtime']
                 mtime = self.get_node_mtime(node)
                 if mtime > indexed_time:
                     # The file has changed, delete it and add it to the list of
                     # files to reindex
                     log.debug('adding to reindex list %s' % indexed_path)
                     writer.delete_by_term('path', indexed_path)
                     to_index.add(indexed_path)

         # Loop over the files in the filesystem
         # Assume we have a function that gathers the filenames of the
         # documents to be indexed
         for repo_name, repo in self.repo_paths.items():
             for path in self.get_paths(repo):
                 if path in to_index or path not in indexed_paths:
                     # This is either a file that's changed, or a new file
                     # that wasn't indexed before. So index it!
                     self.add_doc(writer, path, repo, repo_name)
                     log.debug('re indexing %s' % path)

         log.debug('>> COMMITING CHANGES <<')
         writer.commit(merge=True)
         log.debug('>>> FINISHED REBUILDING INDEX <<<')

     def run(self, full_index=False):
         """Run daemon"""
         if full_index or self.initial:
             self.build_index()
         else:
             self.update_index()
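
For readers who want to reproduce the pattern outside RhodeCode, below is a minimal sketch of the same incremental-update logic, assuming only the whoosh library. A plain dict stands in for the vcs backend, KeyError plays the role of ChangesetError / NodeDoesNotExistError, and the helper names (build_index_dir, update_index_dir) and sample paths are hypothetical; it is written in the same Python 2 idiom as the file above.

# Minimal, illustrative sketch -- not RhodeCode code.
import tempfile

from whoosh.fields import Schema, ID, TEXT
from whoosh.index import create_in, open_dir

# path is unique and stored so stale entries can be found and deleted
SCHEMA = Schema(path=ID(stored=True, unique=True), content=TEXT)

def build_index_dir(location, files):
    # full build: add a document for every known path
    idx = create_in(location, SCHEMA)
    writer = idx.writer()
    for path, content in files.items():
        writer.add_document(path=unicode(path), content=unicode(content))
    writer.commit(merge=True)

def update_index_dir(location, files):
    # incremental update: drop index entries whose path lookup now fails
    idx = open_dir(location)
    reader = idx.reader()
    writer = idx.writer()
    for fields in reader.all_stored_fields():
        indexed_path = fields['path']
        try:
            files[indexed_path]  # the lookup a removed/renamed file breaks
        except KeyError:
            # same branch the fix above takes for NodeDoesNotExistError
            writer.delete_by_term('path', indexed_path)
    reader.close()
    writer.commit(merge=True)

location = tempfile.mkdtemp()
build_index_dir(location, {'docs/readme.txt': 'hello world'})
update_index_dir(location, {'docs/README.txt': 'hello world'})  # rename

The point mirrors the fix: every exception the backend can raise for a vanished path has to be caught in the deletion branch, or stale documents survive in the index.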