caches: fixed wrong except
super-admin - r4724:f2c57809 default
@@ -1,351 +1,351 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 24
25 25 import msgpack
26 26 import gevent
27 27 import redis
28 28
29 29 from dogpile.cache.api import CachedValue
30 30 from dogpile.cache.backends import memory as memory_backend
31 31 from dogpile.cache.backends import file as file_backend
32 32 from dogpile.cache.backends import redis as redis_backend
33 33 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
34 34 from dogpile.cache.util import memoized_property
35 35
36 36 from pyramid.settings import asbool
37 37
38 38 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 39
40 40
41 41 _default_max_size = 1024
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 class LRUMemoryBackend(memory_backend.MemoryBackend):
47 47 key_prefix = 'lru_mem_backend'
48 48 pickle_values = False
49 49
50 50 def __init__(self, arguments):
51 51 max_size = arguments.pop('max_size', _default_max_size)
52 52
53 53 LRUDictClass = LRUDict
54 54 if arguments.pop('log_key_count', None):
55 55 LRUDictClass = LRUDictDebug
56 56
57 57 arguments['cache_dict'] = LRUDictClass(max_size)
58 58 super(LRUMemoryBackend, self).__init__(arguments)
59 59
60 60 def delete(self, key):
61 61 try:
62 62 del self._cache[key]
63 63 except KeyError:
64 64 # we don't care if the key isn't there at deletion
65 65 pass
66 66
67 67 def delete_multi(self, keys):
68 68 for key in keys:
69 69 self.delete(key)
70 70
71 71
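To make the eviction and delete semantics concrete, here is a minimal sketch that drives the backend directly (in practice dogpile's `CacheRegion` makes these calls; `max_size=2` is a hypothetical setting):

```python
# Minimal sketch, assuming this module's namespace is importable as-is.
backend = LRUMemoryBackend({'max_size': 2})
backend.set('a', 1)
backend.set('b', 2)
backend.set('c', 3)   # the LRUDict evicts 'a', the least recently used entry
backend.delete('a')   # no-op: missing keys are silently ignored
backend.delete_multi(['b', 'c', 'missing'])   # 'missing' is ignored too
```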
72 72 class PickleSerializer(object):
73 73
74 74 def _dumps(self, value, safe=False):
75 75 try:
76 76 return compat.pickle.dumps(value)
77 77 except Exception:
78 78 if safe:
79 79 return NO_VALUE
80 80 else:
81 81 raise
82 82
83 83 def _loads(self, value, safe=True):
84 84 try:
85 85 return compat.pickle.loads(value)
86 86 except Exception:
87 87 if safe:
88 88 return NO_VALUE
89 89 else:
90 90 raise
91 91
92 92
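The asymmetric `safe` defaults (strict when dumping, lenient when loading) mean a corrupted cache entry degrades into a cache miss rather than an exception. A short sketch of that contract, with hypothetical byte strings:

```python
# Sketch of the safe-flag contract.
s = PickleSerializer()
raw = s._dumps({'x': 1})            # raises if the value cannot be pickled
assert s._loads(raw) == {'x': 1}
assert s._loads(b'garbage') is NO_VALUE   # safe=True by default: a miss
# s._loads(b'garbage', safe=False) would re-raise the pickle error instead
```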
93 93 class MsgPackSerializer(object):
94 94
95 95 def _dumps(self, value, safe=False):
96 96 try:
97 97 return msgpack.packb(value)
98 98 except Exception:
99 99 if safe:
100 100 return NO_VALUE
101 101 else:
102 102 raise
103 103
104 104 def _loads(self, value, safe=True):
105 105 """
106 106 pickle maintains the `CachedValue` wrapper around the tuple;
107 107 msgpack does not, so it must be added back in.
108 108 """
109 109 try:
110 110 value = msgpack.unpackb(value, use_list=False)
111 111 return CachedValue(*value)
112 112 except Exception:
113 113 if safe:
114 114 return NO_VALUE
115 115 else:
116 116 raise
117 117
118 118
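A sketch of the round trip the docstring describes: `CachedValue` is a tuple subclass, so msgpack packs it as a bare array and `_loads` re-wraps it (assuming msgpack >= 1.0 string defaults):

```python
# Sketch: msgpack flattens CachedValue; _loads rebuilds the wrapper.
s = MsgPackSerializer()
cached = CachedValue('payload', {'ct': time.time(), 'v': 1})
raw = s._dumps(cached)        # packed as a plain (payload, metadata) array
restored = s._loads(raw)
assert isinstance(restored, CachedValue)
assert restored.payload == 'payload'
```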
119 119 import fcntl
120 120 flock_org = fcntl.flock
121 121
122 122
123 123 class CustomLockFactory(FileLock):
124 124
125 125 @memoized_property
126 126 def _module(self):
127 127
128 128 def gevent_flock(fd, operation):
129 129 """
130 130 Gevent compatible flock
131 131 """
132 132 # set non-blocking; this raises an exception if we cannot acquire the lock
133 133 operation |= fcntl.LOCK_NB
134 134 start_lock_time = time.time()
135 135 timeout = 60 * 15 # 15min
136 136 while True:
137 137 try:
138 138 flock_org(fd, operation)
139 139 # lock has been acquired
140 140 break
141 141 except (OSError, IOError) as e:
142 142 # re-raise any error other than EAGAIN (resource temporarily unavailable)
143 143 if e.errno != errno.EAGAIN:
144 144 raise
145 145 elif (time.time() - start_lock_time) > timeout:
146 146 # waited too long for the lock; better to fail than loop forever
147 147 log.error('Failed to acquire lock on `%s` after waiting %ss',
148 148 self.filename, timeout)
149 149 raise
150 150 wait_timeout = 0.03
151 151 log.debug('Failed to acquire lock on `%s`, retry in %ss',
152 152 self.filename, wait_timeout)
153 153 gevent.sleep(wait_timeout)
154 154
155 155 fcntl.flock = gevent_flock
156 156 return fcntl
157 157
158 158
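The factory patches in a non-blocking `flock` so a contended file lock yields to other greenlets instead of freezing the whole gevent loop. The key mechanism is `LOCK_NB`, sketched below with a hypothetical lock file:

```python
import errno
import fcntl

# Sketch: LOCK_NB makes flock fail fast with EAGAIN instead of blocking,
# which is what lets gevent_flock sleep and yield between retries.
with open('/tmp/example.lock', 'w') as f:   # hypothetical path
    try:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except (OSError, IOError) as e:
        if e.errno != errno.EAGAIN:
            raise
        print('lock held by another process; retry later')
```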
159 159 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
160 160 key_prefix = 'file_backend'
161 161
162 162 def __init__(self, arguments):
163 163 arguments['lock_factory'] = CustomLockFactory
164 164 db_file = arguments.get('filename')
165 165
166 166 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
167 167 try:
168 168 super(FileNamespaceBackend, self).__init__(arguments)
169 169 except Exception:
170 170 log.error('Failed to initialize db at: %s', db_file)
171 171 raise
172 172
173 173 def __repr__(self):
174 174 return '{} `{}`'.format(self.__class__, self.filename)
175 175
176 176 def list_keys(self, prefix=''):
177 177 prefix = '{}:{}'.format(self.key_prefix, prefix)
178 178
179 179 def cond(v):
180 180 if not prefix:
181 181 return True
182 182
183 183 if v.startswith(prefix):
184 184 return True
185 185 return False
186 186
187 187 with self._dbm_file(True) as dbm:
188 188 try:
189 189 return filter(cond, dbm.keys())
190 190 except Exception:
191 191 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
192 192 raise
193 193
194 194 def get_store(self):
195 195 return self.filename
196 196
197 197 def _dbm_get(self, key):
198 198 with self._dbm_file(False) as dbm:
199 199 if hasattr(dbm, 'get'):
200 200 value = dbm.get(key, NO_VALUE)
201 201 else:
202 202 # gdbm objects lack a .get method
203 203 try:
204 204 value = dbm[key]
205 205 except KeyError:
206 206 value = NO_VALUE
207 207 if value is not NO_VALUE:
208 208 value = self._loads(value)
209 209 return value
210 210
211 211 def get(self, key):
212 212 try:
213 213 return self._dbm_get(key)
214 214 except Exception:
215 215 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
216 216 raise
217 217
218 218 def set(self, key, value):
219 219 with self._dbm_file(True) as dbm:
220 220 dbm[key] = self._dumps(value)
221 221
222 222 def set_multi(self, mapping):
223 223 with self._dbm_file(True) as dbm:
224 224 for key, value in mapping.items():
225 225 dbm[key] = self._dumps(value)
226 226
227 227
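A usage sketch with a hypothetical DB path. Note that the backend itself does not prepend `key_prefix`; dogpile's key mangling normally does, so the keys below carry it explicitly:

```python
# Sketch, assuming a writable path; values are pickled via PickleSerializer.
backend = FileNamespaceBackend({'filename': '/tmp/rc_cache.db'})
backend.set('file_backend:repo1:size', 42)
backend.set('file_backend:repo2:size', 7)
print(list(backend.list_keys(prefix='repo1')))   # only repo1's keys
print(backend.get('file_backend:repo1:size'))    # -> 42
```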
228 228 class BaseRedisBackend(redis_backend.RedisBackend):
229 229 key_prefix = ''
230 230
231 231 def __init__(self, arguments):
232 232 super(BaseRedisBackend, self).__init__(arguments)
233 233 self._lock_timeout = self.lock_timeout
234 234 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
235 235
236 236 if self._lock_auto_renewal and not self._lock_timeout:
237 237 # set default timeout for auto_renewal
238 238 self._lock_timeout = 30
239 239
240 240 def _create_client(self):
241 241 args = {}
242 242
243 243 if self.url is not None:
244 244 args.update(url=self.url)
245 245
246 246 else:
247 247 args.update(
248 248 host=self.host, password=self.password,
249 249 port=self.port, db=self.db
250 250 )
251 251
252 252 connection_pool = redis.ConnectionPool(**args)
253 253
254 254 return redis.StrictRedis(connection_pool=connection_pool)
255 255
256 256 def list_keys(self, prefix=''):
257 257 prefix = '{}:{}*'.format(self.key_prefix, prefix)
258 258 return self.client.keys(prefix)
259 259
260 260 def get_store(self):
261 261 return self.client.connection_pool
262 262
263 263 def get(self, key):
264 264 value = self.client.get(key)
265 265 if value is None:
266 266 return NO_VALUE
267 267 return self._loads(value)
268 268
269 269 def get_multi(self, keys):
270 270 if not keys:
271 271 return []
272 272 values = self.client.mget(keys)
273 273 loads = self._loads
274 274 return [
275 275 loads(v) if v is not None else NO_VALUE
276 276 for v in values]
277 277
278 278 def set(self, key, value):
279 279 if self.redis_expiration_time:
280 280 self.client.setex(key, self.redis_expiration_time,
281 281 self._dumps(value))
282 282 else:
283 283 self.client.set(key, self._dumps(value))
284 284
285 285 def set_multi(self, mapping):
286 286 dumps = self._dumps
287 287 mapping = dict(
288 288 (k, dumps(v))
289 289 for k, v in mapping.items()
290 290 )
291 291
292 292 if not self.redis_expiration_time:
293 293 self.client.mset(mapping)
294 294 else:
295 295 pipe = self.client.pipeline()
296 296 for key, value in mapping.items():
297 297 pipe.setex(key, self.redis_expiration_time, value)
298 298 pipe.execute()
299 299
300 300 def get_mutex(self, key):
301 301 if self.distributed_lock:
302 302 lock_key = redis_backend.u('_lock_{0}').format(key)
303 303 log.debug('Trying to acquire Redis lock for key %s', lock_key)
304 304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 305 auto_renewal=self._lock_auto_renewal)
306 306 else:
307 307 return None
308 308
309 309
310 310 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 311 key_prefix = 'redis_pickle_backend'
312 312 pass
313 313
314 314
315 315 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 316 key_prefix = 'redis_msgpack_backend'
317 317 pass
318 318
319 319
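A configuration sketch with hypothetical connection values. Either `url` or the host/port/db/password set is forwarded to `redis.ConnectionPool`, and enabling `lock_auto_renewal` without an explicit `lock_timeout` falls back to the 30-second default set in `__init__`:

```python
# Sketch only; the connection details are placeholders.
backend = RedisPickleBackend({
    'url': 'redis://localhost:6379/0',   # or host/port/db/password
    'redis_expiration_time': 300,        # values written via SETEX / pipeline
    'distributed_lock': True,            # get_mutex() returns a redis_lock
    'lock_auto_renewal': True,           # implies _lock_timeout = 30 if unset
})
```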
320 320 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 321 import redis_lock
322 322
323 323 class _RedisLockWrapper(object):
324 324 """LockWrapper for redis_lock"""
325 325
326 326 @classmethod
327 327 def get_lock(cls):
328 328 return redis_lock.Lock(
329 329 redis_client=client,
330 330 name=lock_key,
331 331 expire=lock_timeout,
332 332 auto_renewal=auto_renewal,
333 333 strict=True,
334 334 )
335 335
336 336 def __init__(self):
337 337 self.lock = self.get_lock()
338 338
339 339 def acquire(self, wait=True):
340 340 try:
341 341 return self.lock.acquire(wait)
342     - except redis_lock.AlreadyAquited:
    342 + except redis_lock.AlreadyAcquired:
343 343 return False
344 344
345 345 def release(self):
346 346 try:
347 347 self.lock.release()
348 348 except redis_lock.NotAcquired:
349 349 pass
350 350
351 351 return _RedisLockWrapper()
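For context, the one-line change above fixes a misspelled exception name: `redis_lock.AlreadyAquited` does not exist in the `python-redis-lock` package, so whenever the lock raised `AlreadyAcquired`, evaluating the broken except clause produced an `AttributeError` instead of returning `False`. A sketch of the intended use, with a hypothetical client and lock key:

```python
import redis

client = redis.StrictRedis()   # hypothetical local instance
mutex = get_mutex_lock(client, '_lock_repo1', lock_timeout=30,
                       auto_renewal=True)
if mutex.acquire(wait=False):
    try:
        pass   # do the guarded work while holding the lock
    finally:
        mutex.release()   # NotAcquired is swallowed if already released
else:
    pass   # lock held elsewhere, or AlreadyAcquired: reported as False
```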