fixed logging messages on whoosh indexer
marcink
r2840:c7c58252 beta
@@ -1,414 +1,414 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon will read from task table and run tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26
27 27 import os
28 28 import sys
29 29 import logging
30 30 import traceback
31 31
32 32 from shutil import rmtree
33 33 from time import mktime
34 34
35 35 from os.path import dirname as dn
36 36 from os.path import join as jn
37 37
38 38 #to get the rhodecode import
39 39 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
40 40 sys.path.append(project_path)
41 41
42 42 from rhodecode.config.conf import INDEX_EXTENSIONS
43 43 from rhodecode.model.scm import ScmModel
44 44 from rhodecode.lib.utils2 import safe_unicode
45 45 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
46 46 CHGSET_IDX_NAME
47 47
48 48 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
49 49 NodeDoesNotExistError
50 50
51 51 from whoosh.index import create_in, open_dir, exists_in
52 52 from whoosh.query import *
53 53 from whoosh.qparser import QueryParser
54 54
55 55 log = logging.getLogger('whoosh_indexer')
56 56
57 57
58 58 class WhooshIndexingDaemon(object):
59 59 """
60 60 Daemon for atomic indexing jobs
61 61 """
62 62
63 63 def __init__(self, indexname=IDX_NAME, index_location=None,
64 64 repo_location=None, sa=None, repo_list=None,
65 65 repo_update_list=None):
66 66 self.indexname = indexname
67 67
68 68 self.index_location = index_location
69 69 if not index_location:
70 70 raise Exception('You have to provide an index location')
71 71
72 72 self.repo_location = repo_location
73 73 if not repo_location:
74 74 raise Exception('You have to provide the repositories location')
75 75
76 76 self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
77 77
78 78 #filter repo list
79 79 if repo_list:
80 80 self.filtered_repo_paths = {}
81 81 for repo_name, repo in self.repo_paths.items():
82 82 if repo_name in repo_list:
83 83 self.filtered_repo_paths[repo_name] = repo
84 84
85 85 self.repo_paths = self.filtered_repo_paths
86 86
87 87 #filter update repo list
88 88 self.filtered_repo_update_paths = {}
89 89 if repo_update_list:
90 90 self.filtered_repo_update_paths = {}
91 91 for repo_name, repo in self.repo_paths.items():
92 92 if repo_name in repo_update_list:
93 93 self.filtered_repo_update_paths[repo_name] = repo
94 94 self.repo_paths = self.filtered_repo_update_paths
95 95
96 96 self.initial = True
97 97 if not os.path.isdir(self.index_location):
98 98 os.makedirs(self.index_location)
99 99 log.info('Cannot run incremental index since it does not'
100 100 ' yet exist; running full build')
101 101 elif not exists_in(self.index_location, IDX_NAME):
102 102 log.info('Running full index build as the file content'
103 103 ' index does not exist')
104 104 elif not exists_in(self.index_location, CHGSET_IDX_NAME):
105 105 log.info('Running full index build as the changeset'
106 106 ' index does not exist')
107 107 else:
108 108 self.initial = False
109 109
110 110 def get_paths(self, repo):
111 111 """
112 112 recursive walk in root dir and return a set of all paths in that dir
113 113 based on repository walk function
114 114 """
115 115 index_paths_ = set()
116 116 try:
117 117 tip = repo.get_changeset('tip')
118 118 for _topnode, _dirs, files in tip.walk('/'):
119 119 for f in files:
120 120 index_paths_.add(jn(repo.path, f.path))
121 121
122 122 except RepositoryError:
123 123 log.debug(traceback.format_exc())
124 124 pass
125 125 return index_paths_
126 126
127 127 def get_node(self, repo, path):
128 128 n_path = path[len(repo.path) + 1:]
129 129 node = repo.get_changeset().get_node(n_path)
130 130 return node
131 131
132 132 def get_node_mtime(self, node):
133 133 return mktime(node.last_changeset.date.timetuple())
134 134
135 135 def add_doc(self, writer, path, repo, repo_name):
136 136 """
137 137 Adding a doc to the writer; this function itself fetches data from
138 138 the vcs backend instance
139 139 """
140 140
141 141 node = self.get_node(repo, path)
142 142 indexed = indexed_w_content = 0
143 143 # we just index the content of chosen files, and skip binary files
144 144 if node.extension in INDEX_EXTENSIONS and not node.is_binary:
145 145 u_content = node.content
146 146 if not isinstance(u_content, unicode):
147 147 log.warning(' >> %s Could not get this content as unicode; '
148 148 'replacing with empty content' % path)
149 149 u_content = u''
150 150 else:
151 151 log.debug(' >> %s [WITH CONTENT]' % path)
152 152 indexed_w_content += 1
153 153
154 154 else:
155 155 log.debug(' >> %s' % path)
156 156 # just index file name without its content
157 157 u_content = u''
158 158 indexed += 1
159 159
160 160 p = safe_unicode(path)
161 161 writer.add_document(
162 162 fileid=p,
163 163 owner=unicode(repo.contact),
164 164 repository=safe_unicode(repo_name),
165 165 path=p,
166 166 content=u_content,
167 167 modtime=self.get_node_mtime(node),
168 168 extension=node.extension
169 169 )
170 170 return indexed, indexed_w_content
171 171
172 172 def index_changesets(self, writer, repo_name, repo, start_rev=None):
173 173 """
174 174 Add all changesets in the vcs repo starting at start_rev
175 175 to the index writer
176 176
177 177 :param writer: the whoosh index writer to add to
178 178 :param repo_name: name of the repository from whence the
179 179 changeset originates, including the repository group
180 180 :param repo: the vcs repository instance to index changesets for,
181 181 the presumption is the repo has changesets to index
182 182 :param start_rev=None: the full sha id to start indexing from;
183 183 if start_rev is None, then index from the first changeset in
184 184 the repo
185 185 """
186 186
187 187 if start_rev is None:
188 188 start_rev = repo[0].raw_id
189 189
190 190 log.debug('indexing changesets in %s starting at rev: %s' %
191 191 (repo_name, start_rev))
192 192
193 193 indexed = 0
194 194 for cs in repo.get_changesets(start=start_rev):
195 195 log.debug(' >> %s' % cs)
196 196 writer.add_document(
197 197 raw_id=unicode(cs.raw_id),
198 198 owner=unicode(repo.contact),
199 199 date=cs._timestamp,
200 200 repository=safe_unicode(repo_name),
201 201 author=cs.author,
202 202 message=cs.message,
203 203 last=cs.last,
204 204 added=u' '.join([safe_unicode(node.path) for node in cs.added]).lower(),
205 205 removed=u' '.join([safe_unicode(node.path) for node in cs.removed]).lower(),
206 206 changed=u' '.join([safe_unicode(node.path) for node in cs.changed]).lower(),
207 207 parents=u' '.join([x.raw_id for x in cs.parents]),
208 208 )
209 209 indexed += 1
210 210
211 211 log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
212 212 return indexed
213 213
214 214 def index_files(self, file_idx_writer, repo_name, repo):
215 215 """
216 216 Index files for given repo_name
217 217
218 218 :param file_idx_writer: the whoosh index writer to add to
219 219 :param repo_name: name of the repository we're indexing
220 220 :param repo: instance of vcs repo
221 221 """
222 222 i_cnt = iwc_cnt = 0
223 223 log.debug('building index for [%s]' % repo.path)
224 224 for idx_path in self.get_paths(repo):
225 225 i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
226 226 i_cnt += i
227 227 iwc_cnt += iwc
228 228
229 229 log.debug('added %s files (%s with content) for repo %s' %
230 230 (i_cnt + iwc_cnt, iwc_cnt, repo.path))
231 231 return i_cnt, iwc_cnt
232 232
233 233 def update_changeset_index(self):
234 234 idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)
235 235
236 236 with idx.searcher() as searcher:
237 237 writer = idx.writer()
238 238 writer_is_dirty = False
239 239 try:
240 240 indexed_total = 0
241 241 repo_name = None
242 242 for repo_name, repo in self.repo_paths.items():
243 243 # skip indexing if there aren't any revs in the repo
244 244 num_of_revs = len(repo)
245 245 if num_of_revs < 1:
246 246 continue
247 247
248 248 qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
249 249 q = qp.parse(u"last:t AND %s" % repo_name)
250 250
251 251 results = searcher.search(q)
252 252
253 253 # default to scanning the entire repo
254 254 last_rev = 0
255 255 start_id = None
256 256
257 257 if len(results) > 0:
258 258 # assuming that there is only one result; if not, this
259 259 # may require a full re-index.
260 260 start_id = results[0]['raw_id']
261 261 last_rev = repo.get_changeset(revision=start_id).revision
262 262
263 263 # there are new changesets to index or a new repo to index
264 264 if last_rev == 0 or num_of_revs > last_rev + 1:
265 265 # delete the docs in the index for the previous
266 266 # last changeset(s)
267 267 for hit in results:
268 268 q = qp.parse(u"last:t AND %s AND raw_id:%s" %
269 269 (repo_name, hit['raw_id']))
270 270 writer.delete_by_query(q)
271 271
272 272 # index from the previous last changeset + all new ones
273 273 indexed_total += self.index_changesets(writer,
274 274 repo_name, repo, start_id)
275 275 writer_is_dirty = True
276 276 log.debug('indexed %s changesets for repo %s' % (
277 277 indexed_total, repo_name)
278 278 )
279 279 finally:
280 280 if writer_is_dirty:
281 281 log.debug('>> COMMITTING CHANGES TO CHANGESET INDEX <<')
282 282 writer.commit(merge=True)
283 - log.debug('>> COMMITTED CHANGES TO CHANGESET INDEX<<')
283 + log.debug('>>> FINISHED REBUILDING CHANGESET INDEX <<<')
284 284 else:
285 285 writer.cancel()
286 - log.debug('>> NOTHING TO COMMIT<<')
286 + log.debug('>> NOTHING TO COMMIT TO CHANGESET INDEX <<')
287 287
288 288 def update_file_index(self):
289 289 log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
290 290 'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))
291 291
292 292 idx = open_dir(self.index_location, indexname=self.indexname)
293 293 # The set of all paths in the index
294 294 indexed_paths = set()
295 295 # The set of all paths we need to re-index
296 296 to_index = set()
297 297
298 298 writer = idx.writer()
299 299 writer_is_dirty = False
300 300 try:
301 301 with idx.reader() as reader:
302 302
303 303 # Loop over the stored fields in the index
304 304 for fields in reader.all_stored_fields():
305 305 indexed_path = fields['path']
306 306 indexed_repo_path = fields['repository']
307 307 indexed_paths.add(indexed_path)
308 308
309 309 if indexed_repo_path not in self.filtered_repo_update_paths:
310 310 continue
311 311
312 312 repo = self.repo_paths[indexed_repo_path]
313 313
314 314 try:
315 315 node = self.get_node(repo, indexed_path)
316 316 # Check if this file was changed since it was indexed
317 317 indexed_time = fields['modtime']
318 318 mtime = self.get_node_mtime(node)
319 319 if mtime > indexed_time:
320 320 # The file has changed, delete it and add it to
321 321 # the list of files to reindex
322 322 log.debug(
323 323 'adding to reindex list %s mtime: %s vs %s' % (
324 324 indexed_path, mtime, indexed_time)
325 325 )
326 326 writer.delete_by_term('fileid', indexed_path)
327 327 writer_is_dirty = True
328 328
329 329 to_index.add(indexed_path)
330 330 except (ChangesetError, NodeDoesNotExistError):
331 331 # This file was deleted since it was indexed
332 332 log.debug('removing from index %s' % indexed_path)
333 333 writer.delete_by_term('path', indexed_path)
334 334 writer_is_dirty = True
335 335
336 336 # Loop over the files in the filesystem
337 337 # Assume we have a function that gathers the filenames of the
338 338 # documents to be indexed
339 339 ri_cnt_total = 0 # indexed
340 340 riwc_cnt_total = 0 # indexed with content
341 341 for repo_name, repo in self.repo_paths.items():
342 342 # skip indexing if there aren't any revisions
343 343 if len(repo) < 1:
344 344 continue
345 345 ri_cnt = 0 # indexed
346 346 riwc_cnt = 0 # indexed with content
347 347 for path in self.get_paths(repo):
348 348 path = safe_unicode(path)
349 349 if path in to_index or path not in indexed_paths:
350 350
351 351 # This is either a file that's changed, or a new file
352 352 # that wasn't indexed before. So index it!
353 353 i, iwc = self.add_doc(writer, path, repo, repo_name)
354 354 writer_is_dirty = True
355 355 log.debug('re-indexing %s' % path)
356 356 ri_cnt += i
357 357 ri_cnt_total += 1
358 358 riwc_cnt += iwc
359 359 riwc_cnt_total += iwc
360 360 log.debug('added %s files (%s with content) for repo %s' % (
361 361 ri_cnt + riwc_cnt, riwc_cnt, repo.path)
362 362 )
363 363 log.debug('indexed %s files in total and %s with content' % (
364 364 ri_cnt_total, riwc_cnt_total)
365 365 )
366 366 finally:
367 367 if writer_is_dirty:
368 - log.debug('>> COMMITING CHANGES <<')
368 + log.debug('>> COMMITTING CHANGES TO FILE INDEX <<')
369 369 writer.commit(merge=True)
370 - log.debug('>>> FINISHED REBUILDING INDEX <<<')
370 + log.debug('>>> FINISHED REBUILDING FILE INDEX <<<')
371 371 else:
372 - log.debug('>> NOTHING TO COMMIT<<')
372 + log.debug('>> NOTHING TO COMMIT TO FILE INDEX <<')
373 373 writer.cancel()
374 374
375 375 def build_indexes(self):
376 376 if os.path.exists(self.index_location):
377 377 log.debug('removing previous index')
378 378 rmtree(self.index_location)
379 379
380 380 if not os.path.exists(self.index_location):
381 381 os.mkdir(self.index_location)
382 382
383 383 chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA,
384 384 indexname=CHGSET_IDX_NAME)
385 385 chgset_idx_writer = chgset_idx.writer()
386 386
387 387 file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
388 388 file_idx_writer = file_idx.writer()
389 389 log.debug('BUILDING INDEX FOR EXTENSIONS %s '
390 390 'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))
391 391
392 392 for repo_name, repo in self.repo_paths.items():
393 393 # skip indexing if there aren't any revisions
394 394 if len(repo) < 1:
395 395 continue
396 396
397 397 self.index_files(file_idx_writer, repo_name, repo)
398 398 self.index_changesets(chgset_idx_writer, repo_name, repo)
399 399
400 400 log.debug('>> COMMITTING CHANGES <<')
401 401 file_idx_writer.commit(merge=True)
402 402 chgset_idx_writer.commit(merge=True)
403 403 log.debug('>>> FINISHED BUILDING INDEX <<<')
404 404
405 405 def update_indexes(self):
406 406 self.update_file_index()
407 407 self.update_changeset_index()
408 408
409 409 def run(self, full_index=False):
410 410 """Run daemon"""
411 411 if full_index or self.initial:
412 412 self.build_indexes()
413 413 else:
414 414 self.update_indexes()
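
For reference, a minimal sketch of how this daemon might be driven, assuming the default database session (sa=None) and placeholder locations (both paths below are illustrative, not values from this changeset):

    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

    # placeholder locations -- adjust to the target installation
    daemon = WhooshIndexingDaemon(index_location='/srv/rhodecode/index',
                                  repo_location='/srv/rhodecode/repos')

    # full_index=False requests an incremental update; run() still falls
    # back to build_indexes() when __init__ detected no usable index
    daemon.run(full_index=False)

Passing repo_list or repo_update_list to the constructor restricts work to a subset of the repositories discovered under repo_location, per the filtering in __init__.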
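
Similarly, a sketch of querying the changeset index this daemon builds, mirroring the QueryParser usage in update_changeset_index ('myrepo' and the index path are hypothetical):

    from whoosh.index import open_dir
    from whoosh.qparser import QueryParser

    from rhodecode.lib.indexers import CHGSETS_SCHEMA, CHGSET_IDX_NAME

    idx = open_dir('/srv/rhodecode/index', indexname=CHGSET_IDX_NAME)
    with idx.searcher() as searcher:
        qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
        # 'last:t' matches only the most recently indexed changeset(s)
        # of a repository, as the incremental update above relies on
        q = qp.parse(u"last:t AND myrepo")
        for hit in searcher.search(q):
            print hit['raw_id']  # full sha of the last indexed changeset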