##// END OF EJS Templates
Fixed reindexing and made some optimizations to reuse repo instances from the repo scan list.
marcink -
r561:5f3b967d default
parent child Browse files
Show More
@@ -1,237 +1,243 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 # whoosh indexer daemon for rhodecode
4 4 # Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
5 5 #
6 6 # This program is free software; you can redistribute it and/or
7 7 # modify it under the terms of the GNU General Public License
8 8 # as published by the Free Software Foundation; version 2
9 9 # of the License or (at your option) any later version of the license.
10 10 #
11 11 # This program is distributed in the hope that it will be useful,
12 12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 14 # GNU General Public License for more details.
15 15 #
16 16 # You should have received a copy of the GNU General Public License
17 17 # along with this program; if not, write to the Free Software
18 18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 19 # MA 02110-1301, USA.
20 20 """
21 21 Created on Jan 26, 2010
22 22
23 23 @author: marcink
24 24 A daemon will read from task table and run tasks
25 25 """
26 26 import sys
27 27 import os
28 28 from os.path import dirname as dn
29 29 from os.path import join as jn
30 30
31 31 #to get the rhodecode import
32 32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
33 33 sys.path.append(project_path)
34 34
35 35 from rhodecode.lib.pidlock import LockHeld, DaemonLock
36 36 from rhodecode.model.hg_model import HgModel
37 37 from rhodecode.lib.helpers import safe_unicode
38 38 from whoosh.index import create_in, open_dir
39 39 from shutil import rmtree
40 40 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
41 41
42 42 from time import mktime
43 from vcs.backends import hg
43 from vcs.exceptions import ChangesetError
44 44
45 45 import logging
46 46
# Dedicated logger for the indexer. It does not propagate to the root logger,
# so its records are emitted only through the handler attached below.
log = logging.getLogger('whooshIndexer')
log.propagate = False
log.setLevel(logging.DEBUG)

# Record layout: timestamp - logger name - level - message.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Stream handler emitting everything from DEBUG upwards, using the
# formatter defined above.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

log.addHandler(ch)
63 63
def scan_paths(root_location):
    """
    Scan ``root_location`` for repositories.

    Delegates to ``HgModel.repo_scan`` and returns its result unchanged.
    Elsewhere in this file the result is used as a mapping whose values are
    repository objects.
    """
    found_repos = HgModel.repo_scan('/', root_location, None, True)
    return found_repos
