##// END OF EJS Templates
fixed daemon typos
marcink -
r1377:78e5853d beta
parent child Browse files
Show More
@@ -1,240 +1,240 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """
2 """
3 rhodecode.lib.indexers.daemon
3 rhodecode.lib.indexers.daemon
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
5
6 A deamon will read from task table and run tasks
6 A daemon will read from task table and run tasks
7
7
8 :created_on: Jan 26, 2010
8 :created_on: Jan 26, 2010
9 :author: marcink
9 :author: marcink
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 :license: GPLv3, see COPYING for more details.
11 :license: GPLv3, see COPYING for more details.
12 """
12 """
13 # This program is free software: you can redistribute it and/or modify
13 # This program is free software: you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License as published by
14 # it under the terms of the GNU General Public License as published by
15 # the Free Software Foundation, either version 3 of the License, or
15 # the Free Software Foundation, either version 3 of the License, or
16 # (at your option) any later version.
16 # (at your option) any later version.
17 #
17 #
18 # This program is distributed in the hope that it will be useful,
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
21 # GNU General Public License for more details.
22 #
22 #
23 # You should have received a copy of the GNU General Public License
23 # You should have received a copy of the GNU General Public License
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25
25
26 import os
26 import os
27 import sys
27 import sys
28 import logging
28 import logging
29 import traceback
29 import traceback
30
30
31 from shutil import rmtree
31 from shutil import rmtree
32 from time import mktime
32 from time import mktime
33
33
34 from os.path import dirname as dn
34 from os.path import dirname as dn
35 from os.path import join as jn
35 from os.path import join as jn
36
36
37 #to get the rhodecode import
37 #to get the rhodecode import
38 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
38 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
39 sys.path.append(project_path)
39 sys.path.append(project_path)
40
40
41
41
42 from rhodecode.model.scm import ScmModel
42 from rhodecode.model.scm import ScmModel
43 from rhodecode.lib import safe_unicode
43 from rhodecode.lib import safe_unicode
44 from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME
44 from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME
45
45
46 from vcs.exceptions import ChangesetError, RepositoryError
46 from vcs.exceptions import ChangesetError, RepositoryError
47
47
48 from whoosh.index import create_in, open_dir
48 from whoosh.index import create_in, open_dir
49
49
50
50
51
51
# Stand-alone logger for the whoosh indexing daemon.  Configured explicitly
# (own handler, propagate off) so the daemon produces output even when run
# outside the web application's logging configuration.
log = logging.getLogger('whooshIndexer')
log.setLevel(logging.DEBUG)
log.propagate = False

# everything at DEBUG and above goes straight to the console
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# timestamp - logger name - level - message
formatter = logging.Formatter("%(asctime)s - %(name)s -"
                              " %(levelname)s - %(message)s")
ch.setFormatter(formatter)
log.addHandler(ch)
69
69
class WhooshIndexingDaemon(object):
    """
    Daemon for atomic whoosh full-text index jobs.

    Scans the repositories found under ``repo_location`` and either builds
    a fresh whoosh index from scratch (:meth:`build_index`) or incrementally
    updates an existing one (:meth:`update_index`).
    """

    def __init__(self, indexname='HG_INDEX', index_location=None,
                 repo_location=None, sa=None, repo_list=None):
        """
        :param indexname: name of the whoosh index to open on updates
        :param index_location: directory holding the whoosh index (required)
        :param repo_location: root directory of the repositories (required)
        :param sa: sqlalchemy session passed through to ``ScmModel``
        :param repo_list: optional iterable of repository names; when given,
            indexing is restricted to those repositories only
        """
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        if repo_list:
            # restrict indexing to the explicitly requested repositories
            filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    filtered_repo_paths[repo_name] = repo

            self.repo_paths = filtered_repo_paths

        # when the index directory does not exist yet an incremental update
        # is impossible, so force a full build on the first run
        self.initial = False
        if not os.path.isdir(self.index_location):
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist running full build')
            self.initial = True

    def get_paths(self, repo):
        """Recursive walk in root dir and return a set of all path in that dir
        based on repository walk function
        """
        index_paths_ = set()
        try:
            tip = repo.get_changeset('tip')
            # tip.walk('/') already descends into every subdirectory, so a
            # single loop over the yielded files covers the whole tree
            for topnode, dirs, files in tip.walk('/'):
                for f in files:
                    index_paths_.add(jn(repo.path, f.path))
        except RepositoryError:
            # e.g. an empty repository with no tip yet - index nothing for it
            log.debug(traceback.format_exc())
        return index_paths_

    def get_node(self, repo, path):
        """Return the vcs node for an absolute filesystem ``path``."""
        # strip '<repo.path>/' prefix to get the in-repository path
        n_path = path[len(repo.path) + 1:]
        node = repo.get_changeset().get_node(n_path)
        return node

    def get_node_mtime(self, node):
        """Return the node's last-change time as a unix timestamp (float)."""
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo, repo_name):
        """Adding doc to writer this function itself fetches data from
        the instance of vcs backend"""
        node = self.get_node(repo, path)

        #we just index the content of chosen files, and skip binary files
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:

            u_content = node.content
            if not isinstance(u_content, unicode):
                log.warning(' >> %s Could not get this content as unicode '
                            'replacing with empty content', path)
                u_content = u''
            else:
                log.debug(' >> %s [WITH CONTENT]' % path)

        else:
            log.debug(' >> %s' % path)
            #just index file name without it's content
            u_content = u''

        writer.add_document(owner=unicode(repo.contact),
                            repository=safe_unicode(repo_name),
                            path=safe_unicode(path),
                            content=u_content,
                            modtime=self.get_node_mtime(node),
                            extension=node.extension)

    def build_index(self):
        """Build the whoosh index from scratch, removing any previous one."""
        if os.path.exists(self.index_location):
            log.debug('removing previous index')
            rmtree(self.index_location)

        if not os.path.exists(self.index_location):
            os.mkdir(self.index_location)

        idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for repo_name, repo in self.repo_paths.items():
            log.debug('building index @ %s' % repo.path)

            for idx_path in self.get_paths(repo):
                self.add_doc(writer, idx_path, repo, repo_name)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')

    def update_index(self):
        """Incrementally update the index: drop deleted files, re-index
        changed ones and add files not seen before."""
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            repo = self.repo_paths[fields['repository']]

            try:
                node = self.get_node(repo, indexed_path)
            except ChangesetError:
                # This file was deleted since it was indexed
                log.debug('removing from index %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)

            else:
                # Check if this file was changed since it was indexed
                indexed_time = fields['modtime']
                mtime = self.get_node_mtime(node)
                if mtime > indexed_time:
                    # The file has changed, delete it and add it to the list of
                    # files to reindex
                    log.debug('adding to reindex list %s' % indexed_path)
                    writer.delete_by_term('path', indexed_path)
                    to_index.add(indexed_path)

        # Loop over the files in the filesystem
        # Assume we have a function that gathers the filenames of the
        # documents to be indexed
        for repo_name, repo in self.repo_paths.items():
            for path in self.get_paths(repo):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo, repo_name)
                    log.debug('re indexing %s' % path)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED REBUILDING INDEX <<<')

    def run(self, full_index=False):
        """Run daemon: full rebuild when requested or on first run,
        otherwise an incremental update."""
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()
General Comments 0
You need to be logged in to leave comments. Login now