caches: report damaged DB on key iterations too, not only the GET call
super-admin
r4701:bdcbd118 stable
@@ -1,302 +1,305 b''
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)

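For context, a minimal sketch of how a backend like this gets wired into a dogpile region (an editor's illustration, not part of this changeset; the backend name `memory_lru` and the values are arbitrary examples):

    from dogpile.cache import make_region, register_backend

    # expose the class to dogpile under an example name
    register_backend(
        'memory_lru', 'rhodecode.lib.rc_cache.backends', 'LRUMemoryBackend')

    region = make_region().configure(
        'memory_lru',
        expiration_time=60,
        # max_size caps the LRU dict; log_key_count swaps in LRUDictDebug
        arguments={'max_size': 1024, 'log_key_count': True},
    )

    region.set('some_key', 'some_value')
    assert region.get('some_key') == 'some_value'
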
class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

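To illustrate the safe/unsafe split above (a sketch with made-up byte strings): `_loads` defaults to safe=True, so damaged pickle data degrades to `NO_VALUE` (a cache miss) instead of raising, while `safe=False` propagates the error to the caller:

    from dogpile.cache.backends.file import NO_VALUE

    ser = PickleSerializer()

    # safe mode (the default for _loads): corrupt bytes become a cache miss
    assert ser._loads(b'not-a-pickle') is NO_VALUE

    # unsafe mode: the unpickling error propagates
    try:
        ser._loads(b'not-a-pickle', safe=False)
    except Exception:
        pass  # would be handled or reported by the caller
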
class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        """
        pickle maintained the `CachedValue` wrapper of the tuple;
        msgpack does not, so it must be added back in.
        """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

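The docstring above is the key detail: pickle round-trips the `CachedValue` wrapper itself, while msgpack flattens it to a plain tuple, so `_loads` has to rebuild it. A quick sketch (the payload and metadata values are arbitrary):

    import time
    from dogpile.cache.api import CachedValue

    ser = MsgPackSerializer()

    # CachedValue is a tuple subclass of (payload, metadata)
    cv = CachedValue('payload', {'ct': time.time(), 'v': 1})

    raw = ser._dumps(cv)        # msgpack packs it as a plain array
    restored = ser._loads(raw)  # unpacked tuple, re-wrapped in CachedValue

    assert isinstance(restored, CachedValue)
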
import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for the lock; better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.error('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)

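The actual change of this commit sits in `list_keys` above: iterating the keys of a damaged DBM file now logs which store failed before re-raising, matching what `get` already did for single-key reads. Roughly how that surfaces (a sketch; the filename is an arbitrary example, and keys are prefixed by hand here, which a region normally does for you):

    backend = FileNamespaceBackend({'filename': '/tmp/example_cache.db'})
    backend.set('file_backend:key1', 'value1')

    print(list(backend.list_keys()))  # ['file_backend:key1']

    # If the file on disk gets corrupted, get() already logged
    # 'Failed to fetch DBM key ... from DB: <store>' before re-raising;
    # after this commit, list_keys() reports the damaged DB the same way.
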
class BaseRedisBackend(redis_backend.RedisBackend):

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None


class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass
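Finally, a configuration sketch for the Redis variants (again an editor's illustration; the backend name `redis_msgpack` and the URL are example values). With `distributed_lock=True`, `get_mutex` above hands dogpile a Redis-backed lock, so cache regeneration is coordinated across processes and hosts rather than per-process:

    from dogpile.cache import make_region, register_backend

    register_backend(
        'redis_msgpack', 'rhodecode.lib.rc_cache.backends', 'RedisMsgPackBackend')

    region = make_region().configure(
        'redis_msgpack',
        expiration_time=300,
        arguments={
            'url': 'redis://localhost:6379/0',
            'redis_expiration_time': 300,
            'distributed_lock': True,  # get_mutex() returns a Redis lock
        },
    )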