caches: optimized defaults for safer, more reliable behaviour
super-admin
r4720:69c58338 default
```diff
@@ -1,346 +1,348 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import time
 import errno
 import logging

 import msgpack
 import gevent
 import redis

 from dogpile.cache.api import CachedValue
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
 from dogpile.cache.util import memoized_property

+from pyramid.settings import asbool
+
 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)

         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug

         arguments['cache_dict'] = LRUDictClass(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if the key isn't there at deletion
             pass

     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)
```
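
For reference, a minimal sketch of exercising `LRUMemoryBackend` directly, outside of a dogpile region. The `max_size` and `log_key_count` keys are the ones `__init__` above consumes; the import path is an assumption, since the file name is not shown on this page:

```python
from rhodecode.lib.rc_cache.backends import LRUMemoryBackend  # path assumed
from dogpile.cache.backends.file import NO_VALUE

# keep at most two keys; the least recently used one is evicted first
backend = LRUMemoryBackend({'max_size': 2})

backend.set('a', 1)
backend.set('b', 2)
backend.set('c', 3)                    # evicts 'a'

print(backend.get('a') is NO_VALUE)    # True: evicted keys return NO_VALUE
backend.delete_multi(['a', 'b', 'c'])  # delete() tolerates missing keys
```

Since `pickle_values` is `False`, values are stored as-is in the `LRUDict`, which is what makes the LRU eviction cheap.
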
```diff
 class PickleSerializer(object):

     def _dumps(self, value, safe=False):
         try:
             return compat.pickle.dumps(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         try:
             return compat.pickle.loads(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise


 class MsgPackSerializer(object):

     def _dumps(self, value, safe=False):
         try:
             return msgpack.packb(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         """
         pickle maintains the `CachedValue` wrapper of the tuple;
         msgpack does not, so it must be added back in.
         """
         try:
             value = msgpack.unpackb(value, use_list=False)
             return CachedValue(*value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise
```
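
The reason `_loads` has to re-wrap the tuple shows up in a short round-trip (values here are illustrative; depending on the msgpack version, strings may come back as bytes unless an encoding/`raw` option is given):

```python
import time
import msgpack
from dogpile.cache.api import CachedValue

# CachedValue is a tuple subclass of (payload, metadata); pickle preserves
# the subclass, msgpack flattens it to a plain tuple on the wire
original = CachedValue('payload', {'ct': time.time(), 'v': 1})

packed = msgpack.packb(original)
unpacked = msgpack.unpackb(packed, use_list=False)

print(type(unpacked))              # tuple: the wrapper class is gone
restored = CachedValue(*unpacked)  # what MsgPackSerializer._loads does
print(restored.payload)            # 'payload'
```
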
```diff
 import fcntl
 flock_org = fcntl.flock


 class CustomLockFactory(FileLock):

     @memoized_property
     def _module(self):

         def gevent_flock(fd, operation):
             """
             Gevent compatible flock
             """
             # set non-blocking, this will cause an exception if we cannot acquire a lock
             operation |= fcntl.LOCK_NB
             start_lock_time = time.time()
             timeout = 60 * 15  # 15min
             while True:
                 try:
                     flock_org(fd, operation)
                     # lock has been acquired
                     break
                 except (OSError, IOError) as e:
                     # raise on errors other than EAGAIN (Resource temporarily unavailable)
                     if e.errno != errno.EAGAIN:
                         raise
                     elif (time.time() - start_lock_time) > timeout:
                         # waited too long for the lock, better to fail than loop forever
                         log.error('Failed to acquire lock on `%s` after waiting %ss',
                                   self.filename, timeout)
                         raise
                     wait_timeout = 0.03
                     log.debug('Failed to acquire lock on `%s`, retry in %ss',
                               self.filename, wait_timeout)
                     gevent.sleep(wait_timeout)

         fcntl.flock = gevent_flock
         return fcntl
```
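
The same non-blocking pattern can be tried standalone on any POSIX system; a minimal sketch (the file path and retry budget are illustrative):

```python
import errno
import fcntl
import time

def try_flock(fd, operation, retries=3, wait=0.03):
    """Non-blocking flock with a bounded retry loop (cf. gevent_flock above)."""
    operation |= fcntl.LOCK_NB          # raise instead of blocking the process
    for _ in range(retries):
        try:
            fcntl.flock(fd, operation)
            return True
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:  # EAGAIN: lock held by someone else
                raise
            time.sleep(wait)            # gevent.sleep() in the gevent variant
    return False

with open('/tmp/example.lock', 'w') as f:
    if try_flock(f.fileno(), fcntl.LOCK_EX):
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
```

Blocking in plain `flock` would stall the whole gevent event loop, which is why the patched version spins with short `gevent.sleep()` calls instead.
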
```diff
 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')

         log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
         try:
             super(FileNamespaceBackend, self).__init__(arguments)
         except Exception:
             log.error('Failed to initialize db at: %s', db_file)
             raise

     def __repr__(self):
         return '{} `{}`'.format(self.__class__, self.filename)

     def list_keys(self, prefix=''):
         prefix = '{}:{}'.format(self.key_prefix, prefix)

         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:
             try:
                 return filter(cond, dbm.keys())
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise

     def get_store(self):
         return self.filename

     def _dbm_get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value

     def get(self, key):
         try:
             return self._dbm_get(key)
         except Exception:
             log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
             raise

     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)

     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)
```
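
A minimal sketch of the file backend used directly (the filename is made up, and the import path is assumed as before; normally dogpile instantiates the backend and passes it the arguments dict):

```python
from dogpile.cache.backends.file import NO_VALUE

# dogpile creates the DBM file, plus its rw/dogpile lock files, on demand
backend = FileNamespaceBackend({'filename': '/tmp/rc_cache_example.dbm'})

# list_keys() filters on '<key_prefix>:<prefix>', so prefix the key by hand here
key = backend.key_prefix + ':demo'
backend.set(key, {'answer': 42})

print(backend.get(key))                    # {'answer': 42}
print(backend.get('missing') is NO_VALUE)  # True
print(list(backend.list_keys()))           # ['file_backend:demo']
```

Values go through `PickleSerializer`, so anything picklable round-trips; the `CustomLockFactory` injected in `__init__` is what keeps the DBM file locking gevent-safe.
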
```diff
 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''

     def __init__(self, arguments):
         super(BaseRedisBackend, self).__init__(arguments)
         self._lock_timeout = self.lock_timeout
-        self._lock_auto_renewal = arguments.pop("lock_auto_renewal", False)
+        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
-            self._lock_timeout = 60
+            self._lock_timeout = 30

     def _create_client(self):
         args = {}

         if self.url is not None:
             args.update(url=self.url)

         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )

         connection_pool = redis.ConnectionPool(**args)

         return redis.StrictRedis(connection_pool=connection_pool)

     def list_keys(self, prefix=''):
         prefix = '{}:{}*'.format(self.key_prefix, prefix)
         return self.client.keys(prefix)

     def get_store(self):
         return self.client.connection_pool

     def get(self, key):
         value = self.client.get(key)
         if value is None:
             return NO_VALUE
         return self._loads(value)

     def get_multi(self, keys):
         if not keys:
             return []
         values = self.client.mget(keys)
         loads = self._loads
         return [
             loads(v) if v is not None else NO_VALUE
             for v in values]

     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))

     def set_multi(self, mapping):
         dumps = self._dumps
         mapping = dict(
             (k, dumps(v))
             for k, v in mapping.items()
         )

         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()

     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = redis_backend.u('_lock_{0}').format(key)
             log.debug('Trying to acquire Redis lock for key %s', lock_key)
             return get_mutex_lock(self.client, lock_key, self._lock_timeout,
                                   auto_renewal=self._lock_auto_renewal)
         else:
             return None
```
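
The behavioural change this commit makes lives in `__init__` above: `lock_auto_renewal` now defaults to enabled and is run through `asbool`, so string values coming from an ini file behave as expected, and the fallback `lock_timeout` used with auto-renewal drops from 60s to 30s. A small sketch of the parsing (values are illustrative):

```python
from pyramid.settings import asbool

# ini files hand settings over as strings; asbool normalises common spellings
for raw in (True, 'true', '1', 'false', '0', None):
    print(repr(raw), '->', asbool(raw))
# True -> True, 'true' -> True, '1' -> True,
# 'false' -> False, '0' -> False, None -> False

# mirrors __init__ above: renewal on by default, 30s fallback expiry
arguments = {}
lock_auto_renewal = asbool(arguments.pop('lock_auto_renewal', True))
lock_timeout = None
if lock_auto_renewal and not lock_timeout:
    lock_timeout = 30
```

With renewal on, a shorter expiry is the safer default: a crashed holder frees the lock sooner, while a live holder keeps extending it.
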
```diff
 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass


 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass


 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
     import redis_lock

     class _RedisLockWrapper(object):
         """LockWrapper for redis_lock"""

         def __init__(self):
             pass

         @property
         def lock(self):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )

         def acquire(self, wait=True):
             return self.lock.acquire(wait)

         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass

     return _RedisLockWrapper()
```
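
Finally, a sketch of the lock wrapper in use, roughly what `get_mutex()` hands back to dogpile when `distributed_lock` is enabled (connection details are made up; `python-redis-lock` handles the expiry and, with `auto_renewal=True`, keeps extending the key's TTL while the lock is held):

```python
import redis

# get_mutex_lock as defined above
client = redis.StrictRedis(host='localhost', port=6379, db=0)
mutex = get_mutex_lock(client, '_lock_demo_key',
                       lock_timeout=30, auto_renewal=True)

if mutex.acquire(wait=True):
    try:
        pass  # recompute and store the cached value under the lock
    finally:
        mutex.release()  # NotAcquired is swallowed by the wrapper
```
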