caches: synced code with ce changes
super-admin
r1062:837924f7 python3
@@ -1,79 +1,80 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import logging
 from dogpile.cache import register_backend
+module_name = 'vcsserver'

 register_backend(
-    "dogpile.cache.rc.memory_lru", "vcsserver.lib.rc_cache.backends",
+    "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
     "LRUMemoryBackend")

 register_backend(
-    "dogpile.cache.rc.file_namespace", "vcsserver.lib.rc_cache.backends",
+    "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
     "FileNamespaceBackend")

 register_backend(
-    "dogpile.cache.rc.redis", "vcsserver.lib.rc_cache.backends",
+    "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
     "RedisPickleBackend")

 register_backend(
-    "dogpile.cache.rc.redis_msgpack", "vcsserver.lib.rc_cache.backends",
+    "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
     "RedisMsgPackBackend")


 log = logging.getLogger(__name__)

 from . import region_meta
 from .utils import (
     get_default_cache_settings, backend_key_generator, get_or_create_region,
     clear_cache_namespace, make_region)


 def configure_dogpile_cache(settings):
     cache_dir = settings.get('cache_dir')
     if cache_dir:
         region_meta.dogpile_config_defaults['cache_dir'] = cache_dir

     rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])

     # inspect available namespaces
     avail_regions = set()
     for key in rc_cache_data.keys():
         namespace_name = key.split('.', 1)[0]
         if namespace_name in avail_regions:
             continue

         avail_regions.add(namespace_name)
         log.debug('dogpile: found following cache regions: %s', namespace_name)

         new_region = make_region(
             name=namespace_name,
             function_key_generator=None
         )

         new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
         new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
         if log.isEnabledFor(logging.DEBUG):
             region_args = dict(backend=new_region.actual_backend.__class__,
                                region_invalidator=new_region.region_invalidator.__class__)
             log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)

         region_meta.dogpile_cache_regions[namespace_name] = new_region


 def includeme(config):
     configure_dogpile_cache(config.registry.settings)
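The first file only parametrizes the backend module path; the registration and region wiring are standard dogpile.cache. A self-contained sketch of how such a region is driven by flat settings, the way configure_dogpile_cache() does it. The region name 'repo_object' and the stock 'dogpile.cache.memory' backend are illustrative assumptions, not values from this commit; inside vcsserver the registered 'dogpile.cache.rc.*' backends above would be named instead.

from dogpile.cache import make_region

settings = {
    'rc_cache.repo_object.backend': 'dogpile.cache.memory',
    'rc_cache.repo_object.expiration_time': '300',
}

# configure_from_config() strips the given prefix and feeds the remaining
# keys (backend, expiration_time, arguments.*) to the chosen backend,
# coercing numeric strings like '300' to ints along the way.
region = make_region(name='repo_object')
region.configure_from_config(settings, 'rc_cache.repo_object.')


@region.cache_on_arguments()
def expensive_call(repo_id):
    print('computing', repo_id)  # printed once; the second call hits the cache
    return 'result-%s' % repo_id


expensive_call(1)
expensive_call(1)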
@@ -1,330 +1,239 @@
 # RhodeCode VCSServer provides access to different vcs backends via network.
 # Copyright (C) 2014-2020 RhodeCode GmbH
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software Foundation,
 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

 import time
 import errno
 import logging
+import functools

 import msgpack
 import redis
 import pickle
+import fcntl
+flock_org = fcntl.flock
+from typing import Union

-from dogpile.cache.api import CachedValue
 from dogpile.cache.backends import memory as memory_backend
 from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
-from dogpile.cache.backends.file import NO_VALUE, FileLock
+from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
+from dogpile.cache.api import Serializer, Deserializer

 from pyramid.settings import asbool

 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
 from vcsserver.str_utils import safe_str


 _default_max_size = 1024

 log = logging.getLogger(__name__)


 class LRUMemoryBackend(memory_backend.MemoryBackend):
     key_prefix = 'lru_mem_backend'
     pickle_values = False

     def __init__(self, arguments):
         max_size = arguments.pop('max_size', _default_max_size)

         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug

         arguments['cache_dict'] = LRUDictClass(max_size)
         super(LRUMemoryBackend, self).__init__(arguments)

     def delete(self, key):
         try:
             del self._cache[key]
         except KeyError:
             # we don't care if key isn't there at deletion
             pass

     def delete_multi(self, keys):
         for key in keys:
             self.delete(key)


-class PickleSerializer(object):
-
-    def _dumps(self, value, safe=False):
-        try:
-            return pickle.dumps(value)
-        except Exception:
-            if safe:
-                return NO_VALUE
-            else:
-                raise
-
-    def _loads(self, value, safe=True):
-        try:
-            return pickle.loads(value)
-        except Exception:
-            if safe:
-                return NO_VALUE
-            else:
-                raise
+class PickleSerializer:
+    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+    )
+    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+        functools.partial(pickle.loads)
+    )


 class MsgPackSerializer(object):
-
-    def _dumps(self, value, safe=False):
-        try:
-            return msgpack.packb(value)
-        except Exception:
-            if safe:
-                return NO_VALUE
-            else:
-                raise
-
-    def _loads(self, value, safe=True):
-        """
-        pickle maintained the `CachedValue` wrapper of the tuple
-        msgpack does not, so it must be added back in.
-        """
-        try:
-            value = msgpack.unpackb(value, use_list=False)
-            return CachedValue(*value)
-        except Exception:
-            if safe:
-                return NO_VALUE
-            else:
-                raise
-
-
-import fcntl
-flock_org = fcntl.flock
+    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+        msgpack.packb
+    )
+    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+        functools.partial(msgpack.unpackb, use_list=False)
+    )


 class CustomLockFactory(FileLock):

     pass


 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
     key_prefix = 'file_backend'

     def __init__(self, arguments):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')

         log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
         try:
             super(FileNamespaceBackend, self).__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise

     def __repr__(self):
         return '{} `{}`'.format(self.__class__, self.filename)

     def list_keys(self, prefix=''):
         prefix = '{}:{}'.format(self.key_prefix, prefix)

         def cond(v):
             if not prefix:
                 return True

             if v.startswith(prefix):
                 return True
             return False

         with self._dbm_file(True) as dbm:
             try:
                 return list(filter(cond, list(dbm.keys())))
             except Exception:
                 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                 raise

     def get_store(self):
         return self.filename

-    def _dbm_get(self, key):
-        with self._dbm_file(False) as dbm:
-            if hasattr(dbm, 'get'):
-                value = dbm.get(key, NO_VALUE)
-            else:
-                # gdbm objects lack a .get method
-                try:
-                    value = dbm[key]
-                except KeyError:
-                    value = NO_VALUE
-            if value is not NO_VALUE:
-                value = self._loads(value)
-            return value
-
-    def get(self, key):
-        try:
-            return self._dbm_get(key)
-        except Exception:
-            log.error('Failed to fetch DBM key %s from DB: %s', key, self.get_store())
-            raise
-
-    def set(self, key, value):
-        with self._dbm_file(True) as dbm:
-            dbm[key] = self._dumps(value)
-
-    def set_multi(self, mapping):
-        with self._dbm_file(True) as dbm:
-            for key, value in mapping.items():
-                dbm[key] = self._dumps(value)
-

 class BaseRedisBackend(redis_backend.RedisBackend):
     key_prefix = ''

     def __init__(self, arguments):
         super(BaseRedisBackend, self).__init__(arguments)
         self._lock_timeout = self.lock_timeout
         self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))

         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30

     def _create_client(self):
         args = {}

         if self.url is not None:
             args.update(url=self.url)

         else:
             args.update(
                 host=self.host, password=self.password,
                 port=self.port, db=self.db
             )

         connection_pool = redis.ConnectionPool(**args)
-
-        return redis.StrictRedis(connection_pool=connection_pool)
+        self.writer_client = redis.StrictRedis(
+            connection_pool=connection_pool
+        )
+        self.reader_client = self.writer_client

     def list_keys(self, prefix=''):
         prefix = '{}:{}*'.format(self.key_prefix, prefix)
-        return self.client.keys(prefix)
+        return self.reader_client.keys(prefix)

     def get_store(self):
-        return self.client.connection_pool
+        return self.reader_client.connection_pool

-    def get(self, key):
-        value = self.client.get(key)
-        if value is None:
-            return NO_VALUE
-        return self._loads(value)
-
-    def get_multi(self, keys):
-        if not keys:
-            return []
-        values = self.client.mget(keys)
-        loads = self._loads
-        return [
-            loads(v) if v is not None else NO_VALUE
-            for v in values]
-
-    def set(self, key, value):
-        if self.redis_expiration_time:
-            self.client.setex(key, self.redis_expiration_time,
-                              self._dumps(value))
-        else:
-            self.client.set(key, self._dumps(value))
-
-    def set_multi(self, mapping):
-        dumps = self._dumps
-        mapping = dict(
-            (k, dumps(v))
-            for k, v in mapping.items()
-        )
-
-        if not self.redis_expiration_time:
-            self.client.mset(mapping)
-        else:
-            pipe = self.client.pipeline()
-            for key, value in mapping.items():
-                pipe.setex(key, self.redis_expiration_time, value)
-            pipe.execute()
-
     def get_mutex(self, key):
         if self.distributed_lock:
             lock_key = '_lock_{0}'.format(safe_str(key))
-            return get_mutex_lock(self.client, lock_key, self._lock_timeout,
-                                  auto_renewal=self._lock_auto_renewal)
+            return get_mutex_lock(
+                self.writer_client, lock_key,
+                self._lock_timeout,
+                auto_renewal=self._lock_auto_renewal
+            )
         else:
             return None


 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
     key_prefix = 'redis_pickle_backend'
     pass


 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
     key_prefix = 'redis_msgpack_backend'
     pass


 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
-    import redis_lock
+    from vcsserver.lib._vendor import redis_lock

     class _RedisLockWrapper(object):
         """LockWrapper for redis_lock"""

         @classmethod
         def get_lock(cls):
             return redis_lock.Lock(
                 redis_client=client,
                 name=lock_key,
                 expire=lock_timeout,
                 auto_renewal=auto_renewal,
                 strict=True,
             )

         def __repr__(self):
             return "{}:{}".format(self.__class__.__name__, lock_key)

         def __str__(self):
             return "{}:{}".format(self.__class__.__name__, lock_key)

         def __init__(self):
             self.lock = self.get_lock()
             self.lock_key = lock_key

         def acquire(self, wait=True):
             log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
             try:
                 acquired = self.lock.acquire(wait)
                 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                 return acquired
             except redis_lock.AlreadyAcquired:
                 return False
             except redis_lock.AlreadyStarted:
                 # refresh thread exists, but it also means we acquired the lock
                 return True

         def release(self):
             try:
                 self.lock.release()
             except redis_lock.NotAcquired:
                 pass

     return _RedisLockWrapper()
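Most of the deletions in this file follow from adopting the declarative serializer API of dogpile.cache 1.x: a backend now publishes a serializer/deserializer callable pair as class attributes, and the region machinery applies them (and deals with NO_VALUE and value wrapping) instead of each backend hand-rolling _dumps/_loads and overriding get()/set(). A small self-contained sketch of that contract, reusing the same callables as the diff; the payload dict is an illustrative assumption, not data from this commit.

import functools
import pickle

import msgpack

# The pair used by PickleSerializer above: any Python object -> bytes and back.
pickle_serializer = functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
pickle_deserializer = pickle.loads

# The pair used by MsgPackSerializer above; use_list=False decodes msgpack
# arrays as tuples, matching what the old hand-written _loads did.
msgpack_serializer = msgpack.packb
msgpack_deserializer = functools.partial(msgpack.unpackb, use_list=False)

payload = {'commit_id': '837924f7', 'cached': True}

for dumps, loads in [
    (pickle_serializer, pickle_deserializer),
    (msgpack_serializer, msgpack_deserializer),
]:
    raw = dumps(payload)          # bytes, suitable for DBM files or Redis
    assert loads(raw) == payload  # round-trips back to the original value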