##// END OF EJS Templates
fixed reindexing, and made some optimizations to reuse repo instances from repo scann list.
marcink -
r561:5f3b967d default
parent child Browse files
Show More
@@ -1,237 +1,243 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 # whoosh indexer daemon for rhodecode
3 # whoosh indexer daemon for rhodecode
4 # Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
4 # Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
5 #
5 #
6 # This program is free software; you can redistribute it and/or
6 # This program is free software; you can redistribute it and/or
7 # modify it under the terms of the GNU General Public License
7 # modify it under the terms of the GNU General Public License
8 # as published by the Free Software Foundation; version 2
8 # as published by the Free Software Foundation; version 2
9 # of the License or (at your opinion) any later version of the license.
9 # of the License or (at your opinion) any later version of the license.
10 #
10 #
11 # This program is distributed in the hope that it will be useful,
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
14 # GNU General Public License for more details.
15 #
15 #
16 # You should have received a copy of the GNU General Public License
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
19 # MA 02110-1301, USA.
19 # MA 02110-1301, USA.
20 """
20 """
21 Created on Jan 26, 2010
21 Created on Jan 26, 2010
22
22
23 @author: marcink
23 @author: marcink
24 A deamon will read from task table and run tasks
24 A deamon will read from task table and run tasks
25 """
25 """
26 import sys
26 import sys
27 import os
27 import os
28 from os.path import dirname as dn
28 from os.path import dirname as dn
29 from os.path import join as jn
29 from os.path import join as jn
30
30
31 #to get the rhodecode import
31 #to get the rhodecode import
32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
33 sys.path.append(project_path)
33 sys.path.append(project_path)
34
34
35 from rhodecode.lib.pidlock import LockHeld, DaemonLock
35 from rhodecode.lib.pidlock import LockHeld, DaemonLock
36 from rhodecode.model.hg_model import HgModel
36 from rhodecode.model.hg_model import HgModel
37 from rhodecode.lib.helpers import safe_unicode
37 from rhodecode.lib.helpers import safe_unicode
38 from whoosh.index import create_in, open_dir
38 from whoosh.index import create_in, open_dir
39 from shutil import rmtree
39 from shutil import rmtree
40 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
40 from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
41
41
42 from time import mktime
42 from time import mktime
43 from vcs.backends import hg
43 from vcs.exceptions import ChangesetError
44
44
45 import logging
45 import logging
46
46
47 log = logging.getLogger('whooshIndexer')
47 log = logging.getLogger('whooshIndexer')
48 # create logger
48 # create logger
49 log.setLevel(logging.DEBUG)
49 log.setLevel(logging.DEBUG)
50 log.propagate = False
50 log.propagate = False
51 # create console handler and set level to debug
51 # create console handler and set level to debug
52 ch = logging.StreamHandler()
52 ch = logging.StreamHandler()
53 ch.setLevel(logging.DEBUG)
53 ch.setLevel(logging.DEBUG)
54
54
55 # create formatter
55 # create formatter
56 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
56 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
57
57
58 # add formatter to ch
58 # add formatter to ch
59 ch.setFormatter(formatter)
59 ch.setFormatter(formatter)
60
60
61 # add ch to logger
61 # add ch to logger
62 log.addHandler(ch)
62 log.addHandler(ch)
63
63
64 def scan_paths(root_location):
64 def scan_paths(root_location):
65 return HgModel.repo_scan('/', root_location, None, True)
65 return HgModel.repo_scan('/', root_location, None, True)
66
66
67 class WhooshIndexingDaemon(object):
67 class WhooshIndexingDaemon(object):
68 """
68 """
69 Deamon for atomic jobs
69 Deamon for atomic jobs
70 """
70 """
71
71
72 def __init__(self, indexname='HG_INDEX', repo_location=None):
72 def __init__(self, indexname='HG_INDEX', repo_location=None):
73 self.indexname = indexname
73 self.indexname = indexname
74 self.repo_location = repo_location
74 self.repo_location = repo_location
75 self.repo_paths = scan_paths(self.repo_location)
75 self.initial = False
76 self.initial = False
76 if not os.path.isdir(IDX_LOCATION):
77 if not os.path.isdir(IDX_LOCATION):
77 os.mkdir(IDX_LOCATION)
78 os.mkdir(IDX_LOCATION)
78 log.info('Cannot run incremental index since it does not'
79 log.info('Cannot run incremental index since it does not'
79 ' yet exist running full build')
80 ' yet exist running full build')
80 self.initial = True
81 self.initial = True
81
82
82 def get_paths(self, root_dir):
83 def get_paths(self, repo):
83 """
84 """
84 recursive walk in root dir and return a set of all path in that dir
85 recursive walk in root dir and return a set of all path in that dir
85 based on repository walk function
86 based on repository walk function
86 """
87 """
87 repo = hg.MercurialRepository(root_dir)
88 index_paths_ = set()
88 index_paths_ = set()
89 for topnode, dirs, files in repo.walk('/', 'tip'):
89 for topnode, dirs, files in repo.walk('/', 'tip'):
90 for f in files:
90 for f in files:
91 index_paths_.add(jn(root_dir, f.path))
91 index_paths_.add(jn(repo.path, f.path))
92 for dir in dirs:
92 for dir in dirs:
93 for f in files:
93 for f in files:
94 index_paths_.add(jn(root_dir, f.path))
94 index_paths_.add(jn(repo.path, f.path))
95
95
96 return index_paths_
96 return index_paths_
97
97
98
98 def get_node(self, repo, path):
99 n_path = path[len(repo.path) + 1:]
100 node = repo.get_changeset().get_node(n_path)
101 return node
102
103 def get_node_mtime(self, node):
104 return mktime(node.last_changeset.date.timetuple())
105
99 def add_doc(self, writer, path, repo):
106 def add_doc(self, writer, path, repo):
100 """Adding doc to writer"""
107 """Adding doc to writer"""
101 n_path = path[len(repo.path) + 1:]
108 node = self.get_node(repo, path)
102 node = repo.get_changeset().get_node(n_path)
103
109
104 #we just index the content of chosen files
110 #we just index the content of chosen files
105 if node.extension in INDEX_EXTENSIONS:
111 if node.extension in INDEX_EXTENSIONS:
106 log.debug(' >> %s [WITH CONTENT]' % path)
112 log.debug(' >> %s [WITH CONTENT]' % path)
107 u_content = node.content
113 u_content = node.content
108 else:
114 else:
109 log.debug(' >> %s' % path)
115 log.debug(' >> %s' % path)
110 #just index file name without it's content
116 #just index file name without it's content
111 u_content = u''
117 u_content = u''
112
118
113 writer.add_document(owner=unicode(repo.contact),
119 writer.add_document(owner=unicode(repo.contact),
114 repository=safe_unicode(repo.name),
120 repository=safe_unicode(repo.name),
115 path=safe_unicode(path),
121 path=safe_unicode(path),
116 content=u_content,
122 content=u_content,
117 modtime=mktime(node.last_changeset.date.timetuple()),
123 modtime=self.get_node_mtime(node),
118 extension=node.extension)
124 extension=node.extension)
119
125
120
126
121 def build_index(self):
127 def build_index(self):
122 if os.path.exists(IDX_LOCATION):
128 if os.path.exists(IDX_LOCATION):
123 log.debug('removing previous index')
129 log.debug('removing previous index')
124 rmtree(IDX_LOCATION)
130 rmtree(IDX_LOCATION)
125
131
126 if not os.path.exists(IDX_LOCATION):
132 if not os.path.exists(IDX_LOCATION):
127 os.mkdir(IDX_LOCATION)
133 os.mkdir(IDX_LOCATION)
128
134
129 idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
135 idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
130 writer = idx.writer()
136 writer = idx.writer()
131
137
132 for cnt, repo in enumerate(scan_paths(self.repo_location).values()):
138 for cnt, repo in enumerate(self.repo_paths.values()):
133 log.debug('building index @ %s' % repo.path)
139 log.debug('building index @ %s' % repo.path)
134
140
135 for idx_path in self.get_paths(repo.path):
141 for idx_path in self.get_paths(repo):
136 self.add_doc(writer, idx_path, repo)
142 self.add_doc(writer, idx_path, repo)
143
144 log.debug('>> COMMITING CHANGES <<')
137 writer.commit(merge=True)
145 writer.commit(merge=True)
138
139 log.debug('>>> FINISHED BUILDING INDEX <<<')
146 log.debug('>>> FINISHED BUILDING INDEX <<<')
140
147
141
148
142 def update_index(self):
149 def update_index(self):
143 log.debug('STARTING INCREMENTAL INDEXING UPDATE')
150 log.debug('STARTING INCREMENTAL INDEXING UPDATE')
144
151
145 idx = open_dir(IDX_LOCATION, indexname=self.indexname)
152 idx = open_dir(IDX_LOCATION, indexname=self.indexname)
146 # The set of all paths in the index
153 # The set of all paths in the index
147 indexed_paths = set()
154 indexed_paths = set()
148 # The set of all paths we need to re-index
155 # The set of all paths we need to re-index
149 to_index = set()
156 to_index = set()
150
157
151 reader = idx.reader()
158 reader = idx.reader()
152 writer = idx.writer()
159 writer = idx.writer()
153
160
154 # Loop over the stored fields in the index
161 # Loop over the stored fields in the index
155 for fields in reader.all_stored_fields():
162 for fields in reader.all_stored_fields():
156 indexed_path = fields['path']
163 indexed_path = fields['path']
157 indexed_paths.add(indexed_path)
164 indexed_paths.add(indexed_path)
158
165
159 if not os.path.exists(indexed_path):
166 repo = self.repo_paths[fields['repository']]
167
168 try:
169 node = self.get_node(repo, indexed_path)
170 except ChangesetError:
160 # This file was deleted since it was indexed
171 # This file was deleted since it was indexed
161 log.debug('removing from index %s' % indexed_path)
172 log.debug('removing from index %s' % indexed_path)
162 writer.delete_by_term('path', indexed_path)
173 writer.delete_by_term('path', indexed_path)
163
174
164 else:
175 else:
165 # Check if this file was changed since it
176 # Check if this file was changed since it was indexed
166 # was indexed
167 indexed_time = fields['modtime']
177 indexed_time = fields['modtime']
168
178 mtime = self.get_node_mtime(node)
169 mtime = os.path.getmtime(indexed_path)
170
171 if mtime > indexed_time:
179 if mtime > indexed_time:
172
173 # The file has changed, delete it and add it to the list of
180 # The file has changed, delete it and add it to the list of
174 # files to reindex
181 # files to reindex
175 log.debug('adding to reindex list %s' % indexed_path)
182 log.debug('adding to reindex list %s' % indexed_path)
176 writer.delete_by_term('path', indexed_path)
183 writer.delete_by_term('path', indexed_path)
177 to_index.add(indexed_path)
184 to_index.add(indexed_path)
178 #writer.commit()
179
185
180 # Loop over the files in the filesystem
186 # Loop over the files in the filesystem
181 # Assume we have a function that gathers the filenames of the
187 # Assume we have a function that gathers the filenames of the
182 # documents to be indexed
188 # documents to be indexed
183 for repo in scan_paths(self.repo_location).values():
189 for repo in self.repo_paths.values():
184 for path in self.get_paths(repo.path):
190 for path in self.get_paths(repo):
185 if path in to_index or path not in indexed_paths:
191 if path in to_index or path not in indexed_paths:
186 # This is either a file that's changed, or a new file
192 # This is either a file that's changed, or a new file
187 # that wasn't indexed before. So index it!
193 # that wasn't indexed before. So index it!
188 self.add_doc(writer, path, repo)
194 self.add_doc(writer, path, repo)
189 log.debug('reindexing %s' % path)
195 log.debug('re indexing %s' % path)
190
196
197 log.debug('>> COMMITING CHANGES <<')
191 writer.commit(merge=True)
198 writer.commit(merge=True)
192 #idx.optimize()
199 log.debug('>>> FINISHED REBUILDING INDEX <<<')
193 log.debug('>>> FINISHED <<<')
194
200
195 def run(self, full_index=False):
201 def run(self, full_index=False):
196 """Run daemon"""
202 """Run daemon"""
197 if full_index or self.initial:
203 if full_index or self.initial:
198 self.build_index()
204 self.build_index()
199 else:
205 else:
200 self.update_index()
206 self.update_index()
201
207
202 if __name__ == "__main__":
208 if __name__ == "__main__":
203 arg = sys.argv[1:]
209 arg = sys.argv[1:]
204 if len(arg) != 2:
210 if len(arg) != 2:
205 sys.stderr.write('Please specify indexing type [full|incremental]'
211 sys.stderr.write('Please specify indexing type [full|incremental]'
206 'and path to repositories as script args \n')
212 'and path to repositories as script args \n')
207 sys.exit()
213 sys.exit()
208
214
209
215
210 if arg[0] == 'full':
216 if arg[0] == 'full':
211 full_index = True
217 full_index = True
212 elif arg[0] == 'incremental':
218 elif arg[0] == 'incremental':
213 # False means looking just for changes
219 # False means looking just for changes
214 full_index = False
220 full_index = False
215 else:
221 else:
216 sys.stdout.write('Please use [full|incremental]'
222 sys.stdout.write('Please use [full|incremental]'
217 ' as script first arg \n')
223 ' as script first arg \n')
218 sys.exit()
224 sys.exit()
219
225
220 if not os.path.isdir(arg[1]):
226 if not os.path.isdir(arg[1]):
221 sys.stderr.write('%s is not a valid path \n' % arg[1])
227 sys.stderr.write('%s is not a valid path \n' % arg[1])
222 sys.exit()
228 sys.exit()
223 else:
229 else:
224 if arg[1].endswith('/'):
230 if arg[1].endswith('/'):
225 repo_location = arg[1] + '*'
231 repo_location = arg[1] + '*'
226 else:
232 else:
227 repo_location = arg[1] + '/*'
233 repo_location = arg[1] + '/*'
228
234
229 try:
235 try:
230 l = DaemonLock()
236 l = DaemonLock()
231 WhooshIndexingDaemon(repo_location=repo_location)\
237 WhooshIndexingDaemon(repo_location=repo_location)\
232 .run(full_index=full_index)
238 .run(full_index=full_index)
233 l.release()
239 l.release()
234 reload(logging)
240 reload(logging)
235 except LockHeld:
241 except LockHeld:
236 sys.exit(1)
242 sys.exit(1)
237
243
General Comments 0
You need to be logged in to leave comments. Login now