caches: synced code with ce changes
super-admin
r1062:837924f7 python3
vcsserver/lib/rc_cache/__init__.py
@@ -1,79 +1,80 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import logging
19 19 from dogpile.cache import register_backend
20 module_name = 'vcsserver'
20 21
21 22 register_backend(
22 "dogpile.cache.rc.memory_lru", "vcsserver.lib.rc_cache.backends",
23 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
23 24 "LRUMemoryBackend")
24 25
25 26 register_backend(
26 "dogpile.cache.rc.file_namespace", "vcsserver.lib.rc_cache.backends",
27 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
27 28 "FileNamespaceBackend")
28 29
29 30 register_backend(
30 "dogpile.cache.rc.redis", "vcsserver.lib.rc_cache.backends",
31 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
31 32 "RedisPickleBackend")
32 33
33 34 register_backend(
34 "dogpile.cache.rc.redis_msgpack", "vcsserver.lib.rc_cache.backends",
35 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
35 36 "RedisMsgPackBackend")
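
These `register_backend` calls are what make the custom backends addressable by their dotted names in any dogpile configuration. As a minimal sketch (assuming this module has been imported so the registrations above have run; the values are illustrative, and dogpile's own `make_region` is used for brevity):

```python
import vcsserver.lib.rc_cache  # noqa: F401 -- runs the register_backend() calls
from dogpile.cache import make_region

region = make_region().configure(
    'dogpile.cache.rc.memory_lru',   # dotted name registered above
    expiration_time=60,
    arguments={'max_size': 1024},    # forwarded to LRUMemoryBackend
)

@region.cache_on_arguments()
def heavy_lookup(repo_id):
    # stand-in for an expensive computation
    return repo_id * 2
```
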
36 37
37 38
38 39 log = logging.getLogger(__name__)
39 40
40 41 from . import region_meta
41 42 from .utils import (
42 43 get_default_cache_settings, backend_key_generator, get_or_create_region,
43 44 clear_cache_namespace, make_region)
44 45
45 46
46 47 def configure_dogpile_cache(settings):
47 48 cache_dir = settings.get('cache_dir')
48 49 if cache_dir:
49 50 region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
50 51
51 52 rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
52 53
53 54 # inspect available namespaces
54 55 avail_regions = set()
55 56 for key in rc_cache_data.keys():
56 57 namespace_name = key.split('.', 1)[0]
57 58 if namespace_name in avail_regions:
58 59 continue
59 60
60 61 avail_regions.add(namespace_name)
61 62 log.debug('dogpile: found the following cache region: %s', namespace_name)
62 63
63 64 new_region = make_region(
64 65 name=namespace_name,
65 66 function_key_generator=None
66 67 )
67 68
68 69 new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
69 70 new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
70 71 if log.isEnabledFor(logging.DEBUG):
71 72 region_args = dict(backend=new_region.actual_backend.__class__,
72 73 region_invalidator=new_region.region_invalidator.__class__)
73 74 log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
74 75
75 76 region_meta.dogpile_cache_regions[namespace_name] = new_region
76 77
77 78
78 79 def includeme(config):
79 80 configure_dogpile_cache(config.registry.settings)
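
`configure_dogpile_cache()` builds one region per `rc_cache.<region>.*` namespace found in the settings. A sketch of the settings shape it consumes (the `repo_object` region name and paths here are illustrative, not taken from a real config):

```python
from vcsserver.lib.rc_cache import configure_dogpile_cache, region_meta

# illustrative settings; deployments normally supply these via the .ini file
settings = {
    'cache_dir': '/tmp/rc_cache',
    'rc_cache.repo_object.backend': 'dogpile.cache.rc.file_namespace',
    'rc_cache.repo_object.expiration_time': '3600',
    'rc_cache.repo_object.arguments.filename': '/tmp/rc_cache/repo_object.db',
}
configure_dogpile_cache(settings)

# the configured region is then reachable through the shared registry
region = region_meta.dogpile_cache_regions['repo_object']
```
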
vcsserver/lib/rc_cache/backends.py
@@ -1,330 +1,239 @@
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import time
19 19 import errno
20 20 import logging
21 import functools
21 22
22 23 import msgpack
23 24 import redis
24 25 import pickle
26 import fcntl
27 flock_org = fcntl.flock
28 from typing import Union
25 29
26 from dogpile.cache.api import CachedValue
27 30 from dogpile.cache.backends import memory as memory_backend
28 31 from dogpile.cache.backends import file as file_backend
29 32 from dogpile.cache.backends import redis as redis_backend
30 from dogpile.cache.backends.file import NO_VALUE, FileLock
33 from dogpile.cache.backends.file import FileLock
31 34 from dogpile.cache.util import memoized_property
35 from dogpile.cache.api import Serializer, Deserializer
32 36
33 37 from pyramid.settings import asbool
34 38
35 39 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
36 40 from vcsserver.str_utils import safe_str
37 41
38 42
39 43 _default_max_size = 1024
40 44
41 45 log = logging.getLogger(__name__)
42 46
43 47
44 48 class LRUMemoryBackend(memory_backend.MemoryBackend):
45 49 key_prefix = 'lru_mem_backend'
46 50 pickle_values = False
47 51
48 52 def __init__(self, arguments):
49 53 max_size = arguments.pop('max_size', _default_max_size)
50 54
51 55 LRUDictClass = LRUDict
52 56 if arguments.pop('log_key_count', None):
53 57 LRUDictClass = LRUDictDebug
54 58
55 59 arguments['cache_dict'] = LRUDictClass(max_size)
56 60 super(LRUMemoryBackend, self).__init__(arguments)
57 61
58 62 def delete(self, key):
59 63 try:
60 64 del self._cache[key]
61 65 except KeyError:
62 66 # we don't care if key isn't there at deletion
63 67 pass
64 68
65 69 def delete_multi(self, keys):
66 70 for key in keys:
67 71 self.delete(key)
68 72
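
`LRUMemoryBackend` stores values in an `LRUDict`, so inserts beyond `max_size` evict the least recently used entry, and `delete()` deliberately tolerates keys that were already evicted. A short behavioral sketch under those assumptions:

```python
from vcsserver.lib.rc_cache.backends import LRUMemoryBackend

backend = LRUMemoryBackend({'max_size': 2})
backend.set('a', 1)
backend.set('b', 2)
backend.set('c', 3)             # evicts 'a', the least recently used key
backend.delete('a')             # no-op: missing keys are ignored
backend.delete_multi(['b', 'c'])
```
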
69 73
70 class PickleSerializer(object):
71
72 def _dumps(self, value, safe=False):
73 try:
74 return pickle.dumps(value)
75 except Exception:
76 if safe:
77 return NO_VALUE
78 else:
79 raise
80
81 def _loads(self, value, safe=True):
82 try:
83 return pickle.loads(value)
84 except Exception:
85 if safe:
86 return NO_VALUE
87 else:
88 raise
74 class PickleSerializer:
75 serializer: Union[None, Serializer] = staticmethod( # type: ignore
76 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
77 )
78 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
79 functools.partial(pickle.loads)
80 )
89 81
90 82
91 83 class MsgPackSerializer(object):
92
93 def _dumps(self, value, safe=False):
94 try:
95 return msgpack.packb(value)
96 except Exception:
97 if safe:
98 return NO_VALUE
99 else:
100 raise
101
102 def _loads(self, value, safe=True):
103 """
104 pickle maintained the `CachedValue` wrapper of the tuple
105 msgpack does not, so it must be added back in.
106 """
107 try:
108 value = msgpack.unpackb(value, use_list=False)
109 return CachedValue(*value)
110 except Exception:
111 if safe:
112 return NO_VALUE
113 else:
114 raise
115
116
117 import fcntl
118 flock_org = fcntl.flock
84 serializer: Union[None, Serializer] = staticmethod( # type: ignore
85 msgpack.packb
86 )
87 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
88 functools.partial(msgpack.unpackb, use_list=False)
89 )
119 90
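
This is the heart of the python3/dogpile 1.x sync: the hand-rolled `_dumps`/`_loads` helpers (and the overridden `get`/`set` methods below that called them) are replaced by `serializer`/`deserializer` attributes from `dogpile.cache.api`, which dogpile now applies around the backends' byte-oriented operations. That also removes the old `CachedValue` special-casing, since dogpile manages that metadata itself. A round-trip sketch of the mixin contract:

```python
from vcsserver.lib.rc_cache.backends import MsgPackSerializer, PickleSerializer

payload = {'rev': 42, 'nodes': ('a', 'b')}

# use_list=False in the deserializer means msgpack arrays come back as tuples
raw = MsgPackSerializer.serializer(payload)        # -> bytes
assert MsgPackSerializer.deserializer(raw) == payload

raw = PickleSerializer.serializer(payload)         # -> bytes
assert PickleSerializer.deserializer(raw) == payload
```
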
120 91
121 92 class CustomLockFactory(FileLock):
122 93
123 94 pass
124 95
125 96
126 97 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
127 98 key_prefix = 'file_backend'
128 99
129 100 def __init__(self, arguments):
130 101 arguments['lock_factory'] = CustomLockFactory
131 102 db_file = arguments.get('filename')
132 103
133 104 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
134 105 try:
135 106 super(FileNamespaceBackend, self).__init__(arguments)
136 107 except Exception:
137 108 log.exception('Failed to initialize db at: %s', db_file)
138 109 raise
139 110
140 111 def __repr__(self):
141 112 return '{} `{}`'.format(self.__class__, self.filename)
142 113
143 114 def list_keys(self, prefix=''):
144 115 prefix = '{}:{}'.format(self.key_prefix, prefix)
145 116
146 117 def cond(v):
147 118 if not prefix:
148 119 return True
149 120
150 121 if v.startswith(prefix):
151 122 return True
152 123 return False
153 124
154 125 with self._dbm_file(True) as dbm:
155 126 try:
156 127 return list(filter(cond, list(dbm.keys())))
157 128 except Exception:
158 129 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
159 130 raise
160 131
161 132 def get_store(self):
162 133 return self.filename
163 134
164 def _dbm_get(self, key):
165 with self._dbm_file(False) as dbm:
166 if hasattr(dbm, 'get'):
167 value = dbm.get(key, NO_VALUE)
168 else:
169 # gdbm objects lack a .get method
170 try:
171 value = dbm[key]
172 except KeyError:
173 value = NO_VALUE
174 if value is not NO_VALUE:
175 value = self._loads(value)
176 return value
177
178 def get(self, key):
179 try:
180 return self._dbm_get(key)
181 except Exception:
182 log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
183 raise
184
185 def set(self, key, value):
186 with self._dbm_file(True) as dbm:
187 dbm[key] = self._dumps(value)
188
189 def set_multi(self, mapping):
190 with self._dbm_file(True) as dbm:
191 for key, value in mapping.items():
192 dbm[key] = self._dumps(value)
193
194 135
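
With serialization declared on the mixin and applied by dogpile, the bespoke `_dbm_get`/`get`/`set`/`set_multi` overrides above become redundant, which is why they are dropped here; the stock `DBMBackend` byte operations are enough. A usage sketch through a region (the file path is illustrative, and the backend registrations from `__init__.py` are assumed to have run):

```python
import vcsserver.lib.rc_cache  # noqa: F401 -- runs the register_backend() calls
from dogpile.cache import make_region

region = make_region().configure(
    'dogpile.cache.rc.file_namespace',
    expiration_time=300,
    arguments={'filename': '/tmp/rc_cache_demo.db'},
)
region.set('answer', {'value': 42})       # pickled transparently
assert region.get('answer') == {'value': 42}
```
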
195 136 class BaseRedisBackend(redis_backend.RedisBackend):
196 137 key_prefix = ''
197 138
198 139 def __init__(self, arguments):
199 140 super(BaseRedisBackend, self).__init__(arguments)
200 141 self._lock_timeout = self.lock_timeout
201 142 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
202 143
203 144 if self._lock_auto_renewal and not self._lock_timeout:
204 145 # set default timeout for auto_renewal
205 146 self._lock_timeout = 30
206 147
207 148 def _create_client(self):
208 149 args = {}
209 150
210 151 if self.url is not None:
211 152 args.update(url=self.url)
212 153
213 154 else:
214 155 args.update(
215 156 host=self.host, password=self.password,
216 157 port=self.port, db=self.db
217 158 )
218 159
219 160 connection_pool = redis.ConnectionPool(**args)
220
221 return redis.StrictRedis(connection_pool=connection_pool)
161 self.writer_client = redis.StrictRedis(
162 connection_pool=connection_pool
163 )
164 self.reader_client = self.writer_client
222 165
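
The single `self.client` is replaced by a `writer_client`/`reader_client` pair, aliased to the same connection for now, matching the split introduced in dogpile 1.x's Redis backend. It leaves room to route reads to a replica while writes and distributed locks stay on the primary; a hypothetical subclass (made-up URLs, not part of this change) illustrates the intent:

```python
import redis

from vcsserver.lib.rc_cache.backends import BaseRedisBackend

class ReplicaAwareRedisBackend(BaseRedisBackend):
    def _create_client(self):
        # writes and distributed locks keep hitting the primary
        self.writer_client = redis.StrictRedis.from_url('redis://primary:6379/0')
        # plain reads can be served from a replica
        self.reader_client = redis.StrictRedis.from_url('redis://replica:6379/0')
```
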
223 166 def list_keys(self, prefix=''):
224 167 prefix = '{}:{}*'.format(self.key_prefix, prefix)
225 return self.client.keys(prefix)
168 return self.reader_client.keys(prefix)
226 169
227 170 def get_store(self):
228 return self.client.connection_pool
229
230 def get(self, key):
231 value = self.client.get(key)
232 if value is None:
233 return NO_VALUE
234 return self._loads(value)
235
236 def get_multi(self, keys):
237 if not keys:
238 return []
239 values = self.client.mget(keys)
240 loads = self._loads
241 return [
242 loads(v) if v is not None else NO_VALUE
243 for v in values]
244
245 def set(self, key, value):
246 if self.redis_expiration_time:
247 self.client.setex(key, self.redis_expiration_time,
248 self._dumps(value))
249 else:
250 self.client.set(key, self._dumps(value))
251
252 def set_multi(self, mapping):
253 dumps = self._dumps
254 mapping = dict(
255 (k, dumps(v))
256 for k, v in mapping.items()
257 )
258
259 if not self.redis_expiration_time:
260 self.client.mset(mapping)
261 else:
262 pipe = self.client.pipeline()
263 for key, value in mapping.items():
264 pipe.setex(key, self.redis_expiration_time, value)
265 pipe.execute()
171 return self.reader_client.connection_pool
266 172
267 173 def get_mutex(self, key):
268 174 if self.distributed_lock:
269 175 lock_key = '_lock_{0}'.format(safe_str(key))
270 return get_mutex_lock(self.client, lock_key, self._lock_timeout,
271 auto_renewal=self._lock_auto_renewal)
176 return get_mutex_lock(
177 self.writer_client, lock_key,
178 self._lock_timeout,
179 auto_renewal=self._lock_auto_renewal
180 )
272 181 else:
273 182 return None
274 183
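
`get_mutex` is dogpile's distributed-lock hook: `get_or_create()` asks the backend for a mutex guarding the key being regenerated, and a `None` return falls back to dogpile's default per-process locking. A sketch of that contract, assuming a Redis server on localhost:

```python
from vcsserver.lib.rc_cache.backends import RedisPickleBackend

backend = RedisPickleBackend({'host': 'localhost', 'port': 6379, 'db': 0,
                              'distributed_lock': True})
mutex = backend.get_mutex('repo_object:some_key')   # illustrative key
if mutex is not None and mutex.acquire(wait=True):
    try:
        pass  # regenerate and store the cached value here
    finally:
        mutex.release()
```
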
275 184
276 185 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
277 186 key_prefix = 'redis_pickle_backend'
278 187 pass
279 188
280 189
281 190 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
282 191 key_prefix = 'redis_msgpack_backend'
283 192 pass
284 193
285 194
286 195 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
287 import redis_lock
196 from vcsserver.lib._vendor import redis_lock
288 197
289 198 class _RedisLockWrapper(object):
290 199 """LockWrapper for redis_lock"""
291 200
292 201 @classmethod
293 202 def get_lock(cls):
294 203 return redis_lock.Lock(
295 204 redis_client=client,
296 205 name=lock_key,
297 206 expire=lock_timeout,
298 207 auto_renewal=auto_renewal,
299 208 strict=True,
300 209 )
301 210
302 211 def __repr__(self):
303 212 return "{}:{}".format(self.__class__.__name__, lock_key)
304 213
305 214 def __str__(self):
306 215 return "{}:{}".format(self.__class__.__name__, lock_key)
307 216
308 217 def __init__(self):
309 218 self.lock = self.get_lock()
310 219 self.lock_key = lock_key
311 220
312 221 def acquire(self, wait=True):
313 222 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
314 223 try:
315 224 acquired = self.lock.acquire(wait)
316 225 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
317 226 return acquired
318 227 except redis_lock.AlreadyAcquired:
319 228 return False
320 229 except redis_lock.AlreadyStarted:
321 230 # refresh thread exists, but it also means we acquired the lock
322 231 return True
323 232
324 233 def release(self):
325 234 try:
326 235 self.lock.release()
327 236 except redis_lock.NotAcquired:
328 237 pass
329 238
330 239 return _RedisLockWrapper()
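
`get_mutex_lock` now imports `redis_lock` from the vendored copy under `vcsserver.lib._vendor` and wraps it to match dogpile's mutex protocol, treating `AlreadyAcquired` as a failed acquire and `AlreadyStarted` as an already-held lock. A direct usage sketch, again assuming a reachable local Redis:

```python
import redis

from vcsserver.lib.rc_cache.backends import get_mutex_lock

client = redis.StrictRedis()
lock = get_mutex_lock(client, '_lock_demo', lock_timeout=30, auto_renewal=True)
if lock.acquire(wait=False):
    try:
        pass  # critical section: a single holder across all processes
    finally:
        lock.release()
```
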