##// END OF EJS Templates
Fixed problems with re-indexing repositories that have non-ASCII names
marcink -
r2841:2fa3c09f beta
parent child Browse files
Show More
@@ -1,414 +1,416 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon will read from task table and run tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26
27 27 import os
28 28 import sys
29 29 import logging
30 30 import traceback
31 31
32 32 from shutil import rmtree
33 33 from time import mktime
34 34
35 35 from os.path import dirname as dn
36 36 from os.path import join as jn
37 37
38 38 #to get the rhodecode import
39 39 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
40 40 sys.path.append(project_path)
41 41
42 42 from rhodecode.config.conf import INDEX_EXTENSIONS
43 43 from rhodecode.model.scm import ScmModel
44 44 from rhodecode.lib.utils2 import safe_unicode
45 45 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
46 46 CHGSET_IDX_NAME
47 47
48 48 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
49 49 NodeDoesNotExistError
50 50
51 51 from whoosh.index import create_in, open_dir, exists_in
52 52 from whoosh.query import *
53 53 from whoosh.qparser import QueryParser
54 54
55 55 log = logging.getLogger('whoosh_indexer')
56 56
57 57
class WhooshIndexingDaemon(object):
    """
    Daemon for atomic indexing jobs.

    Maintains two whoosh indexes under ``index_location``: a file-content
    index (``IDX_NAME`` / ``SCHEMA``) and a changeset index
    (``CHGSET_IDX_NAME`` / ``CHGSETS_SCHEMA``).  :meth:`run` decides
    between a full rebuild and an incremental update based on what is
    already present on disk.
    """

    def __init__(self, indexname=IDX_NAME, index_location=None,
                 repo_location=None, sa=None, repo_list=None,
                 repo_update_list=None):
        """
        :param indexname: name of the file-content whoosh index
        :param index_location: directory where the whoosh indexes live
            (required)
        :param repo_location: base directory scanned for repositories
            (required)
        :param sa: optional sqlalchemy session passed to ScmModel
        :param repo_list: if given, restrict all indexing to these
            repository names
        :param repo_update_list: if given, restrict incremental updates
            to these repository names
        :raises Exception: when ``index_location`` or ``repo_location``
            is not provided
        """
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        #filter repo list
        if repo_list:
            # fix non-ascii repo names to unicode so membership tests
            # against the (unicode) keys of repo_paths work
            repo_list = map(safe_unicode, repo_list)
            self.filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    self.filtered_repo_paths[repo_name] = repo

            self.repo_paths = self.filtered_repo_paths

        #filter update repo list
        # NOTE: kept as an attribute because update_file_index() consults
        # it to decide which already-indexed files to re-check
        self.filtered_repo_update_paths = {}
        if repo_update_list:
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_update_list:
                    self.filtered_repo_update_paths[repo_name] = repo
            self.repo_paths = self.filtered_repo_update_paths

        # decide whether a full build is required: missing index dir or
        # either of the two sub-indexes forces a full rebuild
        self.initial = True
        if not os.path.isdir(self.index_location):
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
        elif not exists_in(self.index_location, IDX_NAME):
            log.info('Running full index build as the file content'
                     ' index does not exist')
        elif not exists_in(self.index_location, CHGSET_IDX_NAME):
            log.info('Running full index build as the changeset'
                     ' index does not exist')
        else:
            self.initial = False

    def get_paths(self, repo):
        """
        recursive walk in root dir and return a set of all path in that dir
        based on repository walk function
        """
        index_paths_ = set()
        try:
            tip = repo.get_changeset('tip')
            for _topnode, _dirs, files in tip.walk('/'):
                for f in files:
                    index_paths_.add(jn(repo.path, f.path))

        except RepositoryError:
            # e.g. an empty repository with no tip -- index nothing
            log.debug(traceback.format_exc())
            pass
        return index_paths_

    def get_node(self, repo, path):
        """
        Return the vcs node at tip for an absolute filesystem ``path``
        inside ``repo`` (the repo-root prefix is stripped off first).
        """
        n_path = path[len(repo.path) + 1:]
        node = repo.get_changeset().get_node(n_path)
        return node

    def get_node_mtime(self, node):
        """Return the last-change time of ``node`` as a unix timestamp."""
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo, repo_name):
        """
        Adding doc to writer this function itself fetches data from
        the instance of vcs backend

        :returns: tuple ``(indexed, indexed_w_content)`` -- counters of
            name-only vs. full-content indexed documents (each 0 or 1)
        """

        node = self.get_node(repo, path)
        indexed = indexed_w_content = 0
        # we just index the content of chosen files, and skip binary files
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:
            u_content = node.content
            if not isinstance(u_content, unicode):
                log.warning('  >> %s Could not get this content as unicode '
                            'replacing with empty content' % path)
                u_content = u''
            else:
                log.debug('    >> %s [WITH CONTENT]' % path)
                indexed_w_content += 1

        else:
            log.debug('    >> %s' % path)
            # just index file name without it's content
            u_content = u''
            indexed += 1

        p = safe_unicode(path)
        writer.add_document(
            fileid=p,
            owner=unicode(repo.contact),
            repository=safe_unicode(repo_name),
            path=p,
            content=u_content,
            modtime=self.get_node_mtime(node),
            extension=node.extension
        )
        return indexed, indexed_w_content

    def index_changesets(self, writer, repo_name, repo, start_rev=None):
        """
        Add all changeset in the vcs repo starting at start_rev
        to the index writer

        :param writer: the whoosh index writer to add to
        :param repo_name: name of the repository from whence the
          changeset originates including the repository group
        :param repo: the vcs repository instance to index changesets for,
          the presumption is the repo has changesets to index
        :param start_rev=None: the full sha id to start indexing from
          if start_rev is None then index from the first changeset in
          the repo
        :returns: number of changesets indexed
        """

        if start_rev is None:
            start_rev = repo[0].raw_id

        log.debug('indexing changesets in %s starting at rev: %s' %
                  (repo_name, start_rev))

        indexed = 0
        for cs in repo.get_changesets(start=start_rev):
            log.debug('    >> %s' % cs)
            # use a distinct comprehension variable below: in python 2 a
            # list comprehension leaks its loop variable and would
            # clobber ``cs``
            writer.add_document(
                raw_id=unicode(cs.raw_id),
                owner=unicode(repo.contact),
                date=cs._timestamp,
                repository=safe_unicode(repo_name),
                author=cs.author,
                message=cs.message,
                last=cs.last,
                added=u' '.join([safe_unicode(node.path) for node in cs.added]).lower(),
                removed=u' '.join([safe_unicode(node.path) for node in cs.removed]).lower(),
                changed=u' '.join([safe_unicode(node.path) for node in cs.changed]).lower(),
                parents=u' '.join([p_cs.raw_id for p_cs in cs.parents]),
            )
            indexed += 1

        log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
        return indexed

    def index_files(self, file_idx_writer, repo_name, repo):
        """
        Index files for given repo_name

        :param file_idx_writer: the whoosh index writer to add to
        :param repo_name: name of the repository we're indexing
        :param repo: instance of vcs repo
        :returns: tuple ``(i_cnt, iwc_cnt)`` -- files indexed name-only
            and files indexed with content
        """
        i_cnt = iwc_cnt = 0
        log.debug('building index for [%s]' % repo.path)
        for idx_path in self.get_paths(repo):
            i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
            i_cnt += i
            iwc_cnt += iwc

        log.debug('added %s files %s with content for repo %s' %
                  (i_cnt + iwc_cnt, iwc_cnt, repo.path))
        return i_cnt, iwc_cnt

    def update_changeset_index(self):
        """
        Incrementally index any changesets added since the last run.

        For each repo the last-indexed changeset is looked up via the
        ``last:t`` marker documents; the stale markers are deleted and
        indexing resumes from that point.
        """
        idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)

        with idx.searcher() as searcher:
            writer = idx.writer()
            writer_is_dirty = False
            try:
                indexed_total = 0
                repo_name = None
                for repo_name, repo in self.repo_paths.items():
                    # skip indexing if there aren't any revs in the repo
                    num_of_revs = len(repo)
                    if num_of_revs < 1:
                        continue

                    qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
                    q = qp.parse(u"last:t AND %s" % repo_name)

                    results = searcher.search(q)

                    # default to scanning the entire repo
                    last_rev = 0
                    start_id = None

                    if len(results) > 0:
                        # assuming that there is only one result, if not this
                        # may require a full re-index.
                        start_id = results[0]['raw_id']
                        last_rev = repo.get_changeset(revision=start_id).revision

                    # there are new changesets to index or a new repo to index
                    if last_rev == 0 or num_of_revs > last_rev + 1:
                        # delete the docs in the index for the previous
                        # last changeset(s)
                        for hit in results:
                            q = qp.parse(u"last:t AND %s AND raw_id:%s" %
                                         (repo_name, hit['raw_id']))
                            writer.delete_by_query(q)

                        # index from the previous last changeset + all new ones
                        indexed_total += self.index_changesets(writer,
                                                repo_name, repo, start_id)
                        writer_is_dirty = True
                        log.debug('indexed %s changesets for repo %s' % (
                                  indexed_total, repo_name)
                        )
            finally:
                if writer_is_dirty:
                    log.debug('>> COMMITING CHANGES TO CHANGESET INDEX<<')
                    writer.commit(merge=True)
                    log.debug('>>> FINISHED REBUILDING CHANGESET INDEX <<<')
                else:
                    # bugfix: ``writer.cancel`` was a bare attribute access
                    # and never released the writer lock -- call it
                    writer.cancel()
                    log.debug('>> NOTHING TO COMMIT TO CHANGESET INDEX<<')

    def update_file_index(self):
        """
        Incrementally update the file-content index: re-index files whose
        mtime changed, drop deleted files, and add files not yet indexed.
        """
        log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
                   'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        writer = idx.writer()
        writer_is_dirty = False
        try:
            with idx.reader() as reader:

                # Loop over the stored fields in the index
                for fields in reader.all_stored_fields():
                    indexed_path = fields['path']
                    indexed_repo_path = fields['repository']
                    indexed_paths.add(indexed_path)

                    # NOTE(review): when no repo_update_list was given this
                    # dict is empty and mtime re-checking is skipped for
                    # every repo -- confirm this is the intended semantics
                    if indexed_repo_path not in self.filtered_repo_update_paths:
                        continue

                    repo = self.repo_paths[indexed_repo_path]

                    try:
                        node = self.get_node(repo, indexed_path)
                        # Check if this file was changed since it was indexed
                        indexed_time = fields['modtime']
                        mtime = self.get_node_mtime(node)
                        if mtime > indexed_time:
                            # The file has changed, delete it and add it to
                            # the list of files to reindex
                            log.debug(
                                'adding to reindex list %s mtime: %s vs %s' % (
                                    indexed_path, mtime, indexed_time)
                            )
                            writer.delete_by_term('fileid', indexed_path)
                            writer_is_dirty = True

                            to_index.add(indexed_path)
                    except (ChangesetError, NodeDoesNotExistError):
                        # This file was deleted since it was indexed
                        log.debug('removing from index %s' % indexed_path)
                        # NOTE(review): deletion above keys on 'fileid' but
                        # here on 'path'; both are written with the same
                        # value in add_doc, so behavior matches -- verify
                        # against the schema's unique field
                        writer.delete_by_term('path', indexed_path)
                        writer_is_dirty = True

            # Loop over the files in the filesystem
            # Assume we have a function that gathers the filenames of the
            # documents to be indexed
            ri_cnt_total = 0  # indexed
            riwc_cnt_total = 0  # indexed with content
            for repo_name, repo in self.repo_paths.items():
                # skip indexing if there aren't any revisions
                if len(repo) < 1:
                    continue
                ri_cnt = 0   # indexed
                riwc_cnt = 0  # indexed with content
                for path in self.get_paths(repo):
                    path = safe_unicode(path)
                    if path in to_index or path not in indexed_paths:

                        # This is either a file that's changed, or a new file
                        # that wasn't indexed before. So index it!
                        i, iwc = self.add_doc(writer, path, repo, repo_name)
                        writer_is_dirty = True
                        log.debug('re indexing %s' % path)
                        ri_cnt += i
                        ri_cnt_total += 1
                        riwc_cnt += iwc
                        riwc_cnt_total += iwc
                log.debug('added %s files %s with content for repo %s' % (
                          ri_cnt + riwc_cnt, riwc_cnt, repo.path)
                )
            log.debug('indexed %s files in total and %s with content' % (
                      ri_cnt_total, riwc_cnt_total)
            )
        finally:
            if writer_is_dirty:
                log.debug('>> COMMITING CHANGES TO FILE INDEX <<')
                writer.commit(merge=True)
                log.debug('>>> FINISHED REBUILDING FILE INDEX <<<')
            else:
                log.debug('>> NOTHING TO COMMIT TO FILE INDEX <<')
                writer.cancel()

    def build_indexes(self):
        """
        Wipe any existing indexes and rebuild both the changeset and the
        file-content index from scratch for every repo in repo_paths.
        """
        if os.path.exists(self.index_location):
            log.debug('removing previous index')
            rmtree(self.index_location)

        if not os.path.exists(self.index_location):
            os.mkdir(self.index_location)

        chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA,
                               indexname=CHGSET_IDX_NAME)
        chgset_idx_writer = chgset_idx.writer()

        file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
        file_idx_writer = file_idx.writer()
        log.debug('BUILDING INDEX FOR EXTENSIONS %s '
                  'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))

        for repo_name, repo in self.repo_paths.items():
            # skip indexing if there aren't any revisions
            if len(repo) < 1:
                continue

            self.index_files(file_idx_writer, repo_name, repo)
            self.index_changesets(chgset_idx_writer, repo_name, repo)

        log.debug('>> COMMITING CHANGES <<')
        file_idx_writer.commit(merge=True)
        chgset_idx_writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_indexes(self):
        """Run an incremental update of both indexes."""
        self.update_file_index()
        self.update_changeset_index()

    def run(self, full_index=False):
        """Run daemon"""
        if full_index or self.initial:
            self.build_indexes()
        else:
            self.update_indexes()
General Comments 0
You need to be logged in to leave comments. Login now