66 66
class WhooshIndexingDaemon(object):
    """
    Daemon that builds, or incrementally updates, the whoosh search index
    for the repositories found under ``repo_location``.
    """

    def __init__(self, indexname='HG_INDEX', repo_location=None):
        """
        :param indexname: name of the whoosh index opened by ``update_index``
        :param repo_location: location handed to ``scan_paths`` to discover
            the repositories to index
        """
        self.indexname = indexname
        self.repo_location = repo_location
        # Scan once and reuse the repository instances in every later step
        # instead of rescanning / reopening repositories.
        self.repo_paths = scan_paths(self.repo_location)
        self.initial = False
        if not os.path.isdir(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
            # No index directory existed, so run() must do a full build.
            self.initial = True

    def get_paths(self, repo):
        """
        Return the set of paths of all files yielded by the repository walk
        at ``tip``, each joined onto ``repo.path``.

        :param repo: repository object providing ``walk`` and ``path``
        """
        # Each walk step yields (topnode, dirs, files); only ``files`` carries
        # the nodes to index, so ``dirs`` needs no separate handling.
        return set(jn(repo.path, f.path)
                   for _topnode, _dirs, files in repo.walk('/', 'tip')
                   for f in files)

    def get_node(self, repo, path):
        """
        Return the node for ``path`` from the changeset returned by
        ``repo.get_changeset()``.

        :param repo: repository object the path belongs to
        :param path: path as produced by ``get_paths`` (prefixed with
            ``repo.path``)
        """
        # Strip the "<repo.path>/" prefix to get the repository-relative path.
        n_path = path[len(repo.path) + 1:]
        return repo.get_changeset().get_node(n_path)

    def get_node_mtime(self, node):
        """
        Return the date of the node's last changeset as a ``mktime``
        timestamp (float).
        """
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo):
        """
        Add a single document for ``path`` to ``writer``.

        File content is indexed only for extensions listed in
        ``INDEX_EXTENSIONS``; other files are indexed with empty content.
        """
        node = self.get_node(repo, path)

        # we just index the content of chosen files
        if node.extension in INDEX_EXTENSIONS:
            log.debug('    >> %s [WITH CONTENT]', path)
            u_content = node.content
        else:
            log.debug('    >> %s', path)
            # just index the file name without its content
            u_content = u''

        # safe_unicode is used for the owner as well, consistent with the
        # repository and path fields.
        writer.add_document(owner=safe_unicode(repo.contact),
                            repository=safe_unicode(repo.name),
                            path=safe_unicode(path),
                            content=u_content,
                            modtime=self.get_node_mtime(node),
                            extension=node.extension)

    def build_index(self):
        """
        Remove any existing index directory and build a fresh index from all
        scanned repositories.
        """
        if os.path.exists(IDX_LOCATION):
            log.debug('removing previous index')
            rmtree(IDX_LOCATION)

        if not os.path.exists(IDX_LOCATION):
            os.mkdir(IDX_LOCATION)

        # NOTE(review): the index is created under IDX_NAME while
        # update_index() opens self.indexname -- confirm both always agree.
        idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for repo in self.repo_paths.values():
            log.debug('building index @ %s', repo.path)

            for idx_path in self.get_paths(repo):
                self.add_doc(writer, idx_path, repo)

        log.debug('>> COMMITING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_index(self):
        """
        Incrementally update the existing index.

        Indexed documents are removed when their repository is no longer
        among the scanned ones or when looking up their node raises
        ``ChangesetError``; documents whose node is newer than the stored
        ``modtime`` are re-indexed, and paths not yet in the index are added.
        """
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(IDX_LOCATION, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            try:
                repo = self.repo_paths[fields['repository']]
            except KeyError:
                # The repository is not part of the current scan any more,
                # so its documents must not stay in the index.
                log.debug('removing from index %s', indexed_path)
                writer.delete_by_term('path', indexed_path)
                continue

            try:
                node = self.get_node(repo, indexed_path)
            except ChangesetError:
                # This file was deleted since it was indexed
                log.debug('removing from index %s', indexed_path)
                writer.delete_by_term('path', indexed_path)
                continue

            # Check if this file was changed since it was indexed
            if self.get_node_mtime(node) > fields['modtime']:
                # The file has changed, delete it and add it to the list of
                # files to reindex
                log.debug('adding to reindex list %s', indexed_path)
                writer.delete_by_term('path', indexed_path)
                to_index.add(indexed_path)

        # Loop over the files currently present in the repositories
        for repo in self.repo_paths.values():
            for path in self.get_paths(repo):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo)
                    log.debug('reindexing %s', path)

        log.debug('>> COMMITING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED REBUILDING INDEX <<<')

    def run(self, full_index=False):
        """
        Run daemon.

        :param full_index: when True (or when no index directory existed at
            construction time) rebuild the whole index, otherwise update it
            incrementally
        """
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()
201 207
if __name__ == "__main__":
    # Usage: <script> [full|incremental] <path to repositories>
    arg = sys.argv[1:]
    if len(arg) != 2:
        # Trailing space added so the two adjacent literals do not run
        # together as "[full|incremental]and".
        sys.stderr.write('Please specify indexing type [full|incremental] '
                         'and path to repositories as script args \n')
        # Argument errors exit with a non-zero status.
        sys.exit(1)

    if arg[0] == 'full':
        full_index = True
    elif arg[0] == 'incremental':
        # False means looking just for changes
        full_index = False
    else:
        sys.stderr.write('Please use [full|incremental]'
                         ' as script first arg \n')
        sys.exit(1)

    if not os.path.isdir(arg[1]):
        sys.stderr.write('%s is not a valid path \n' % arg[1])
        sys.exit(1)

    # Append a '*' wildcard below the given directory for the repo scan.
    if arg[1].endswith('/'):
        repo_location = arg[1] + '*'
    else:
        repo_location = arg[1] + '/*'

    try:
        l = DaemonLock()
        try:
            WhooshIndexingDaemon(repo_location=repo_location)\
                .run(full_index=full_index)
        finally:
            # Release the lock even when indexing fails.
            l.release()
        # NOTE(review): the reason for reloading ``logging`` is not evident
        # from this file; kept as in the original.
        reload(logging)
    except LockHeld:
        # DaemonLock raised LockHeld, so the lock could not be taken.
        sys.exit(1)
237 243
General Comments 0
You need to be logged in to leave comments. Login now