fixed daemon typos
marcink
r1377:78e5853d beta
@@ -1,240 +1,240 @@
# -*- coding: utf-8 -*-
"""
    rhodecode.lib.indexers.daemon
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-    A deamon will read from task table and run tasks
+    A daemon will read from task table and run tasks

    :created_on: Jan 26, 2010
    :author: marcink
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
    :license: GPLv3, see COPYING for more details.
"""
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import logging
import traceback

from shutil import rmtree
from time import mktime

from os.path import dirname as dn
from os.path import join as jn

#to get the rhodecode import
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
sys.path.append(project_path)


from rhodecode.model.scm import ScmModel
from rhodecode.lib import safe_unicode
from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME

from vcs.exceptions import ChangesetError, RepositoryError

from whoosh.index import create_in, open_dir


log = logging.getLogger('whooshIndexer')
# create logger
log.setLevel(logging.DEBUG)
log.propagate = False
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s -"
                              " %(levelname)s - %(message)s")

# add formatter to ch
ch.setFormatter(formatter)

# add ch to logger
log.addHandler(ch)

class WhooshIndexingDaemon(object):
    """
-    Deamon for atomic jobs
+    Daemon for atomic jobs
    """

    def __init__(self, indexname='HG_INDEX', index_location=None,
                 repo_location=None, sa=None, repo_list=None):
        self.indexname = indexname

        self.index_location = index_location
        if not index_location:
            raise Exception('You have to provide index location')

        self.repo_location = repo_location
        if not repo_location:
            raise Exception('You have to provide repositories location')

        self.repo_paths = ScmModel(sa).repo_scan(self.repo_location)

        if repo_list:
            filtered_repo_paths = {}
            for repo_name, repo in self.repo_paths.items():
                if repo_name in repo_list:
                    filtered_repo_paths[repo_name] = repo

            self.repo_paths = filtered_repo_paths


        self.initial = False
        if not os.path.isdir(self.index_location):
            os.makedirs(self.index_location)
            log.info('Cannot run incremental index since it does not'
                     ' yet exist, running full build')
            self.initial = True

    def get_paths(self, repo):
        """recursive walk in root dir and return a set of all paths in that dir
        based on repository walk function
        """
        index_paths_ = set()
        try:
            tip = repo.get_changeset('tip')
            for topnode, dirs, files in tip.walk('/'):
                for f in files:
                    index_paths_.add(jn(repo.path, f.path))
                for dir in dirs:
                    for f in files:
                        index_paths_.add(jn(repo.path, f.path))

        except RepositoryError, e:
            log.debug(traceback.format_exc())
            pass
        return index_paths_

    def get_node(self, repo, path):
        n_path = path[len(repo.path) + 1:]
        node = repo.get_changeset().get_node(n_path)
        return node

    def get_node_mtime(self, node):
        return mktime(node.last_changeset.date.timetuple())

    def add_doc(self, writer, path, repo, repo_name):
        """Add a doc to the writer; this function itself fetches data from
        the instance of vcs backend"""
        node = self.get_node(repo, path)

        #we just index the content of chosen files, and skip binary files
        if node.extension in INDEX_EXTENSIONS and not node.is_binary:

            u_content = node.content
            if not isinstance(u_content, unicode):
                log.warning(' >> %s Could not get this content as unicode '
                            'replacing with empty content', path)
                u_content = u''
            else:
                log.debug(' >> %s [WITH CONTENT]' % path)

        else:
            log.debug(' >> %s' % path)
            #just index file name without its content
            u_content = u''

        writer.add_document(owner=unicode(repo.contact),
                            repository=safe_unicode(repo_name),
                            path=safe_unicode(path),
                            content=u_content,
                            modtime=self.get_node_mtime(node),
                            extension=node.extension)


    def build_index(self):
        if os.path.exists(self.index_location):
            log.debug('removing previous index')
            rmtree(self.index_location)

        if not os.path.exists(self.index_location):
            os.mkdir(self.index_location)

        idx = create_in(self.index_location, SCHEMA, indexname=IDX_NAME)
        writer = idx.writer()

        for repo_name, repo in self.repo_paths.items():
            log.debug('building index @ %s' % repo.path)

            for idx_path in self.get_paths(repo):
                self.add_doc(writer, idx_path, repo, repo_name)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED BUILDING INDEX <<<')


    def update_index(self):
        log.debug('STARTING INCREMENTAL INDEXING UPDATE')

        idx = open_dir(self.index_location, indexname=self.indexname)
        # The set of all paths in the index
        indexed_paths = set()
        # The set of all paths we need to re-index
        to_index = set()

        reader = idx.reader()
        writer = idx.writer()

        # Loop over the stored fields in the index
        for fields in reader.all_stored_fields():
            indexed_path = fields['path']
            indexed_paths.add(indexed_path)

            repo = self.repo_paths[fields['repository']]

            try:
                node = self.get_node(repo, indexed_path)
            except ChangesetError:
                # This file was deleted since it was indexed
                log.debug('removing from index %s' % indexed_path)
                writer.delete_by_term('path', indexed_path)

            else:
                # Check if this file was changed since it was indexed
                indexed_time = fields['modtime']
                mtime = self.get_node_mtime(node)
                if mtime > indexed_time:
                    # The file has changed, delete it and add it to the list of
                    # files to reindex
                    log.debug('adding to reindex list %s' % indexed_path)
                    writer.delete_by_term('path', indexed_path)
                    to_index.add(indexed_path)

        # Loop over the files in the filesystem
        # Assume we have a function that gathers the filenames of the
        # documents to be indexed
        for repo_name, repo in self.repo_paths.items():
            for path in self.get_paths(repo):
                if path in to_index or path not in indexed_paths:
                    # This is either a file that's changed, or a new file
                    # that wasn't indexed before. So index it!
                    self.add_doc(writer, path, repo, repo_name)
                    log.debug('re-indexing %s' % path)

        log.debug('>> COMMITTING CHANGES <<')
        writer.commit(merge=True)
        log.debug('>>> FINISHED REBUILDING INDEX <<<')

    def run(self, full_index=False):
        """Run daemon"""
        if full_index or self.initial:
            self.build_index()
        else:
            self.update_index()
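
For context, run() is the daemon's entry point: it performs a full build_index() when no index exists yet (self.initial) or when full_index=True, and an incremental update_index() otherwise. A minimal usage sketch follows; the filesystem paths are placeholders and the sa argument is simply passed through to ScmModel(sa), none of these values come from this commit:

# Hypothetical usage sketch -- paths are placeholders; only
# WhooshIndexingDaemon and its arguments come from the module above.
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon

daemon = WhooshIndexingDaemon(index_location='/srv/rhodecode/index',  # placeholder
                              repo_location='/srv/rhodecode/repos',   # placeholder
                              sa=None,            # SQLAlchemy session, handed to ScmModel(sa)
                              repo_list=['vcs'])  # optional: limit indexing to given repos
# full build on first run or on request, incremental update otherwise
daemon.run(full_index=False)

In RhodeCode this daemon is normally driven by the background indexing machinery rather than called by hand, so the snippet is only meant to illustrate the constructor contract and the run() switch.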
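Each document written by add_doc carries the fields passed to writer.add_document: owner, repository, path, content, modtime and extension. Below is an illustrative sketch of reading results back with plain Whoosh, assuming the same index location and the IDX_NAME used by build_index; the index path and the query string are made up:

# Illustrative search against the index produced by the daemon; the index
# path is a placeholder and the query is arbitrary.
from whoosh.index import open_dir
from whoosh.qparser import QueryParser

from rhodecode.lib.indexers import IDX_NAME

idx = open_dir('/srv/rhodecode/index', indexname=IDX_NAME)
with idx.searcher() as searcher:
    query = QueryParser('content', schema=idx.schema).parse(u'def run')
    for hit in searcher.search(query, limit=10):
        # stored fields written by add_doc() are available on each hit
        print hit['repository'], hit['path']

update_index above already reads path, repository and modtime back from the stored fields, so those are safe to access on a hit; whether content is stored depends on the SCHEMA definition, which is not shown in this diff.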