caches: report damaged DB on key iterations too, not only on the GET call
r4701:bdcbd118 stable
@@ -1,302 +1,305 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import time
import errno
import logging

import msgpack
import gevent
import redis

from dogpile.cache.api import CachedValue
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


_default_max_size = 1024

log = logging.getLogger(__name__)

class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(max_size)
        super(LRUMemoryBackend, self).__init__(arguments)

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if the key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)

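# Example (a sketch, not part of this change): the backend is consumed through
# a dogpile region; the registration name 'rc.memory_lru' below is
# hypothetical, RhodeCode wires up its real backend names elsewhere.
#
#   from dogpile.cache import make_region, register_backend
#
#   register_backend(
#       'rc.memory_lru', 'rhodecode.lib.rc_cache.backends', 'LRUMemoryBackend')
#   region = make_region().configure(
#       'rc.memory_lru', arguments={'max_size': 256, 'log_key_count': True})
#   region.set('some_key', 'some_value')
#   assert region.get('some_key') == 'some_value'
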
class PickleSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return compat.pickle.dumps(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        try:
            return compat.pickle.loads(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise


class MsgPackSerializer(object):

    def _dumps(self, value, safe=False):
        try:
            return msgpack.packb(value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

    def _loads(self, value, safe=True):
        """
        pickle maintains the `CachedValue` wrapper around the tuple;
        msgpack does not, so it must be added back in.
        """
        try:
            value = msgpack.unpackb(value, use_list=False)
            return CachedValue(*value)
        except Exception:
            if safe:
                return NO_VALUE
            else:
                raise

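# Example (a sketch, not part of this change): msgpack flattens the
# `CachedValue` named tuple into a plain tuple, which is why _loads() has to
# rebuild the wrapper; the payload and metadata below are made-up values.
#
#   import msgpack
#   from dogpile.cache.api import CachedValue
#
#   cached = CachedValue('payload', {'ct': 1524574297.0, 'v': 1})
#   packed = msgpack.packb(tuple(cached))
#   restored = CachedValue(*msgpack.unpackb(packed, use_list=False))
#   assert restored.payload == 'payload'
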
import fcntl
flock_org = fcntl.flock


class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent-compatible flock
            """
            # set non-blocking; this raises an exception if the lock cannot
            # be acquired immediately
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # re-raise on errors other than "resource temporarily unavailable"
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long for the lock; better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl

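# Example (a sketch): a blocking flock() would stall the whole gevent hub, so
# the patched module polls with LOCK_NB and yields between attempts. This
# assumes dogpile's file-lock context managers; the lock path is hypothetical.
#
#   lock = CustomLockFactory('/tmp/demo.lock')
#   with lock.write():
#       pass  # exclusive lock held; contenders gevent.sleep() instead of blocking
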
class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
        try:
            super(FileNamespaceBackend, self).__init__(arguments)
        except Exception:
            log.error('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return '{} `{}`'.format(self.__class__, self.filename)

    def list_keys(self, prefix=''):
        prefix = '{}:{}'.format(self.key_prefix, prefix)

        def cond(v):
            if not prefix:
                return True

            if v.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return filter(cond, dbm.keys())
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

    def _dbm_get(self, key):
        with self._dbm_file(False) as dbm:
            if hasattr(dbm, 'get'):
                value = dbm.get(key, NO_VALUE)
            else:
                # gdbm objects lack a .get method
                try:
                    value = dbm[key]
                except KeyError:
                    value = NO_VALUE
            if value is not NO_VALUE:
                value = self._loads(value)
            return value

    def get(self, key):
        try:
            return self._dbm_get(key)
        except Exception:
            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
            raise

    def set(self, key, value):
        with self._dbm_file(True) as dbm:
            dbm[key] = self._dumps(value)

    def set_multi(self, mapping):
        with self._dbm_file(True) as dbm:
            for key, value in mapping.items():
                dbm[key] = self._dumps(value)

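# Example (a sketch of the behaviour this commit adds): a damaged DBM file is
# now reported on key iteration too, not only when get() touches it. The path
# is hypothetical and how corruption surfaces depends on the dbm flavour.
#
#   backend = FileNamespaceBackend({'filename': '/tmp/rc_cache_demo'})
#   backend.set('file_backend:some_key', 'some_value')
#   # ... the file is later corrupted on disk ...
#   try:
#       list(backend.list_keys())
#   except Exception:
#       # 'Failed to fetch DBM keys from DB: ...' was logged before the re-raise
#       raise
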
class BaseRedisBackend(redis_backend.RedisBackend):

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)
        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)

        return redis.StrictRedis(connection_pool=connection_pool)

    def list_keys(self, prefix=''):
        prefix = '{}:{}*'.format(self.key_prefix, prefix)
        return self.client.keys(prefix)

    def get_store(self):
        return self.client.connection_pool

    def get(self, key):
        value = self.client.get(key)
        if value is None:
            return NO_VALUE
        return self._loads(value)

    def get_multi(self, keys):
        if not keys:
            return []
        values = self.client.mget(keys)
        loads = self._loads
        return [
            loads(v) if v is not None else NO_VALUE
            for v in values]

    def set(self, key, value):
        if self.redis_expiration_time:
            self.client.setex(key, self.redis_expiration_time,
                              self._dumps(value))
        else:
            self.client.set(key, self._dumps(value))

    def set_multi(self, mapping):
        dumps = self._dumps
        mapping = dict(
            (k, dumps(v))
            for k, v in mapping.items()
        )

        if not self.redis_expiration_time:
            self.client.mset(mapping)
        else:
            pipe = self.client.pipeline()
            for key, value in mapping.items():
                pipe.setex(key, self.redis_expiration_time, value)
            pipe.execute()

    def get_mutex(self, key):
        u = redis_backend.u
        if self.distributed_lock:
            lock_key = u('_lock_{0}').format(key)
            log.debug('Trying to acquire Redis lock for key %s', lock_key)
            return self.client.lock(lock_key, self.lock_timeout, self.lock_sleep)
        else:
            return None

class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
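
# Example (a sketch): with `distributed_lock` enabled, get_mutex() hands out a
# Redis-backed lock; the connection values below are illustrative defaults.
#
#   backend = RedisPickleBackend({
#       'host': 'localhost', 'port': 6379, 'db': 0,
#       'distributed_lock': True, 'lock_timeout': 60, 'lock_sleep': 1})
#   mutex = backend.get_mutex('some_key')
#   if mutex and mutex.acquire():
#       try:
#           backend.set('redis_pickle_backend:some_key', 'some_value')
#       finally:
#           mutex.release()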