##// END OF EJS Templates
show number of revisions to parse in whoosh indexing daemon logging
marcink -
r3922:d8e02de5 beta
parent child Browse files
Show More
@@ -1,439 +1,441 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon that reads from the task table and runs indexing tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26
27 27 import os
28 28 import sys
29 29 import logging
30 30 import traceback
31 31
32 32 from shutil import rmtree
33 33 from time import mktime
34 34
35 35 from os.path import dirname as dn
36 36 from os.path import join as jn
37 37
38 38 #to get the rhodecode import
39 39 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
40 40 sys.path.append(project_path)
41 41
42 42 from rhodecode.config.conf import INDEX_EXTENSIONS
43 43 from rhodecode.model.scm import ScmModel
44 44 from rhodecode.model.db import Repository
45 45 from rhodecode.lib.utils2 import safe_unicode, safe_str
46 46 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
47 47 CHGSET_IDX_NAME
48 48
49 49 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
50 50 NodeDoesNotExistError
51 51
52 52 from whoosh.index import create_in, open_dir, exists_in
53 53 from whoosh.query import *
54 54 from whoosh.qparser import QueryParser
55 55
56 56 log = logging.getLogger('whoosh_indexer')
57 57
58 58
class WhooshIndexingDaemon(object):
    """
    Daemon for atomic indexing jobs.

    Maintains two Whoosh indexes for a set of repositories: a file
    content index (``IDX_NAME``/``SCHEMA``) and a changeset index
    (``CHGSET_IDX_NAME``/``CHGSETS_SCHEMA``).  Supports both full
    rebuilds and incremental updates.
    """

    def __init__(self, indexname=IDX_NAME, index_location=None,
                 repo_location=None, sa=None, repo_list=None,
                 repo_update_list=None):
        """
        :param indexname: name of the whoosh file-content index
        :param index_location: filesystem directory holding the indexes
            (required)
        :param repo_location: filesystem root scanned for repositories
            (required)
        :param sa: optional database session handed to ScmModel
        :param repo_list: if given, restrict indexing to repos whose
            names appear in this list
        :param repo_update_list: if given, restrict incremental updates
            to repos whose names appear in this list
        """
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        # filter repo list down to the explicitly requested repositories
        if repo_list:
            # fix non-ascii repo names to unicode
            repo_list = map(safe_unicode, repo_list)
            self.filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    self.filtered_repo_paths[repo_name] = repo

            self.repo_paths = self.filtered_repo_paths

        # filter update repo list (single initialization; the original
        # code re-assigned this dict twice)
        self.filtered_repo_update_paths = {}
        if repo_update_list:
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_update_list:
                    self.filtered_repo_update_paths[repo_name] = repo
            self.repo_paths = self.filtered_repo_update_paths

        # decide whether a full (initial) build is required: missing
        # index directory or either of the two indexes forces a rebuild
        self.initial = True
        if not os.path.isdir(self.index_location):
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not '
                     'yet exist running full build')
        elif not exists_in(self.index_location, IDX_NAME):
            log.info('Running full index build as the file content '
                     'index does not exist')
        elif not exists_in(self.index_location, CHGSET_IDX_NAME):
            log.info('Running full index build as the changeset '
                     'index does not exist')
        else:
            self.initial = False

    def _get_index_revision(self, repo):
        """
        Return the revision to index for *repo*: the repository's
        configured landing revision, falling back to 'tip' when the
        repo has no database record.
        """
        db_repo = Repository.get_by_repo_name(repo.name)
        landing_rev = 'tip'
        if db_repo:
            landing_rev = db_repo.landing_rev
        return landing_rev

    def _get_index_changeset(self, repo):
        """Return the changeset object at the index revision of *repo*."""
        index_rev = self._get_index_revision(repo)
        cs = repo.get_changeset(index_rev)
        return cs

    def get_paths(self, repo):
        """
        Recursive walk in root dir and return a set of all paths in
        that dir, based on the repository walk function.

        :param repo: vcs repository instance
        :return: set of full filesystem-style paths (repo path joined
            with the in-repo file path)
        """
        index_paths_ = set()
        try:
            cs = self._get_index_changeset(repo)
            for _topnode, _dirs, files in cs.walk('/'):
                for f in files:
                    index_paths_.add(jn(safe_str(repo.path), safe_str(f.path)))

        except RepositoryError:
            # best-effort: an unreadable/empty repo simply yields no paths
            log.debug(traceback.format_exc())
        return index_paths_

    def get_node(self, repo, path):
        """
        Gets a filenode based on given full path. It operates on strings
        for hg/git compatibility.

        :param repo: scm repo instance
        :param path: full path including root location
        :return: FileNode
        """
        root_path = safe_str(repo.path) + '/'
        # strip the repo root prefix; partition()[-1] is the in-repo path
        parts = safe_str(path).partition(root_path)
        cs = self._get_index_changeset(repo)
        node = cs.get_node(parts[-1])
        return node

    def get_node_mtime(self, node):
        """Return the node's last-change time as a unix timestamp."""
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo, repo_name):
        """
        Adding doc to writer, this function itself fetches data from
        the instance of vcs backend.

        :param writer: whoosh index writer
        :param path: full path of the file to index
        :param repo: vcs repository instance
        :param repo_name: name of the repository
        :return: tuple ``(indexed, indexed_w_content)`` — counts of
            files indexed by name only vs. with full content
        """
        node = self.get_node(repo, path)
        indexed = indexed_w_content = 0
        # we just index the content of chosen files, and skip binary files
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:
            u_content = node.content
            if not isinstance(u_content, unicode):
                log.warning(' >> %s Could not get this content as unicode '
                            'replacing with empty content' % path)
                u_content = u''
            else:
                log.debug('    >> %s [WITH CONTENT]' % path)
                indexed_w_content += 1

        else:
            log.debug('    >> %s' % path)
            # just index file name without it's content
            u_content = u''
            indexed += 1

        p = safe_unicode(path)
        writer.add_document(
            fileid=p,
            owner=unicode(repo.contact),
            repository=safe_unicode(repo_name),
            path=p,
            content=u_content,
            modtime=self.get_node_mtime(node),
            extension=node.extension
        )
        return indexed, indexed_w_content

    def index_changesets(self, writer, repo_name, repo, start_rev=None):
        """
        Add all changesets in the vcs repo starting at start_rev
        to the index writer.

        :param writer: the whoosh index writer to add to
        :param repo_name: name of the repository from whence the
          changeset originates including the repository group
        :param repo: the vcs repository instance to index changesets for,
          the presumption is the repo has changesets to index
        :param start_rev=None: the full sha id to start indexing from
          if start_rev is None then index from the first changeset in
          the repo
        :return: number of changesets indexed
        """
        if start_rev is None:
            start_rev = repo[0].raw_id

        log.debug('indexing changesets in %s starting at rev: %s' %
                  (repo_name, start_rev))

        indexed = 0
        # materialized so we can report progress against the total;
        # assumes get_changesets() returns a sized collection — confirm
        cs_iter = repo.get_changesets(start=start_rev)
        total = len(cs_iter)
        for cs in cs_iter:
            log.debug('    >> %s/%s' % (cs, total))
            writer.add_document(
                raw_id=unicode(cs.raw_id),
                owner=unicode(repo.contact),
                date=cs._timestamp,
                repository=safe_unicode(repo_name),
                author=cs.author,
                message=cs.message,
                last=cs.last,
                added=u' '.join([safe_unicode(node.path) for node in cs.added]).lower(),
                removed=u' '.join([safe_unicode(node.path) for node in cs.removed]).lower(),
                changed=u' '.join([safe_unicode(node.path) for node in cs.changed]).lower(),
                # NOTE: renamed comprehension variable — the original
                # `for cs in cs.parents` rebound the outer loop variable
                # (py2 listcomp leakage) and only worked because this
                # kwarg happened to be evaluated last
                parents=u' '.join([p.raw_id for p in cs.parents]),
            )
            indexed += 1

        log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
        return indexed

    def index_files(self, file_idx_writer, repo_name, repo):
        """
        Index files for given repo_name.

        :param file_idx_writer: the whoosh index writer to add to
        :param repo_name: name of the repository we're indexing
        :param repo: instance of vcs repo
        :return: tuple of counts ``(indexed, indexed_with_content)``
        """
        i_cnt = iwc_cnt = 0
        log.debug('building index for %s @revision:%s' % (repo.path,
                                                self._get_index_revision(repo)))
        for idx_path in self.get_paths(repo):
            i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
            i_cnt += i
            iwc_cnt += iwc

        log.debug('added %s files %s with content for repo %s' %
                  (i_cnt + iwc_cnt, iwc_cnt, repo.path))
        return i_cnt, iwc_cnt

    def update_changeset_index(self):
        """
        Incrementally update the changeset index: for each repo, find
        the last indexed changeset (documents flagged ``last:t``) and
        index everything newer than it.
        """
        idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)

        with idx.searcher() as searcher:
            writer = idx.writer()
            writer_is_dirty = False
            try:
                indexed_total = 0
                repo_name = None
                for repo_name, repo in self.repo_paths.items():
                    # skip indexing if there aren't any revs in the repo
                    num_of_revs = len(repo)
                    if num_of_revs < 1:
                        continue

                    qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
                    q = qp.parse(u"last:t AND %s" % repo_name)

                    results = searcher.search(q)

                    # default to scanning the entire repo
                    last_rev = 0
                    start_id = None

                    if len(results) > 0:
                        # assuming that there is only one result, if not this
                        # may require a full re-index.
                        start_id = results[0]['raw_id']
                        last_rev = repo.get_changeset(revision=start_id).revision

                    # there are new changesets to index or a new repo to index
                    if last_rev == 0 or num_of_revs > last_rev + 1:
                        # delete the docs in the index for the previous
                        # last changeset(s)
                        for hit in results:
                            q = qp.parse(u"last:t AND %s AND raw_id:%s" %
                                            (repo_name, hit['raw_id']))
                            writer.delete_by_query(q)

                        # index from the previous last changeset + all new ones
                        indexed_total += self.index_changesets(writer,
                                                repo_name, repo, start_id)
                        writer_is_dirty = True
                        log.debug('indexed %s changesets for repo %s' % (
                                  indexed_total, repo_name)
                        )
            finally:
                if writer_is_dirty:
                    log.debug('>> COMMITING CHANGES TO CHANGESET INDEX<<')
                    writer.commit(merge=True)
                    log.debug('>>> FINISHED REBUILDING CHANGESET INDEX <<<')
                else:
                    log.debug('>> NOTHING TO COMMIT TO CHANGESET INDEX<<')
                    # release the writer lock — mirrors update_file_index,
                    # which cancels an unused writer instead of leaking it
                    writer.cancel()

    def update_file_index(self):
        """
        Incrementally update the file content index: re-index files
        whose mtime changed, drop deleted files, and add new ones.
        """
        log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
                   'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        writer = idx.writer()
        writer_is_dirty = False
        try:
            with idx.reader() as reader:

                # Loop over the stored fields in the index
                for fields in reader.all_stored_fields():
                    indexed_path = fields['path']
                    indexed_repo_path = fields['repository']
                    indexed_paths.add(indexed_path)

                    if indexed_repo_path not in self.filtered_repo_update_paths:
                        continue

                    repo = self.repo_paths[indexed_repo_path]

                    try:
                        node = self.get_node(repo, indexed_path)
                        # Check if this file was changed since it was indexed
                        indexed_time = fields['modtime']
                        mtime = self.get_node_mtime(node)
                        if mtime > indexed_time:
                            # The file has changed, delete it and add it to
                            # the list of files to reindex
                            log.debug(
                                'adding to reindex list %s mtime: %s vs %s' % (
                                    indexed_path, mtime, indexed_time)
                            )
                            writer.delete_by_term('fileid', indexed_path)
                            writer_is_dirty = True

                            to_index.add(indexed_path)
                    except (ChangesetError, NodeDoesNotExistError):
                        # This file was deleted since it was indexed
                        log.debug('removing from index %s' % indexed_path)
                        writer.delete_by_term('path', indexed_path)
                        writer_is_dirty = True

            # Loop over the files in the filesystem
            # Assume we have a function that gathers the filenames of the
            # documents to be indexed
            ri_cnt_total = 0  # indexed
            riwc_cnt_total = 0  # indexed with content
            for repo_name, repo in self.repo_paths.items():
                # skip indexing if there aren't any revisions
                if len(repo) < 1:
                    continue
                ri_cnt = 0  # indexed
                riwc_cnt = 0  # indexed with content
                for path in self.get_paths(repo):
                    path = safe_unicode(path)
                    if path in to_index or path not in indexed_paths:

                        # This is either a file that's changed, or a new file
                        # that wasn't indexed before. So index it!
                        i, iwc = self.add_doc(writer, path, repo, repo_name)
                        writer_is_dirty = True
                        log.debug('re indexing %s' % path)
                        ri_cnt += i
                        ri_cnt_total += 1
                        riwc_cnt += iwc
                        riwc_cnt_total += iwc
                log.debug('added %s files %s with content for repo %s' % (
                          ri_cnt + riwc_cnt, riwc_cnt, repo.path)
                )
            log.debug('indexed %s files in total and %s with content' % (
                      ri_cnt_total, riwc_cnt_total)
            )
        finally:
            if writer_is_dirty:
                log.debug('>> COMMITING CHANGES TO FILE INDEX <<')
                writer.commit(merge=True)
                log.debug('>>> FINISHED REBUILDING FILE INDEX <<<')
            else:
                log.debug('>> NOTHING TO COMMIT TO FILE INDEX <<')
                writer.cancel()

    def build_indexes(self):
        """Destroy any existing indexes and build both from scratch."""
        if os.path.exists(self.index_location):
            log.debug('removing previous index')
            rmtree(self.index_location)

        if not os.path.exists(self.index_location):
            os.mkdir(self.index_location)

        chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA,
                               indexname=CHGSET_IDX_NAME)
        chgset_idx_writer = chgset_idx.writer()

        file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
        file_idx_writer = file_idx.writer()
        log.debug('BUILDING INDEX FOR EXTENSIONS %s '
                  'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))

        for repo_name, repo in self.repo_paths.items():
            # skip indexing if there aren't any revisions
            if len(repo) < 1:
                continue

            self.index_files(file_idx_writer, repo_name, repo)
            self.index_changesets(chgset_idx_writer, repo_name, repo)

        log.debug('>> COMMITING CHANGES <<')
        file_idx_writer.commit(merge=True)
        chgset_idx_writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_indexes(self):
        """Run incremental updates of both indexes."""
        self.update_file_index()
        self.update_changeset_index()

    def run(self, full_index=False):
        """Run daemon"""
        if full_index or self.initial:
            self.build_indexes()
        else:
            self.update_indexes()
General Comments 0
You need to be logged in to leave comments. Login now