##// END OF EJS Templates
rccache: refactor and update code to support latest dogpile code changes (mostly on custom serializers)
super-admin -
r4985:fe735b46 default
parent child Browse files
Show More
@@ -1,87 +1,88 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22 from dogpile.cache import register_backend
23 module_name = 'rhodecode'
23 24
24 25 register_backend(
25 "dogpile.cache.rc.memory_lru", "rhodecode.lib.rc_cache.backends",
26 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
26 27 "LRUMemoryBackend")
27 28
28 29 register_backend(
29 "dogpile.cache.rc.file_namespace", "rhodecode.lib.rc_cache.backends",
30 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
30 31 "FileNamespaceBackend")
31 32
32 33 register_backend(
33 "dogpile.cache.rc.redis", "rhodecode.lib.rc_cache.backends",
34 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
34 35 "RedisPickleBackend")
35 36
36 37 register_backend(
37 "dogpile.cache.rc.redis_msgpack", "rhodecode.lib.rc_cache.backends",
38 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
38 39 "RedisMsgPackBackend")
39 40
40 41
41 42 log = logging.getLogger(__name__)
42 43
43 44 from . import region_meta
44 45 from .utils import (
45 46 get_default_cache_settings, backend_key_generator, get_or_create_region,
46 47 clear_cache_namespace, make_region, InvalidationContext,
47 48 FreshRegionCache, ActiveRegionCache)
48 49
49 50
50 51 FILE_TREE_CACHE_VER = 'v4'
51 52 LICENSE_CACHE_VER = 'v2'
52 53
53 54
54 55 def configure_dogpile_cache(settings):
55 56 cache_dir = settings.get('cache_dir')
56 57 if cache_dir:
57 58 region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
58 59
59 60 rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
60 61
61 62 # inspect available namespaces
62 63 avail_regions = set()
63 64 for key in rc_cache_data.keys():
64 65 namespace_name = key.split('.', 1)[0]
65 66 if namespace_name in avail_regions:
66 67 continue
67 68
68 69 avail_regions.add(namespace_name)
69 70 log.debug('dogpile: found following cache regions: %s', namespace_name)
70 71
71 72 new_region = make_region(
72 73 name=namespace_name,
73 74 function_key_generator=None
74 75 )
75 76
76 77 new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
77 78 new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
78 79 if log.isEnabledFor(logging.DEBUG):
79 80 region_args = dict(backend=new_region.actual_backend.__class__,
80 81 region_invalidator=new_region.region_invalidator.__class__)
81 82 log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
82 83
83 84 region_meta.dogpile_cache_regions[namespace_name] = new_region
84 85
85 86
86 87 def includeme(config):
87 88 configure_dogpile_cache(config.registry.settings)
@@ -1,364 +1,274 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import errno
23 23 import logging
24 import functools
24 25
25 26 import msgpack
26 27 import redis
27 28 import gevent
29 import pickle
30 import fcntl
31 flock_org = fcntl.flock
32 from typing import Union
28 33
29 from dogpile.cache.api import CachedValue
30 34 from dogpile.cache.backends import memory as memory_backend
31 35 from dogpile.cache.backends import file as file_backend
32 36 from dogpile.cache.backends import redis as redis_backend
33 from dogpile.cache.backends.file import NO_VALUE, FileLock
37 from dogpile.cache.backends.file import FileLock
34 38 from dogpile.cache.util import memoized_property
39 from dogpile.cache.api import Serializer, Deserializer
35 40
36 41 from pyramid.settings import asbool
37 42
38 43 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
39 from rhodecode.lib.utils import safe_str
44 from rhodecode.lib.str_utils import safe_str
40 45
41 46
42 47 _default_max_size = 1024
43 48
44 49 log = logging.getLogger(__name__)
45 50
46 51
47 52 class LRUMemoryBackend(memory_backend.MemoryBackend):
48 53 key_prefix = 'lru_mem_backend'
49 54 pickle_values = False
50 55
51 56 def __init__(self, arguments):
52 57 max_size = arguments.pop('max_size', _default_max_size)
53 58
54 59 LRUDictClass = LRUDict
55 60 if arguments.pop('log_key_count', None):
56 61 LRUDictClass = LRUDictDebug
57 62
58 63 arguments['cache_dict'] = LRUDictClass(max_size)
59 64 super(LRUMemoryBackend, self).__init__(arguments)
60 65
61 66 def delete(self, key):
62 67 try:
63 68 del self._cache[key]
64 69 except KeyError:
65 70 # we don't care if key isn't there at deletion
66 71 pass
67 72
68 73 def delete_multi(self, keys):
69 74 for key in keys:
70 75 self.delete(key)
71 76
72 77
73 class PickleSerializer(object):
74
75 def _dumps(self, value, safe=False):
76 try:
77 return pickle.dumps(value)
78 except Exception:
79 if safe:
80 return NO_VALUE
81 else:
82 raise
83
84 def _loads(self, value, safe=True):
85 try:
86 return pickle.loads(value)
87 except Exception:
88 if safe:
89 return NO_VALUE
90 else:
91 raise
78 class PickleSerializer:
79 serializer: Union[None, Serializer] = staticmethod( # type: ignore
80 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
81 )
82 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
83 functools.partial(pickle.loads)
84 )
92 85
93 86
94 87 class MsgPackSerializer(object):
95
96 def _dumps(self, value, safe=False):
97 try:
98 return msgpack.packb(value)
99 except Exception:
100 if safe:
101 return NO_VALUE
102 else:
103 raise
104
105 def _loads(self, value, safe=True):
106 """
107 pickle maintained the `CachedValue` wrapper of the tuple
108 msgpack does not, so it must be added back in.
109 """
110 try:
111 value = msgpack.unpackb(value, use_list=False)
112 return CachedValue(*value)
113 except Exception:
114 if safe:
115 return NO_VALUE
116 else:
117 raise
118
119
120 import fcntl
121 flock_org = fcntl.flock
88 serializer: Union[None, Serializer] = staticmethod( # type: ignore
89 msgpack.packb
90 )
91 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
92 functools.partial(msgpack.unpackb, use_list=False)
93 )
122 94
123 95
124 96 class CustomLockFactory(FileLock):
125 97
126 98 @memoized_property
127 99 def _module(self):
128 100
129 101 def gevent_flock(fd, operation):
130 102 """
131 103 Gevent compatible flock
132 104 """
133 105 # set non-blocking, this will cause an exception if we cannot acquire a lock
134 106 operation |= fcntl.LOCK_NB
135 107 start_lock_time = time.time()
136 108 timeout = 60 * 15 # 15min
137 109 while True:
138 110 try:
139 111 flock_org(fd, operation)
140 112 # lock has been acquired
141 113 break
142 114 except (OSError, IOError) as e:
143 115 # raise on other errors than Resource temporarily unavailable
144 116 if e.errno != errno.EAGAIN:
145 117 raise
146 118 elif (time.time() - start_lock_time) > timeout:
147 119 # waited to much time on a lock, better fail than loop for ever
148 120 log.error('Failed to acquire lock on `%s` after waiting %ss',
149 121 self.filename, timeout)
150 122 raise
151 123 wait_timeout = 0.03
152 124 log.debug('Failed to acquire lock on `%s`, retry in %ss',
153 125 self.filename, wait_timeout)
154 126 gevent.sleep(wait_timeout)
155 127
156 128 fcntl.flock = gevent_flock
157 129 return fcntl
158 130
159 131
160 132 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
161 133 key_prefix = 'file_backend'
162 134
163 135 def __init__(self, arguments):
164 136 arguments['lock_factory'] = CustomLockFactory
165 137 db_file = arguments.get('filename')
166 138
167 139 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
168 140 try:
169 141 super(FileNamespaceBackend, self).__init__(arguments)
170 142 except Exception:
171 143 log.exception('Failed to initialize db at: %s', db_file)
172 144 raise
173 145
174 146 def __repr__(self):
175 147 return '{} `{}`'.format(self.__class__, self.filename)
176 148
177 149 def list_keys(self, prefix=''):
178 150 prefix = '{}:{}'.format(self.key_prefix, prefix)
179 151
180 152 def cond(v):
181 153 if not prefix:
182 154 return True
183 155
184 156 if v.startswith(prefix):
185 157 return True
186 158 return False
187 159
188 160 with self._dbm_file(True) as dbm:
189 161 try:
190 162 return list(filter(cond, list(dbm.keys())))
191 163 except Exception:
192 164 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
193 165 raise
194 166
195 167 def get_store(self):
196 168 return self.filename
197 169
198 def _dbm_get(self, key):
199 with self._dbm_file(False) as dbm:
200 if hasattr(dbm, 'get'):
201 value = dbm.get(key, NO_VALUE)
202 else:
203 # gdbm objects lack a .get method
204 try:
205 value = dbm[key]
206 except KeyError:
207 value = NO_VALUE
208 if value is not NO_VALUE:
209 value = self._loads(value)
210 return value
211
212 def get(self, key):
213 try:
214 return self._dbm_get(key)
215 except Exception:
216 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
217 raise
218
219 def set(self, key, value):
220 with self._dbm_file(True) as dbm:
221 dbm[key] = self._dumps(value)
222
223 def set_multi(self, mapping):
224 with self._dbm_file(True) as dbm:
225 for key, value in mapping.items():
226 dbm[key] = self._dumps(value)
227
228 170
229 171 class BaseRedisBackend(redis_backend.RedisBackend):
230 172 key_prefix = ''
231 173
232 174 def __init__(self, arguments):
233 175 super(BaseRedisBackend, self).__init__(arguments)
234 176 self._lock_timeout = self.lock_timeout
235 177 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
236 178
237 179 if self._lock_auto_renewal and not self._lock_timeout:
238 180 # set default timeout for auto_renewal
239 181 self._lock_timeout = 30
240 182
241 183 def _create_client(self):
242 184 args = {}
243 185
244 186 if self.url is not None:
245 187 args.update(url=self.url)
246 188
247 189 else:
248 190 args.update(
249 191 host=self.host, password=self.password,
250 192 port=self.port, db=self.db
251 193 )
252 194
253 195 connection_pool = redis.ConnectionPool(**args)
254
255 return redis.StrictRedis(connection_pool=connection_pool)
196 self.writer_client = redis.StrictRedis(
197 connection_pool=connection_pool
198 )
199 self.reader_client = self.writer_client
256 200
257 201 def list_keys(self, prefix=''):
258 202 prefix = '{}:{}*'.format(self.key_prefix, prefix)
259 return self.client.keys(prefix)
203 return self.reader_client.keys(prefix)
260 204
261 205 def get_store(self):
262 return self.client.connection_pool
263
264 def get(self, key):
265 value = self.client.get(key)
266 if value is None:
267 return NO_VALUE
268 return self._loads(value)
269
270 def get_multi(self, keys):
271 if not keys:
272 return []
273 values = self.client.mget(keys)
274 loads = self._loads
275 return [
276 loads(v) if v is not None else NO_VALUE
277 for v in values]
278
279 def set(self, key, value):
280 if self.redis_expiration_time:
281 self.client.setex(key, self.redis_expiration_time,
282 self._dumps(value))
283 else:
284 self.client.set(key, self._dumps(value))
285
286 def set_multi(self, mapping):
287 dumps = self._dumps
288 mapping = dict(
289 (k, dumps(v))
290 for k, v in mapping.items()
291 )
292
293 if not self.redis_expiration_time:
294 self.client.mset(mapping)
295 else:
296 pipe = self.client.pipeline()
297 for key, value in mapping.items():
298 pipe.setex(key, self.redis_expiration_time, value)
299 pipe.execute()
206 return self.reader_client.connection_pool
300 207
301 208 def get_mutex(self, key):
302 209 if self.distributed_lock:
303 210 lock_key = '_lock_{0}'.format(safe_str(key))
304 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
305 auto_renewal=self._lock_auto_renewal)
211 return get_mutex_lock(
212 self.writer_client, lock_key,
213 self._lock_timeout,
214 auto_renewal=self._lock_auto_renewal
215 )
306 216 else:
307 217 return None
308 218
309 219
310 220 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
311 221 key_prefix = 'redis_pickle_backend'
312 222 pass
313 223
314 224
315 225 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
316 226 key_prefix = 'redis_msgpack_backend'
317 227 pass
318 228
319 229
320 230 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
321 import redis_lock
231 from rhodecode.lib._vendor import redis_lock
322 232
323 233 class _RedisLockWrapper(object):
324 234 """LockWrapper for redis_lock"""
325 235
326 236 @classmethod
327 237 def get_lock(cls):
328 238 return redis_lock.Lock(
329 239 redis_client=client,
330 240 name=lock_key,
331 241 expire=lock_timeout,
332 242 auto_renewal=auto_renewal,
333 243 strict=True,
334 244 )
335 245
336 246 def __repr__(self):
337 247 return "{}:{}".format(self.__class__.__name__, lock_key)
338 248
339 249 def __str__(self):
340 250 return "{}:{}".format(self.__class__.__name__, lock_key)
341 251
342 252 def __init__(self):
343 253 self.lock = self.get_lock()
344 254 self.lock_key = lock_key
345 255
346 256 def acquire(self, wait=True):
347 257 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
348 258 try:
349 259 acquired = self.lock.acquire(wait)
350 260 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
351 261 return acquired
352 262 except redis_lock.AlreadyAcquired:
353 263 return False
354 264 except redis_lock.AlreadyStarted:
355 265 # refresh thread exists, but it also means we acquired the lock
356 266 return True
357 267
358 268 def release(self):
359 269 try:
360 270 self.lock.release()
361 271 except redis_lock.NotAcquired:
362 272 pass
363 273
364 274 return _RedisLockWrapper()
General Comments 0
You need to be logged in to leave comments. Login now