fixed #850: Whoosh indexer should use the default revision flag to build the index
marcink
r3916:ba08786c beta
diff --git a/rhodecode/lib/indexers/daemon.py b/rhodecode/lib/indexers/daemon.py
--- a/rhodecode/lib/indexers/daemon.py
+++ b/rhodecode/lib/indexers/daemon.py
@@ -1,416 +1,430 @@
 # -*- coding: utf-8 -*-
 """
     rhodecode.lib.indexers.daemon
     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     A daemon will read from task table and run tasks
 
     :created_on: Jan 26, 2010
     :author: marcink
     :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
     :license: GPLv3, see COPYING for more details.
 """
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 from __future__ import with_statement
 
 import os
 import sys
 import logging
 import traceback
 
 from shutil import rmtree
 from time import mktime
 
 from os.path import dirname as dn
 from os.path import join as jn
 
 #to get the rhodecode import
 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 sys.path.append(project_path)
 
 from rhodecode.config.conf import INDEX_EXTENSIONS
 from rhodecode.model.scm import ScmModel
+from rhodecode.model.db import Repository
 from rhodecode.lib.utils2 import safe_unicode, safe_str
 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
     CHGSET_IDX_NAME
 
 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
     NodeDoesNotExistError
 
 from whoosh.index import create_in, open_dir, exists_in
 from whoosh.query import *
 from whoosh.qparser import QueryParser
 
 log = logging.getLogger('whoosh_indexer')
 
 
 class WhooshIndexingDaemon(object):
     """
     Daemon for atomic indexing jobs
     """
 
     def __init__(self, indexname=IDX_NAME, index_location=None,
                  repo_location=None, sa=None, repo_list=None,
                  repo_update_list=None):
         self.indexname = indexname
 
         self.index_location = index_location
         if not index_location:
             raise Exception('You have to provide index location')
 
         self.repo_location = repo_location
         if not repo_location:
             raise Exception('You have to provide repositories location')
 
         self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
 
         #filter repo list
         if repo_list:
             #Fix non-ascii repo names to unicode
             repo_list = map(safe_unicode, repo_list)
             self.filtered_repo_paths = {}
             for repo_name, repo in self.repo_paths.items():
                 if repo_name in repo_list:
                     self.filtered_repo_paths[repo_name] = repo
 
             self.repo_paths = self.filtered_repo_paths
 
         #filter update repo list
         self.filtered_repo_update_paths = {}
         if repo_update_list:
             self.filtered_repo_update_paths = {}
             for repo_name, repo in self.repo_paths.items():
                 if repo_name in repo_update_list:
                     self.filtered_repo_update_paths[repo_name] = repo
             self.repo_paths = self.filtered_repo_update_paths
 
         self.initial = True
         if not os.path.isdir(self.index_location):
             os.makedirs(self.index_location)
-            log.info('Cannot run incremental index since it does not'
-                     ' yet exist running full build')
+            log.info('Cannot run incremental index since it does not '
+                     'yet exist running full build')
         elif not exists_in(self.index_location, IDX_NAME):
-            log.info('Running full index build as the file content'
-                     ' index does not exist')
+            log.info('Running full index build as the file content '
+                     'index does not exist')
         elif not exists_in(self.index_location, CHGSET_IDX_NAME):
-            log.info('Running full index build as the changeset'
-                     ' index does not exist')
+            log.info('Running full index build as the changeset '
+                     'index does not exist')
         else:
             self.initial = False
 
+    def _get_index_revision(self, repo):
+        db_repo = Repository.get_by_repo_name(repo.name)
+        landing_rev = 'tip'
+        if db_repo:
+            landing_rev = db_repo.landing_rev
+        return landing_rev
+
+    def _get_index_changeset(self, repo):
+        index_rev = self._get_index_revision(repo)
+        cs = repo.get_changeset(index_rev)
+        return cs
+
     def get_paths(self, repo):
         """
         recursive walk in root dir and return a set of all path in that dir
         based on repository walk function
         """
         index_paths_ = set()
         try:
-            tip = repo.get_changeset('tip')
-            for _topnode, _dirs, files in tip.walk('/'):
+            cs = self._get_index_changeset(repo)
+            for _topnode, _dirs, files in cs.walk('/'):
                 for f in files:
                     index_paths_.add(jn(safe_str(repo.path), safe_str(f.path)))
 
         except RepositoryError:
             log.debug(traceback.format_exc())
             pass
         return index_paths_
 
     def get_node(self, repo, path):
         n_path = path[len(repo.path) + 1:]
-        node = repo.get_changeset().get_node(n_path)
+        cs = self._get_index_changeset(repo)
+        node = cs.get_node(n_path)
         return node
 
     def get_node_mtime(self, node):
         return mktime(node.last_changeset.date.timetuple())
 
     def add_doc(self, writer, path, repo, repo_name):
         """
         Adding doc to writer this function itself fetches data from
         the instance of vcs backend
         """
 
         node = self.get_node(repo, path)
         indexed = indexed_w_content = 0
         # we just index the content of chosen files, and skip binary files
         if node.extension in INDEX_EXTENSIONS and not node.is_binary:
             u_content = node.content
             if not isinstance(u_content, unicode):
                 log.warning(' >> %s Could not get this content as unicode '
                             'replacing with empty content' % path)
                 u_content = u''
             else:
                 log.debug(' >> %s [WITH CONTENT]' % path)
                 indexed_w_content += 1
 
         else:
             log.debug(' >> %s' % path)
             # just index file name without it's content
             u_content = u''
             indexed += 1
 
         p = safe_unicode(path)
         writer.add_document(
             fileid=p,
             owner=unicode(repo.contact),
             repository=safe_unicode(repo_name),
             path=p,
             content=u_content,
             modtime=self.get_node_mtime(node),
             extension=node.extension
         )
         return indexed, indexed_w_content
 
     def index_changesets(self, writer, repo_name, repo, start_rev=None):
         """
         Add all changeset in the vcs repo starting at start_rev
         to the index writer
 
         :param writer: the whoosh index writer to add to
         :param repo_name: name of the repository from whence the
           changeset originates including the repository group
         :param repo: the vcs repository instance to index changesets for,
           the presumption is the repo has changesets to index
         :param start_rev=None: the full sha id to start indexing from
           if start_rev is None then index from the first changeset in
           the repo
         """
 
         if start_rev is None:
             start_rev = repo[0].raw_id
 
         log.debug('indexing changesets in %s starting at rev: %s' %
                   (repo_name, start_rev))
 
         indexed = 0
         for cs in repo.get_changesets(start=start_rev):
             log.debug(' >> %s' % cs)
             writer.add_document(
                 raw_id=unicode(cs.raw_id),
                 owner=unicode(repo.contact),
                 date=cs._timestamp,
                 repository=safe_unicode(repo_name),
                 author=cs.author,
                 message=cs.message,
                 last=cs.last,
                 added=u' '.join([safe_unicode(node.path) for node in cs.added]).lower(),
                 removed=u' '.join([safe_unicode(node.path) for node in cs.removed]).lower(),
                 changed=u' '.join([safe_unicode(node.path) for node in cs.changed]).lower(),
                 parents=u' '.join([cs.raw_id for cs in cs.parents]),
             )
             indexed += 1
 
         log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
         return indexed
 
     def index_files(self, file_idx_writer, repo_name, repo):
         """
         Index files for given repo_name
 
         :param file_idx_writer: the whoosh index writer to add to
         :param repo_name: name of the repository we're indexing
         :param repo: instance of vcs repo
         """
         i_cnt = iwc_cnt = 0
-        log.debug('building index for [%s]' % repo.path)
+        log.debug('building index for %s @revision:%s' % (repo.path,
+                                        self._get_index_revision(repo)))
         for idx_path in self.get_paths(repo):
             i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
             i_cnt += i
             iwc_cnt += iwc
 
         log.debug('added %s files %s with content for repo %s' %
                   (i_cnt + iwc_cnt, iwc_cnt, repo.path))
         return i_cnt, iwc_cnt
 
     def update_changeset_index(self):
         idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)
 
         with idx.searcher() as searcher:
             writer = idx.writer()
             writer_is_dirty = False
             try:
                 indexed_total = 0
                 repo_name = None
                 for repo_name, repo in self.repo_paths.items():
                     # skip indexing if there aren't any revs in the repo
                     num_of_revs = len(repo)
                     if num_of_revs < 1:
                         continue
 
                     qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
                     q = qp.parse(u"last:t AND %s" % repo_name)
 
                     results = searcher.search(q)
 
                     # default to scanning the entire repo
                     last_rev = 0
                     start_id = None
 
                     if len(results) > 0:
                         # assuming that there is only one result, if not this
                         # may require a full re-index.
                         start_id = results[0]['raw_id']
                         last_rev = repo.get_changeset(revision=start_id).revision
 
                     # there are new changesets to index or a new repo to index
                     if last_rev == 0 or num_of_revs > last_rev + 1:
                         # delete the docs in the index for the previous
                         # last changeset(s)
                         for hit in results:
                             q = qp.parse(u"last:t AND %s AND raw_id:%s" %
                                          (repo_name, hit['raw_id']))
                             writer.delete_by_query(q)
 
                         # index from the previous last changeset + all new ones
                         indexed_total += self.index_changesets(writer,
                                                 repo_name, repo, start_id)
                         writer_is_dirty = True
                 log.debug('indexed %s changesets for repo %s' % (
-                          indexed_total, repo_name)
+                    indexed_total, repo_name)
                 )
             finally:
                 if writer_is_dirty:
                     log.debug('>> COMMITING CHANGES TO CHANGESET INDEX<<')
                     writer.commit(merge=True)
                     log.debug('>>> FINISHED REBUILDING CHANGESET INDEX <<<')
                 else:
-                    writer.cancel
                     log.debug('>> NOTHING TO COMMIT TO CHANGESET INDEX<<')
 
     def update_file_index(self):
         log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
                    'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))
 
         idx = open_dir(self.index_location, indexname=self.indexname)
         # The set of all paths in the index
         indexed_paths = set()
         # The set of all paths we need to re-index
         to_index = set()
 
         writer = idx.writer()
         writer_is_dirty = False
         try:
             with idx.reader() as reader:
 
                 # Loop over the stored fields in the index
                 for fields in reader.all_stored_fields():
                     indexed_path = fields['path']
                     indexed_repo_path = fields['repository']
                     indexed_paths.add(indexed_path)
 
                     if not indexed_repo_path in self.filtered_repo_update_paths:
                         continue
 
                     repo = self.repo_paths[indexed_repo_path]
 
                     try:
                         node = self.get_node(repo, indexed_path)
                         # Check if this file was changed since it was indexed
                         indexed_time = fields['modtime']
                         mtime = self.get_node_mtime(node)
                         if mtime > indexed_time:
                             # The file has changed, delete it and add it to
                             # the list of files to reindex
                             log.debug(
                                 'adding to reindex list %s mtime: %s vs %s' % (
                                     indexed_path, mtime, indexed_time)
                             )
                             writer.delete_by_term('fileid', indexed_path)
                             writer_is_dirty = True
 
                             to_index.add(indexed_path)
                     except (ChangesetError, NodeDoesNotExistError):
                         # This file was deleted since it was indexed
                         log.debug('removing from index %s' % indexed_path)
                         writer.delete_by_term('path', indexed_path)
                         writer_is_dirty = True
 
             # Loop over the files in the filesystem
             # Assume we have a function that gathers the filenames of the
             # documents to be indexed
             ri_cnt_total = 0  # indexed
             riwc_cnt_total = 0  # indexed with content
             for repo_name, repo in self.repo_paths.items():
                 # skip indexing if there aren't any revisions
                 if len(repo) < 1:
                     continue
                 ri_cnt = 0  # indexed
                 riwc_cnt = 0  # indexed with content
                 for path in self.get_paths(repo):
                     path = safe_unicode(path)
                     if path in to_index or path not in indexed_paths:
 
                         # This is either a file that's changed, or a new file
                         # that wasn't indexed before. So index it!
                         i, iwc = self.add_doc(writer, path, repo, repo_name)
                         writer_is_dirty = True
                         log.debug('re indexing %s' % path)
                         ri_cnt += i
                         ri_cnt_total += 1
                         riwc_cnt += iwc
                         riwc_cnt_total += iwc
                 log.debug('added %s files %s with content for repo %s' % (
                     ri_cnt + riwc_cnt, riwc_cnt, repo.path)
                 )
             log.debug('indexed %s files in total and %s with content' % (
                 ri_cnt_total, riwc_cnt_total)
             )
         finally:
             if writer_is_dirty:
                 log.debug('>> COMMITING CHANGES TO FILE INDEX <<')
                 writer.commit(merge=True)
                 log.debug('>>> FINISHED REBUILDING FILE INDEX <<<')
             else:
                 log.debug('>> NOTHING TO COMMIT TO FILE INDEX <<')
                 writer.cancel()
 
     def build_indexes(self):
         if os.path.exists(self.index_location):
             log.debug('removing previous index')
             rmtree(self.index_location)
 
         if not os.path.exists(self.index_location):
             os.mkdir(self.index_location)
 
         chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA,
                                indexname=CHGSET_IDX_NAME)
         chgset_idx_writer = chgset_idx.writer()
 
         file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
         file_idx_writer = file_idx.writer()
         log.debug('BUILDING INDEX FOR EXTENSIONS %s '
                   'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))
 
         for repo_name, repo in self.repo_paths.items():
             # skip indexing if there aren't any revisions
             if len(repo) < 1:
                 continue
 
             self.index_files(file_idx_writer, repo_name, repo)
             self.index_changesets(chgset_idx_writer, repo_name, repo)
 
         log.debug('>> COMMITING CHANGES <<')
         file_idx_writer.commit(merge=True)
         chgset_idx_writer.commit(merge=True)
         log.debug('>>> FINISHED BUILDING INDEX <<<')
 
     def update_indexes(self):
         self.update_file_index()
         self.update_changeset_index()
 
     def run(self, full_index=False):
         """Run daemon"""
         if full_index or self.initial:
             self.build_indexes()
         else:
             self.update_indexes()
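
The heart of this fix is the new _get_index_revision helper: instead of always indexing file contents at 'tip', get_paths and get_node now resolve the repository's configured landing revision (the "default revision" flag from issue #850) and fall back to 'tip' only when no database record exists. Below is a minimal, self-contained sketch of that fallback logic; the DbRepoStub class and FAKE_DB dict are hypothetical stand-ins for the SQLAlchemy lookup done by Repository.get_by_repo_name().

class DbRepoStub(object):
    """Hypothetical stand-in for rhodecode.model.db.Repository."""
    def __init__(self, landing_rev):
        self.landing_rev = landing_rev  # e.g. 'default', 'stable', or a sha

FAKE_DB = {'myrepo': DbRepoStub('stable')}

def get_index_revision(repo_name):
    # mirrors WhooshIndexingDaemon._get_index_revision from the diff above
    db_repo = FAKE_DB.get(repo_name)
    landing_rev = 'tip'  # safe fallback when the repo has no db record
    if db_repo:
        landing_rev = db_repo.landing_rev
    return landing_rev

assert get_index_revision('myrepo') == 'stable'     # configured rev wins
assert get_index_revision('no-such-repo') == 'tip'  # fallback to tip

With this in place, a repository whose landing revision points at a stable branch gets its file index built from that branch instead of from whatever happens to sit at tip.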
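
For context on how the daemon is driven, here is a hedged usage sketch based only on the constructor and run() signatures visible in the diff; the paths are illustrative, and in a real deployment RhodeCode supplies these values from its .ini configuration together with a configured database session.

from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

# Illustrative locations; real values come from the RhodeCode config.
daemon = WhooshIndexingDaemon(
    index_location='/srv/rhodecode/data/index',  # where the whoosh files live
    repo_location='/srv/rhodecode/repos',        # root of all repositories
)

# full_index=True forces build_indexes(), which wipes and rebuilds both
# the file and the changeset index; otherwise update_indexes() runs an
# incremental pass, unless the index does not exist yet (self.initial).
daemon.run(full_index=False)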