import with_statement to make daemon.py Python 2.5 compatible
Indra Talip
r2641:cfcd981d beta
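The change itself: on Python 2.5 the with statement is available only behind a __future__ flag (it became default syntax in Python 2.6), and this module uses "with idx.searcher() as searcher:" and "with idx.reader() as reader:" below. A minimal sketch of what the import enables; example.txt is a hypothetical file:

# Python 2.5 only parses "with" blocks after this __future__ import;
# without it, a line like "with idx.searcher() as searcher:" raises a
# SyntaxError at compile time. On Python 2.6+ the import is a no-op.
from __future__ import with_statement

with open('example.txt') as f:  # context-manager protocol now usable
    print f.read()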
@@ -1,372 +1,373 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon will read from task table and run tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 from __future__ import with_statement
25 26
26 27 import os
27 28 import sys
28 29 import logging
29 30 import traceback
30 31
31 32 from shutil import rmtree
32 33 from time import mktime
33 34
34 35 from os.path import dirname as dn
35 36 from os.path import join as jn
36 37
37 38 # to get the rhodecode import
38 39 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
39 40 sys.path.append(project_path)
40 41
41 42 from rhodecode.config.conf import INDEX_EXTENSIONS
42 43 from rhodecode.model.scm import ScmModel
43 44 from rhodecode.lib.utils2 import safe_unicode
44 45 from rhodecode.lib.indexers import SCHEMA, IDX_NAME, CHGSETS_SCHEMA, CHGSET_IDX_NAME
45 46
46 47 from rhodecode.lib.vcs.exceptions import ChangesetError, RepositoryError, \
47 48 NodeDoesNotExistError
48 49
49 50 from whoosh.index import create_in, open_dir, exists_in
50 51 from whoosh.query import *
51 52 from whoosh.qparser import QueryParser
52 53
53 54 log = logging.getLogger('whoosh_indexer')
54 55
55 56
56 57 class WhooshIndexingDaemon(object):
57 58 """
58 59 Daemon for atomic indexing jobs
59 60 """
60 61
61 62 def __init__(self, indexname=IDX_NAME, index_location=None,
62 63 repo_location=None, sa=None, repo_list=None,
63 64 repo_update_list=None):
64 65 self.indexname = indexname
65 66
66 67 self.index_location = index_location
67 68 if not index_location:
68 69 raise Exception('You have to provide index location')
69 70
70 71 self.repo_location = repo_location
71 72 if not repo_location:
72 73 raise Exception('You have to provide repositories location')
73 74
74 75 self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)
75 76
76 77 #filter repo list
77 78 if repo_list:
78 79 self.filtered_repo_paths = {}
79 80 for repo_name, repo in self.repo_paths.items():
80 81 if repo_name in repo_list:
81 82 self.filtered_repo_paths[repo_name] = repo
82 83
83 84 self.repo_paths = self.filtered_repo_paths
84 85
85 86 #filter update repo list
86 87 self.filtered_repo_update_paths = {}
87 88 if repo_update_list:
88 89 self.filtered_repo_update_paths = {}
89 90 for repo_name, repo in self.repo_paths.items():
90 91 if repo_name in repo_update_list:
91 92 self.filtered_repo_update_paths[repo_name] = repo
92 93 self.repo_paths = self.filtered_repo_update_paths
93 94
94 95 self.initial = True
95 96 if not os.path.isdir(self.index_location):
96 97 os.makedirs(self.index_location)
97 98 log.info('Cannot run incremental index since it does not'
98 99 ' yet exist, running full build')
99 100 elif not exists_in(self.index_location, IDX_NAME):
100 101 log.info('Running full index build as the file content'
101 102 ' index does not exist')
102 103 elif not exists_in(self.index_location, CHGSET_IDX_NAME):
103 104 log.info('Running full index build as the changeset'
104 105 ' index does not exist')
105 106 else:
106 107 self.initial = False
107 108
108 109 def get_paths(self, repo):
109 110 """
110 111 Recursively walk the root dir and return a set of all paths in it,
111 112 based on the repository walk function
112 113 """
113 114 index_paths_ = set()
114 115 try:
115 116 tip = repo.get_changeset('tip')
116 117 for topnode, dirs, files in tip.walk('/'):
117 118 for f in files:
118 119 index_paths_.add(jn(repo.path, f.path))
119 120
120 121 except RepositoryError:
121 122 log.debug(traceback.format_exc())
122 123 pass
123 124 return index_paths_
124 125
125 126 def get_node(self, repo, path):
126 127 n_path = path[len(repo.path) + 1:]
127 128 node = repo.get_changeset().get_node(n_path)
128 129 return node
129 130
130 131 def get_node_mtime(self, node):
131 132 return mktime(node.last_changeset.date.timetuple())
132 133
133 134 def add_doc(self, writer, path, repo, repo_name):
134 135 """
135 136 Add a doc to the writer; this function itself fetches the data
136 137 from the vcs backend instance
137 138 """
138 139
139 140 node = self.get_node(repo, path)
140 141 indexed = indexed_w_content = 0
141 142 # we just index the content of chosen files, and skip binary files
142 143 if node.extension in INDEX_EXTENSIONS and not node.is_binary:
143 144 u_content = node.content
144 145 if not isinstance(u_content, unicode):
145 146 log.warning(' >> %s Could not get this content as unicode, '
146 147 'replacing with empty content' % path)
147 148 u_content = u''
148 149 else:
149 150 log.debug(' >> %s [WITH CONTENT]' % path)
150 151 indexed_w_content += 1
151 152
152 153 else:
153 154 log.debug(' >> %s' % path)
154 155 # just index the file name without its content
155 156 u_content = u''
156 157 indexed += 1
157 158
158 159 p = safe_unicode(path)
159 160 writer.add_document(
160 161 fileid=p,
161 162 owner=unicode(repo.contact),
162 163 repository=safe_unicode(repo_name),
163 164 path=p,
164 165 content=u_content,
165 166 modtime=self.get_node_mtime(node),
166 167 extension=node.extension
167 168 )
168 169 return indexed, indexed_w_content
169 170
170 171 def index_changesets(self, writer, repo_name, repo, start_rev=0):
171 172 """
172 173 Add all changesets in the vcs repo starting at start_rev
173 174 to the index writer
174 175 """
175 176
176 177 log.debug('indexing changesets in %s[%d:]' % (repo_name, start_rev))
177 178
178 179 indexed = 0
179 180 for cs in repo[start_rev:]:
180 181 writer.add_document(
181 182 path=unicode(cs.raw_id),
182 183 owner=unicode(repo.contact),
183 184 repository=safe_unicode(repo_name),
184 185 author=cs.author,
185 186 message=cs.message,
186 187 revision=cs.revision,
187 188 last=cs.last,
188 189 added=u' '.join([node.path for node in cs.added]).lower(),
189 190 removed=u' '.join([node.path for node in cs.removed]).lower(),
190 191 changed=u' '.join([node.path for node in cs.changed]).lower(),
191 192 parents=u' '.join([p.raw_id for p in cs.parents]),
192 193 )
193 194 indexed += 1
194 195
195 196 log.debug('indexed %d changesets for repo %s' % (indexed, repo_name))
196 197
197 198 def index_files(self, file_idx_writer, repo_name, repo):
198 199 i_cnt = iwc_cnt = 0
199 200 log.debug('building index for [%s]' % repo.path)
200 201 for idx_path in self.get_paths(repo):
201 202 i, iwc = self.add_doc(file_idx_writer, idx_path, repo, repo_name)
202 203 i_cnt += i
203 204 iwc_cnt += iwc
204 205
205 206 log.debug('added %s files (%s with content) for repo %s' % (i_cnt + iwc_cnt, iwc_cnt, repo.path))
206 207
207 208 def update_changeset_index(self):
208 209 idx = open_dir(self.index_location, indexname=CHGSET_IDX_NAME)
209 210
210 211 with idx.searcher() as searcher:
211 212 writer = idx.writer()
212 213 writer_is_dirty = False
213 214 try:
214 215 for repo_name, repo in self.repo_paths.items():
215 216 # skip indexing if there aren't any revs in the repo
216 217 revs = repo.revisions
217 218 if len(revs) < 1:
218 219 continue
219 220
220 221 qp = QueryParser('repository', schema=CHGSETS_SCHEMA)
221 222 q = qp.parse(u"last:t AND %s" % repo_name)
222 223
223 224 results = searcher.search(q, sortedby='revision')
224 225
225 226 last_rev = 0
226 227 if len(results) > 0:
227 228 last_rev = results[0]['revision']
228 229
229 230 # there are new changesets to index or a new repo to index
230 231 if last_rev == 0 or len(revs) > last_rev + 1:
231 232 # delete the docs in the index for the previous last changeset(s)
232 233 for hit in results:
233 234 q = qp.parse(u"last:t AND %s AND path:%s" %
234 235 (repo_name, hit['path']))
235 236 writer.delete_by_query(q)
236 237
237 238 # index from the previous last changeset + all new ones
238 239 self.index_changesets(writer, repo_name, repo, last_rev)
239 240 writer_is_dirty = True
240 241
241 242 finally:
242 243 if writer_is_dirty:
243 244 log.debug('>> COMMITTING CHANGES TO CHANGESET INDEX <<')
244 245 writer.commit(merge=True)
245 246 log.debug('>> COMMITTED CHANGES TO CHANGESET INDEX <<')
246 247 else:
247 248 writer.cancel()  # call cancel() to discard the unused writer
248 249
249 250 def update_file_index(self):
250 251 log.debug((u'STARTING INCREMENTAL INDEXING UPDATE FOR EXTENSIONS %s '
251 252 'AND REPOS %s') % (INDEX_EXTENSIONS, self.repo_paths.keys()))
252 253
253 254 idx = open_dir(self.index_location, indexname=self.indexname)
254 255 # The set of all paths in the index
255 256 indexed_paths = set()
256 257 # The set of all paths we need to re-index
257 258 to_index = set()
258 259
259 260 writer = idx.writer()
260 261 writer_is_dirty = False
261 262 try:
262 263 with idx.reader() as reader:
263 264
264 265 # Loop over the stored fields in the index
265 266 for fields in reader.all_stored_fields():
266 267 indexed_path = fields['path']
267 268 indexed_repo_path = fields['repository']
268 269 indexed_paths.add(indexed_path)
269 270
270 271 if indexed_repo_path not in self.filtered_repo_update_paths:
271 272 continue
272 273
273 274 repo = self.repo_paths[indexed_repo_path]
274 275
275 276 try:
276 277 node = self.get_node(repo, indexed_path)
277 278 # Check if this file was changed since it was indexed
278 279 indexed_time = fields['modtime']
279 280 mtime = self.get_node_mtime(node)
280 281 if mtime > indexed_time:
281 282 # The file has changed, delete it and add it to the list of
282 283 # files to reindex
283 284 log.debug('adding to reindex list %s mtime: %s vs %s' % (
284 285 indexed_path, mtime, indexed_time)
285 286 )
286 287 writer.delete_by_term('fileid', indexed_path)
287 288 writer_is_dirty = True
288 289
289 290 to_index.add(indexed_path)
290 291 except (ChangesetError, NodeDoesNotExistError):
291 292 # This file was deleted since it was indexed
292 293 log.debug('removing from index %s' % indexed_path)
293 294 writer.delete_by_term('path', indexed_path)
294 295 writer_is_dirty = True
295 296
296 297 # Loop over the files in the filesystem
297 298 # Assume we have a function that gathers the filenames of the
298 299 # documents to be indexed
299 300 ri_cnt_total = 0 # indexed
300 301 riwc_cnt_total = 0 # indexed with content
301 302 for repo_name, repo in self.repo_paths.items():
302 303 # skip indexing if there aren't any revisions
303 304 if len(repo) < 1:
304 305 continue
305 306 ri_cnt = 0 # indexed
306 307 riwc_cnt = 0 # indexed with content
307 308 for path in self.get_paths(repo):
308 309 path = safe_unicode(path)
309 310 if path in to_index or path not in indexed_paths:
310 311
311 312 # This is either a file that's changed, or a new file
312 313 # that wasn't indexed before. So index it!
313 314 i, iwc = self.add_doc(writer, path, repo, repo_name)
314 315 writer_is_dirty = True
315 316 log.debug('re-indexing %s' % path)
316 317 ri_cnt += i
317 318 ri_cnt_total += 1
318 319 riwc_cnt += iwc
319 320 riwc_cnt_total += iwc
320 321 log.debug('added %s files (%s with content) for repo %s' % (
321 322 ri_cnt + riwc_cnt, riwc_cnt, repo.path)
322 323 )
323 324 log.debug('indexed %s files in total and %s with content' % (
324 325 ri_cnt_total, riwc_cnt_total)
325 326 )
326 327 finally:
327 328 if writer_is_dirty:
328 329 log.debug('>> COMMITTING CHANGES <<')
329 330 writer.commit(merge=True)
330 331 log.debug('>>> FINISHED REBUILDING INDEX <<<')
331 332 else:
332 333 writer.cancel()
333 334
334 335 def build_indexes(self):
335 336 if os.path.exists(self.index_location):
336 337 log.debug('removing previous index')
337 338 rmtree(self.index_location)
338 339
339 340 if not os.path.exists(self.index_location):
340 341 os.mkdir(self.index_location)
341 342
342 343 chgset_idx = create_in(self.index_location, CHGSETS_SCHEMA, indexname=CHGSET_IDX_NAME)
343 344 chgset_idx_writer = chgset_idx.writer()
344 345
345 346 file_idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
346 347 file_idx_writer = file_idx.writer()
347 348 log.debug('BUILDING INDEX FOR EXTENSIONS %s '
348 349 'AND REPOS %s' % (INDEX_EXTENSIONS, self.repo_paths.keys()))
349 350
350 351 for repo_name, repo in self.repo_paths.items():
351 352 # skip indexing if there aren't any revisions
352 353 if len(repo) < 1:
353 354 continue
354 355
355 356 self.index_files(file_idx_writer, repo_name, repo)
356 357 self.index_changesets(chgset_idx_writer, repo_name, repo)
357 358
358 359 log.debug('>> COMMITTING CHANGES <<')
359 360 file_idx_writer.commit(merge=True)
360 361 chgset_idx_writer.commit(merge=True)
361 362 log.debug('>>> FINISHED BUILDING INDEX <<<')
362 363
363 364 def update_indexes(self):
364 365 self.update_file_index()
365 366 self.update_changeset_index()
366 367
367 368 def run(self, full_index=False):
368 369 """Run daemon"""
369 370 if full_index or self.initial:
370 371 self.build_indexes()
371 372 else:
372 373 self.update_indexes()
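For reference, a minimal usage sketch based on the constructor and run() shown above; the paths and the standalone invocation are hypothetical, not part of this commit:

# Hypothetical driver: build or update the Whoosh indexes for all repos.
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

daemon = WhooshIndexingDaemon(
    index_location='/var/cache/rhodecode/index',  # hypothetical path
    repo_location='/srv/repos',                   # hypothetical path
)
# full_index=True forces build_indexes(); otherwise run() updates
# incrementally unless the index does not exist yet (self.initial).
daemon.run(full_index=True)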