Fix possible exception about repo_name not being defined in the whoosh indexer when using the index-only option
marcink
r2839:c0ddc86b beta
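This commit adds a single line, repo_name = None, before the per-repository loop in update_changeset_index() (the one added line in the diff below). Without it, when the index-only filter leaves self.repo_paths empty, the loop body never runs, repo_name is never bound, and the log.debug(...) after the loop raises NameError: name 'repo_name' is not defined. A minimal, self-contained sketch of the failure mode and the fix follows; the empty dict and the function name are simplified stand-ins for the daemon's real state, not code from this repository:

# Sketch: a loop variable referenced after a loop that may never execute.
repo_paths = {}  # stands in for self.repo_paths after index-only filtered out every repo

def update_changeset_index_sketch():
    indexed_total = 0
    repo_name = None  # the one-line fix: pre-bind the loop variable
    for repo_name, repo in repo_paths.items():
        indexed_total += 1  # stands in for the real indexing work
    # Without the pre-binding above, an empty repo_paths would leave
    # repo_name unassigned and the next line would raise NameError.
    print('indexed %s changesets for repo %s' % (indexed_total, repo_name))

update_changeset_index_sketch()  # prints: indexed 0 changesets for repo None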
@@ -1,413 +1,414 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon will read from task table and run tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26
27 27 import os
28 28 import sys
29 29 import logging
30 30 import traceback
31 31
32 32 from shutil import rmtree
33 33 from time import mktime
34 34
35 35 from os.path import dirname as dn
36 36 from os.path import join as jn
37 37
38 38 # to get the rhodecode import
39 39 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
40 40 sys.path.append(project_path)
41 41
42 42 from rhodecode.config.conf import INDEX_EXTENSIONS
43 43 from rhodecode.model.scm import ScmModel
44 44 from rhodecode.lib.utils2 import safe_unicode
45 45 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, \
46 46 CHGSET_IDX_NAME
47 47
48 48 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
49 49 NodeDoesNotExistError
50 50
51 51 from whoosh.index import create_in, open_dir, exists_in
52 52 from whoosh.query import *
53 53 from whoosh.qparser import QueryParser
54 54
55 55 log = logging.getLogger('whoosh_indexer')
56 56
57 57
58 58 class WhooshIndexingDaemon(object):
59 59 """
60 60 Daemon for atomic indexing jobs
61 61 """
62 62
63 63 def __init__(self, indexname=IDX_NAME, index_location=None,
64 64 repo_location=None, sa=None, repo_list=None,
65 65 repo_update_list=None):
66 66 self.indexname = indexname
67 67
68 68 self.index_location = index_location
69 69 if not index_location:
70 70 raise Exception('You have to provide index location')
71 71
72 72 self.repo_location = repo_location
73 73 if not repo_location:
74 74 raise Exception('You have to provide repositories location')
75 75
76 76 self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
77 77
78 78 # filter repo list
79 79 if repo_list:
80 80 self.filtered_repo_paths = {}
81 81 for repo_name, repo in self.repo_paths.items():
82 82 if repo_name in repo_list:
83 83 self.filtered_repo_paths[repo_name] = repo
84 84
85 85 self.repo_paths = self.filtered_repo_paths
86 86
87 87 # filter update repo list
88 88 self.filtered_repo_update_paths = {}
89 89 if repo_update_list:
90 90 self.filtered_repo_update_paths = {}
91 91 for repo_name, repo in self.repo_paths.items():
92 92 if repo_name in repo_update_list:
93 93 self.filtered_repo_update_paths[repo_name] = repo
94 94 self.repo_paths = self.filtered_repo_update_paths
95 95
96 96 self.initial = True
97 97 if not os.path.isdir(self.index_location):
98 98 os.makedirs(self.index_location)
99 99 log.info('Cannot run incremental index since it does not'
100 100 ' yet exist; running full build')
101 101 elif not exists_in(self.index_location, IDX_NAME):
102 102 log.info('Running full index build as the file content'
103 103 ' index does not exist')
104 104 elif not exists_in(self.index_location, CHGSET_IDX_NAME):
105 105 log.info('Running full index build as the changeset'
106 106 ' index does not exist')
107 107 else:
108 108 self.initial = False
109 109
110 110 def get_paths(self, repo):
111 111 """
112 112 recursively walk the root dir and return a set of all paths in
113 113 that dir, based on the repository walk function
114 114 """
115 115 index_paths_ = set()
116 116 try:
117 117 tip = repo.get_changeset('tip')
118 118 for _topnode, _dirs, files in tip.walk('/'):
119 119 for f in files:
120 120 index_paths_.add(jn(repo.path, f.path))
121 121
122 122 except RepositoryError:
123 123 log.debug(traceback.format_exc())
124 124 pass
125 125 return index_paths_
126 126
127 127 def get_node(self, repo, path):
128 128 n_path = path[len(repo.path) + 1:]
129 129 node = repo.get_changeset().get_node(n_path)
130 130 return node
131 131
132 132 def get_node_mtime(self, node):
133 133 return mktime(node.last_changeset.date.timetuple())
134 134
135 135 def add_doc(self, writer, path, repo, repo_name):
136 136 """
137 137 Add a doc to the writer; this function itself fetches the data
138 138 from the vcs backend instance
139 139 """
140 140
141 141 node = self.get_node(repo, path)
142 142 indexed = indexed_w_content = 0
143 143 # we just index the content of chosen files, and skip binary files
144 144 if node.extension in INDEX_EXTENSIONS and not node.is_binary:
145 145 u_content = node.content
146 146 if not isinstance(u_content, unicode):
147 147 log.warning(' >> %s Could not get this content as unicode, '
148 148 'replacing with empty content' % path)
149 149 u_content = u''
150 150 else:
151 151 log.debug(' >> %s [WITH CONTENT]' % path)
152 152 indexed_w_content += 1
153 153
154 154 else:
155 155 log.debug(' >> %s' % path)
156 156 # just index the file name without its content
157 157 u_content = u''
158 158 indexed += 1
159 159
160 160 p = safe_unicode(path)
161 161 writer.add_document(
162 162 fileid=p,
163 163 owner=unicode(repo.contact),
164 164 repository=safe_unicode(repo_name),
165 165 path=p,
166 166 content=u_content,
167 167 modtime=self.get_node_mtime(node),
168 168 extension=node.extension
169 169 )
170 170 return indexed, indexed_w_content
171 171
172 172 def index_changesets(self, writer, repo_name, repo, start_rev=None):
173 173 """
174 174 Add all changesets in the vcs repo starting at start_rev
175 175 to the index writer
176 176
177 177 :param writer: the whoosh index writer to add to
178 178 :param repo_name: name of the repository the changesets
179 179 originate from, including the repository group
180 180 :param repo: the vcs repository instance to index changesets for,
181 181 the presumption is the repo has changesets to index
182 182 :param start_rev: the full sha id to start indexing from;
183 183 if start_rev is None then index from the first changeset
184 184 in the repo
185 185 """
186 186
187 187 if start_rev is None:
188 188 start_rev = repo[0].raw_id
189 189
190 190 log.debug('indexing changesets in %s starting at rev: %s' %
191 191 (repo_name, start_rev))
192 192
193 193 indexed = 0
194 194 for cs in repo.get_changesets(start=start_rev):
195 195 log.debug(' >> %s' % cs)
196 196 writer.add_document(
197 197 raw_id=unicode(cs.raw_id),
198 198 owner=unicode(repo.contact),
199 199 date=cs._timestamp,
200 200 repository=safe_unicode(repo_name),
201 201 author=cs.author,
202 202 message=cs.message,
203 203 last=cs.last,
204 204 added=u' '.join([safe_unicode(node.path) for node in cs.added]).lower(),
205 205 removed=u' '.join([safe_unicode(node.path) for node in cs.removed]).lower(),
206 206 changed=u' '.join([safe_unicode(node.path) for node in cs.changed]).lower(),
207 207 parents=u' '.join([p.raw_id for p in cs.parents]),  # don't shadow cs
208 208 )
209 209 indexed += 1
210 210
211 211 log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
212 212 return indexed
213 213
214 214 def index_files(self, file_idx_writer, repo_name, repo):
215 215 """
216 216 Index files for given repo_name
217 217
218 218 :param file_idx_writer: the whoosh index writer to add to
219 219 :param repo_name: name of the repository we're indexing
220 220 :param repo: instance of vcs repo
221 221 """
222 222 i_cnt = iwc_cnt = 0
223 223 log.debug('building index for [%s]' % repo.path)
224 224 for idx_path in self.get_paths(repo):
225 225 i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
226 226 i_cnt += i
227 227 iwc_cnt += iwc
228 228
229 229 log.debug('added %s files (%s with content) for repo %s' %
230 230 (i_cnt + iwc_cnt, iwc_cnt, repo.path))
231 231 return i_cnt, iwc_cnt
232 232
233 233 def update_changeset_index(self):
234 234 idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)
235 235
236 236 with idx.searcher() as searcher:
237 237 writer = idx.writer()
238 238 writer_is_dirty = False
239 239 try:
240 240 indexed_total = 0
241 repo_name = None
241 242 for repo_name, repo in self.repo_paths.items():
242 243 # skip indexing if there aren't any revs in the repo
243 244 num_of_revs = len(repo)
244 245 if num_of_revs < 1:
245 246 continue
246 247
247 248 qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
248 249 q = qp.parse(u"last:t AND %s" % repo_name)
249 250
250 251 results = searcher.search(q)
251 252
252 253 # default to scanning the entire repo
253 254 last_rev = 0
254 255 start_id = None
255 256
256 257 if len(results) > 0:
257 258 # assuming that there is only one result, if not this
258 259 # may require a full re-index.
259 260 start_id = results[0]['raw_id']
260 261 last_rev = repo.get_changeset(revision=start_id).revision
261 262
262 263 # there are new changesets to index or a new repo to index
263 264 if last_rev == 0 or num_of_revs > last_rev + 1:
264 265 # delete the docs in the index for the previous
265 266 # last changeset(s)
266 267 for hit in results:
267 268 q = qp.parse(u"last:t AND %s AND raw_id:%s" %
268 269 (repo_name, hit['raw_id']))
269 270 writer.delete_by_query(q)
270 271
271 272 # index from the previous last changeset + all new ones
272 273 indexed_total += self.index_changesets(writer,
273 274 repo_name, repo, start_id)
274 275 writer_is_dirty = True
275 276 log.debug('indexed %s changesets for repo %s' % (
276 277 indexed_total, repo_name)
277 278 )
278 279 finally:
279 280 if writer_is_dirty:
280 281 log.debug('>> COMMITTING CHANGES TO CHANGESET INDEX <<')
281 282 writer.commit(merge=True)
282 283 log.debug('>> COMMITTED CHANGES TO CHANGESET INDEX<<')
283 284 else:
284 285 writer.cancel()  # nothing changed; discard the writer
285 286 log.debug('>> NOTHING TO COMMIT <<')
286 287
287 288 def update_file_index(self):
288 289 log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
289 290 'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))
290 291
291 292 idx = open_dir(self.index_location, indexname=self.indexname)
292 293 # The set of all paths in the index
293 294 indexed_paths = set()
294 295 # The set of all paths we need to re-index
295 296 to_index = set()
296 297
297 298 writer = idx.writer()
298 299 writer_is_dirty = False
299 300 try:
300 301 with idx.reader() as reader:
301 302
302 303 # Loop over the stored fields in the index
303 304 for fields in reader.all_stored_fields():
304 305 indexed_path = fields['path']
305 306 indexed_repo_path = fields['repository']
306 307 indexed_paths.add(indexed_path)
307 308
308 309 if indexed_repo_path not in self.filtered_repo_update_paths:
309 310 continue
310 311
311 312 repo = self.repo_paths[indexed_repo_path]
312 313
313 314 try:
314 315 node = self.get_node(repo, indexed_path)
315 316 # Check if this file was changed since it was indexed
316 317 indexed_time = fields['modtime']
317 318 mtime = self.get_node_mtime(node)
318 319 if mtime > indexed_time:
319 320 # The file has changed, delete it and add it to
320 321 # the list of files to reindex
321 322 log.debug(
322 323 'adding to reindex list %s mtime: %s vs %s' % (
323 324 indexed_path, mtime, indexed_time)
324 325 )
325 326 writer.delete_by_term('fileid', indexed_path)
326 327 writer_is_dirty = True
327 328
328 329 to_index.add(indexed_path)
329 330 except (ChangesetError, NodeDoesNotExistError):
330 331 # This file was deleted since it was indexed
331 332 log.debug('removing from index %s' % indexed_path)
332 333 writer.delete_by_term('path', indexed_path)
333 334 writer_is_dirty = True
334 335
335 336 # Loop over the files in the filesystem
336 337 # Assume we have a function that gathers the filenames of the
337 338 # documents to be indexed
338 339 ri_cnt_total = 0 # indexed
339 340 riwc_cnt_total = 0 # indexed with content
340 341 for repo_name, repo in self.repo_paths.items():
341 342 # skip indexing if there aren't any revisions
342 343 if len(repo) < 1:
343 344 continue
344 345 ri_cnt = 0 # indexed
345 346 riwc_cnt = 0 # indexed with content
346 347 for path in self.get_paths(repo):
347 348 path = safe_unicode(path)
348 349 if path in to_index or path not in indexed_paths:
349 350
350 351 # This is either a file that's changed, or a new file
351 352 # that wasn't indexed before. So index it!
352 353 i, iwc = self.add_doc(writer, path, repo, repo_name)
353 354 writer_is_dirty = True
354 355 log.debug('re-indexing %s' % path)
355 356 ri_cnt += i
356 357 ri_cnt_total += 1
357 358 riwc_cnt += iwc
358 359 riwc_cnt_total += iwc
359 360 log.debug('added %s files (%s with content) for repo %s' % (
360 361 ri_cnt + riwc_cnt, riwc_cnt, repo.path)
361 362 )
362 363 log.debug('indexed %s files in total and %s with content' % (
363 364 ri_cnt_total, riwc_cnt_total)
364 365 )
365 366 finally:
366 367 if writer_is_dirty:
367 368 log.debug('>> COMMITTING CHANGES <<')
368 369 writer.commit(merge=True)
369 370 log.debug('>>> FINISHED REBUILDING INDEX <<<')
370 371 else:
371 372 log.debug('>> NOTHING TO COMMIT <<')
372 373 writer.cancel()
373 374
374 375 def build_indexes(self):
375 376 if os.path.exists(self.index_location):
376 377 log.debug('removing previous index')
377 378 rmtree(self.index_location)
378 379
379 380 if not os.path.exists(self.index_location):
380 381 os.mkdir(self.index_location)
381 382
382 383 chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA,
383 384 indexname=CHGSET_IDX_NAME)
384 385 chgset_idx_writer = chgset_idx.writer()
385 386
386 387 file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
387 388 file_idx_writer = file_idx.writer()
388 389 log.debug('BUILDING INDEX FOR EXTENSIONS %s '
389 390 'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))
390 391
391 392 for repo_name, repo in self.repo_paths.items():
392 393 # skip indexing if there aren't any revisions
393 394 if len(repo) < 1:
394 395 continue
395 396
396 397 self.index_files(file_idx_writer, repo_name, repo)
397 398 self.index_changesets(chgset_idx_writer, repo_name, repo)
398 399
399 400 log.debug('>> COMMITTING CHANGES <<')
400 401 file_idx_writer.commit(merge=True)
401 402 chgset_idx_writer.commit(merge=True)
402 403 log.debug('>>> FINISHED BUILDING INDEX <<<')
403 404
404 405 def update_indexes(self):
405 406 self.update_file_index()
406 407 self.update_changeset_index()
407 408
408 409 def run(self, full_index=False):
409 410 """Run daemon"""
410 411 if full_index or self.initial:
411 412 self.build_indexes()
412 413 else:
413 414 self.update_indexes()
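For context, a minimal sketch of how the daemon above can be driven. The paths are hypothetical, and this diff does not show which constructor argument the index-only option maps to, so the repo_update_list filter below is an assumption:

# Sketch (assumed paths and filter wiring): an incremental update limited
# to selected repositories, the scenario the fixed NameError occurred in.
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

daemon = WhooshIndexingDaemon(
    index_location='/srv/rhodecode/index',  # hypothetical index directory
    repo_location='/srv/rhodecode/repos',   # hypothetical repositories root
    repo_update_list=['some/repo'],         # hypothetical repo filter
)
# Incremental run; the constructor flags a full build instead (self.initial)
# if the index does not exist yet.
daemon.run(full_index=False)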