##// END OF EJS Templates
Fixes an issue with whoosh reindexing files that were removed or renamed
marcink -
r1711:b369bec5 beta
parent child Browse files
Show More
@@ -1,237 +1,238 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.indexers.daemon
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 A daemon will read from task table and run tasks
7 7
8 8 :created_on: Jan 26, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import logging
29 29 import traceback
30 30
31 31 from shutil import rmtree
32 32 from time import mktime
33 33
34 34 from os.path import dirname as dn
35 35 from os.path import join as jn
36 36
# Make the rhodecode package importable when this daemon runs standalone:
# climb four directory levels from this file up to the project root.
project_path = os.path.realpath(__file__)
for _ in range(4):
    project_path = dn(project_path)
sys.path.append(project_path)
40 40
41 41
42 42 from rhodecode.model.scm import ScmModel
43 43 from rhodecode.lib import safe_unicode
44 44 from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME
45 45
46 from vcs.exceptions import ChangesetError, RepositoryError
46 from vcs.exceptions import ChangesetError, RepositoryError, \
47 NodeDoesNotExistError
47 48
48 49 from whoosh.index import create_in, open_dir
49 50
50 51
51 52
# Dedicated, non-propagating console logger for the indexing daemon:
# everything at DEBUG and above is echoed to stderr with a timestamp.
log = logging.getLogger('whooshIndexer')
log.setLevel(logging.DEBUG)
log.propagate = False

# console handler carrying a timestamped record format
formatter = logging.Formatter("%(asctime)s - %(name)s -"
                              " %(levelname)s - %(message)s")
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
69 70
class WhooshIndexingDaemon(object):
    """
    Daemon for atomic whoosh index jobs.

    Builds a full-text whoosh index over the files of the configured
    repositories, either from scratch (``build_index``) or incrementally
    (``update_index``).
    """

    def __init__(self, indexname='HG_INDEX', index_location=None,
                 repo_location=None, sa=None, repo_list=None):
        """
        :param indexname: name of the whoosh index to operate on
        :param index_location: directory holding the whoosh index files
        :param repo_location: root directory scanned for repositories
        :param sa: optional sqlalchemy session passed to ScmModel
        :param repo_list: optional iterable of repository names; when given,
            indexing is restricted to those repositories only
        :raises Exception: when index_location or repo_location is missing
        """
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        if repo_list:
            # restrict the scanned repositories to the requested subset
            filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    filtered_repo_paths[repo_name] = repo

            self.repo_paths = filtered_repo_paths

        self.initial = False
        if not os.path.isdir(self.index_location):
            # no index exists yet -- force a full build instead of an
            # incremental update
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
            self.initial = True

    def get_paths(self, repo):
        """Recursive walk in root dir and return a set of all paths in that
        dir, based on the repository walk function.

        :param repo: vcs repository instance
        :return: set of absolute file paths (repo.path joined with each
            file's path); empty set on repository errors (e.g. an empty
            repository)
        """
        index_paths_ = set()
        try:
            tip = repo.get_changeset('tip')
            for topnode, dirs, files in tip.walk('/'):
                for f in files:
                    index_paths_.add(jn(repo.path, f.path))

        except RepositoryError:
            # e.g. an empty repository -- nothing to index, log and go on
            log.debug(traceback.format_exc())
        return index_paths_

    def get_node(self, repo, path):
        """Return the vcs node for absolute ``path`` inside ``repo`` at the
        current tip changeset.

        Raises ChangesetError/NodeDoesNotExistError when the file no longer
        exists at tip (deleted or renamed).
        """
        # strip the '<repo.path>/' prefix to get the repo-relative path
        n_path = path[len(repo.path) + 1:]
        node = repo.get_changeset().get_node(n_path)
        return node

    def get_node_mtime(self, node):
        """Return the modification time (epoch seconds) of the node's last
        changeset."""
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo, repo_name):
        """Adding doc to writer -- this function itself fetches data from
        the instance of vcs backend.

        Only files whose extension is in INDEX_EXTENSIONS and that are not
        binary get their content indexed; other files are indexed by name
        only (empty content).
        """
        node = self.get_node(repo, path)

        # we just index the content of chosen files, and skip binary files
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:

            u_content = node.content
            if not isinstance(u_content, unicode):
                log.warning(' >> %s Could not get this content as unicode '
                            'replacing with empty content', path)
                u_content = u''
            else:
                log.debug(' >> %s [WITH CONTENT]' % path)

        else:
            log.debug(' >> %s' % path)
            # just index file name without its content
            u_content = u''

        writer.add_document(owner=unicode(repo.contact),
                            repository=safe_unicode(repo_name),
                            path=safe_unicode(path),
                            content=u_content,
                            modtime=self.get_node_mtime(node),
                            extension=node.extension)

    def build_index(self):
        """Build the whoosh index from scratch, removing any previous
        index directory first."""
        if os.path.exists(self.index_location):
            log.debug('removing previous index')
            rmtree(self.index_location)

        if not os.path.exists(self.index_location):
            # consistent with __init__ which uses makedirs
            os.makedirs(self.index_location)

        idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for repo_name, repo in self.repo_paths.items():
            log.debug('building index @ %s' % repo.path)

            for idx_path in self.get_paths(repo):
                self.add_doc(writer, idx_path, repo, repo_name)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_index(self):
        """Incrementally update the whoosh index.

        Removes documents whose file was deleted or renamed, re-indexes
        files whose mtime is newer than the indexed one, and indexes files
        that were never indexed before.
        """
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            # a repository may have been removed, or filtered out via
            # repo_list -- skip its documents instead of crashing the whole
            # incremental run with a KeyError
            repo = self.repo_paths.get(fields['repository'])
            if repo is None:
                continue

            try:
                node = self.get_node(repo, indexed_path)
            except (ChangesetError, NodeDoesNotExistError):
                # This file was deleted (or renamed) since it was indexed
                log.debug('removing from index %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)

            else:
                # Check if this file was changed since it was indexed
                indexed_time = fields['modtime']
                mtime = self.get_node_mtime(node)
                if mtime > indexed_time:
                    # The file has changed, delete it and add it to the
                    # list of files to reindex
                    log.debug('adding to reindex list %s' % indexed_path)
                    writer.delete_by_term('path', indexed_path)
                    to_index.add(indexed_path)

        # Loop over the files in the filesystem
        # Assume we have a function that gathers the filenames of the
        # documents to be indexed
        for repo_name, repo in self.repo_paths.items():
            for path in self.get_paths(repo):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo, repo_name)
                    log.debug('re indexing %s' % path)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED REBUILDING INDEX <<<')

    def run(self, full_index=False):
        """Run daemon: full rebuild when ``full_index`` is set or when no
        index exists yet, otherwise an incremental update."""
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()
General Comments 0
You need to be logged in to leave comments. Login now