caches: optimized defaults for safer, more reliable behaviour
super-admin
r4720:69c58338 default
@@ -1,346 +1,348 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/

 import time
 import errno
 import logging

 import msgpack
 import gevent
 import redis

 from dogpile.cache.api import CachedValue
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import NO_VALUE, compat, FileLock
 from dogpile.cache.util import memoized_property

+from pyramid.settings import asbool
+
 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)

         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug

         arguments['cache_dict'] = LRUDictClass(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass

     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)

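The LRU memory backend caps the in-process cache via `max_size` and can swap in a debug dict that logs key counts. A minimal usage sketch, assuming the backend is registered with dogpile under a name such as 'dogpile.cache.rc.memory_lru' (the actual registration string lives elsewhere in rc_cache and may differ):

# a minimal sketch; the backend registration name is an assumption
from dogpile.cache import make_region

region = make_region().configure(
    'dogpile.cache.rc.memory_lru',  # assumed registration name
    arguments={
        'max_size': 10000,          # overrides _default_max_size (1024)
        'log_key_count': True,      # swaps LRUDict for LRUDictDebug
    },
)

region.set('some_key', 'some_value')
assert region.get('some_key') == 'some_value'
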
 class PickleSerializer(object):

     def _dumps(self, value, safe=False):
         try:
             return compat.pickle.dumps(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         try:
             return compat.pickle.loads(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise


 class MsgPackSerializer(object):

     def _dumps(self, value, safe=False):
         try:
             return msgpack.packb(value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

     def _loads(self, value, safe=True):
         """
         pickle maintained the `CachedValue` wrapper of the tuple
         msgpack does not, so it must be added back in.
         """
         try:
             value = msgpack.unpackb(value, use_list=False)
             return CachedValue(*value)
         except Exception:
             if safe:
                 return NO_VALUE
             else:
                 raise

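The `_loads` docstring is the key detail here: pickle round-trips the `CachedValue` subclass intact, while msgpack flattens it to a plain tuple that has to be re-wrapped. A minimal sketch of that asymmetry (the metadata dict values are illustrative):

import msgpack
from dogpile.cache.api import CachedValue

cached = CachedValue('payload', {'ct': 1600000000.0, 'v': 1})
packed = msgpack.packb(cached)            # serialized as a plain array

value = msgpack.unpackb(packed, use_list=False)
restored = CachedValue(*value)            # re-wrap, as _loads() does
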
 import fcntl
 flock_org = fcntl.flock


 class CustomLockFactory(FileLock):

     @memoized_property
     def _module(self):

         def gevent_flock(fd, operation):
             """
             Gevent compatible flock
             """
             # set non-blocking, this will cause an exception if we cannot acquire a lock
             operation |= fcntl.LOCK_NB
             start_lock_time = time.time()
             timeout = 60 * 15  # 15min
             while True:
                 try:
                     flock_org(fd, operation)
                     # lock has been acquired
                     break
                 except (OSError, IOError) as e:
                     # raise on errors other than "Resource temporarily unavailable"
                     if e.errno != errno.EAGAIN:
                         raise
                     elif (time.time() - start_lock_time) > timeout:
                         # waited too long on the lock, better to fail than loop forever
                         log.error('Failed to acquire lock on `%s` after waiting %ss',
                                   self.filename, timeout)
                         raise
                     wait_timeout = 0.03
                     log.debug('Failed to acquire lock on `%s`, retry in %ss',
                               self.filename, wait_timeout)
                     gevent.sleep(wait_timeout)

         fcntl.flock = gevent_flock
         return fcntl

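A plain `fcntl.flock` call blocks at the C level and stalls the entire gevent event loop, so the factory lazily (via `memoized_property`) replaces it with a non-blocking spin that yields between attempts. The same pattern in isolation, as a standalone sketch:

# a standalone sketch of the same pattern: spin on LOCK_NB + EAGAIN
# and yield to the gevent hub between attempts instead of blocking
import errno
import fcntl
import gevent

def cooperative_flock(fd, operation, retry_delay=0.03):
    operation |= fcntl.LOCK_NB
    while True:
        try:
            fcntl.flock(fd, operation)
            return
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
            gevent.sleep(retry_delay)  # let other greenlets run
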
 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')

         log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
         try:
             super(FileNamespaceBackend, self).__init__(arguments)
         except Exception:
             log.error('Failed to initialize db at: %s', db_file)
             raise

     def __repr__(self):
         return '{} `{}`'.format(self.__class__, self.filename)

     def list_keys(self, prefix=''):
         prefix = '{}:{}'.format(self.key_prefix, prefix)

         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:
             try:
                 return filter(cond, dbm.keys())
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise

     def get_store(self):
         return self.filename

     def _dbm_get(self, key):
         with self._dbm_file(False) as dbm:
             if hasattr(dbm, 'get'):
                 value = dbm.get(key, NO_VALUE)
             else:
                 # gdbm objects lack a .get method
                 try:
                     value = dbm[key]
                 except KeyError:
                     value = NO_VALUE
             if value is not NO_VALUE:
                 value = self._loads(value)
             return value

     def get(self, key):
         try:
             return self._dbm_get(key)
         except Exception:
             log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
             raise

     def set(self, key, value):
         with self._dbm_file(True) as dbm:
             dbm[key] = self._dumps(value)

     def set_multi(self, mapping):
         with self._dbm_file(True) as dbm:
             for key, value in mapping.items():
                 dbm[key] = self._dumps(value)

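A configuration sketch for the file-backed namespace, again assuming a hypothetical registration name ('dogpile.cache.rc.file_namespace' is illustrative); the `filename` argument is the DBM file the constructor logs about above:

# a minimal sketch under the assumed registration name
from dogpile.cache import make_region

region = make_region().configure(
    'dogpile.cache.rc.file_namespace',   # assumed registration name
    expiration_time=3600,
    arguments={'filename': '/tmp/rc_cache.dbm'},
)
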
 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''

     def __init__(self, arguments):
         super(BaseRedisBackend, self).__init__(arguments)
         self._lock_timeout = self.lock_timeout
-        self._lock_auto_renewal = arguments.pop("lock_auto_renewal", False)
+        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
-            self._lock_timeout = 60
+            self._lock_timeout = 30

     def _create_client(self):
         args = {}

         if self.url is not None:
             args.update(url=self.url)

         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )

         connection_pool = redis.ConnectionPool(**args)

         return redis.StrictRedis(connection_pool=connection_pool)

     def list_keys(self, prefix=''):
         prefix = '{}:{}*'.format(self.key_prefix, prefix)
         return self.client.keys(prefix)

     def get_store(self):
         return self.client.connection_pool

     def get(self, key):
         value = self.client.get(key)
         if value is None:
             return NO_VALUE
         return self._loads(value)

     def get_multi(self, keys):
         if not keys:
             return []
         values = self.client.mget(keys)
         loads = self._loads
         return [
             loads(v) if v is not None else NO_VALUE
             for v in values]

     def set(self, key, value):
         if self.redis_expiration_time:
             self.client.setex(key, self.redis_expiration_time,
                               self._dumps(value))
         else:
             self.client.set(key, self._dumps(value))

     def set_multi(self, mapping):
         dumps = self._dumps
         mapping = dict(
             (k, dumps(v))
             for k, v in mapping.items()
         )

         if not self.redis_expiration_time:
             self.client.mset(mapping)
         else:
             pipe = self.client.pipeline()
             for key, value in mapping.items():
                 pipe.setex(key, self.redis_expiration_time, value)
             pipe.execute()

     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = redis_backend.u('_lock_{0}').format(key)
             log.debug('Trying to acquire Redis lock for key %s', lock_key)
             return get_mutex_lock(self.client, lock_key, self._lock_timeout,
                                   auto_renewal=self._lock_auto_renewal)
         else:
             return None

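The two changed lines above are the heart of the commit: `lock_auto_renewal` now defaults to True and is run through pyramid's `asbool`, so string values coming from .ini settings are parsed correctly rather than being truthy as non-empty strings, and the fallback lock timeout used with auto-renewal drops from 60s to 30s. A small sketch of the parsing behaviour this buys:

# asbool normalizes ini-style strings to real booleans
from pyramid.settings import asbool

assert asbool('true') is True
assert asbool('false') is False

# as the constructor above does it: a 'false' string now disables
# auto-renewal instead of enabling it
arguments = {'lock_auto_renewal': 'false'}
lock_auto_renewal = asbool(arguments.pop('lock_auto_renewal', True))
assert lock_auto_renewal is False
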
 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass


 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass


 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
     import redis_lock

     class _RedisLockWrapper(object):
         """LockWrapper for redis_lock"""

         def __init__(self):
             pass

         @property
         def lock(self):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )

         def acquire(self, wait=True):
             return self.lock.acquire(wait)

         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass

     return _RedisLockWrapper()
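dogpile calls acquire()/release() on whatever get_mutex() returns, so the wrapper adapts python-redis-lock to that protocol; building the `redis_lock.Lock` lazily in a property gives each acquire a fresh lock object, and swallowing `NotAcquired` makes release idempotent. A minimal usage sketch, assuming a Redis server on localhost (the key name is illustrative):

# a minimal usage sketch for the wrapper above
import redis

client = redis.StrictRedis()
mutex = get_mutex_lock(client, '_lock_my_key', lock_timeout=30,
                       auto_renewal=True)
if mutex.acquire(wait=True):
    try:
        pass  # compute and store the expensive value
    finally:
        mutex.release()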