caches: synced with CE code
super-admin · r1113:dcc620cb (python3)
@@ -1,86 +1,110 @@ vcsserver/lib/rc_cache/__init__.py
1 # -*- coding: utf-8 -*-
2
1 3 # RhodeCode VCSServer provides access to different vcs backends via network.
2 4 # Copyright (C) 2014-2020 RhodeCode GmbH
3 5 #
4 6 # This program is free software; you can redistribute it and/or modify
5 7 # it under the terms of the GNU General Public License as published by
6 8 # the Free Software Foundation; either version 3 of the License, or
7 9 # (at your option) any later version.
8 10 #
9 11 # This program is distributed in the hope that it will be useful,
10 12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 14 # GNU General Public License for more details.
13 15 #
14 16 # You should have received a copy of the GNU General Public License
15 17 # along with this program; if not, write to the Free Software Foundation,
16 18 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 19
18 20 import logging
21 import threading
19 22
20 23 from dogpile.cache import register_backend
21 24
25 from . import region_meta
26 from .utils import (
27 backend_key_generator,
28 clear_cache_namespace,
29 get_default_cache_settings,
30 get_or_create_region,
31 make_region,
32 str2bool,
33 )
34
22 35 module_name = 'vcsserver'
23 36
24 37 register_backend(
25 38 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
26 39 "LRUMemoryBackend")
27 40
28 41 register_backend(
29 42 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
30 43 "FileNamespaceBackend")
31 44
32 45 register_backend(
33 46 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
34 47 "RedisPickleBackend")
35 48
36 49 register_backend(
37 50 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
38 51 "RedisMsgPackBackend")
39 52
40 53
41 54 log = logging.getLogger(__name__)
42 55
43 from . import region_meta
44 from .utils import (
45 backend_key_generator,
46 clear_cache_namespace,
47 get_default_cache_settings,
48 get_or_create_region,
49 make_region,
50 )
56
57 def async_creation_runner(cache, somekey, creator, mutex):
58
59 def runner():
60 try:
61 value = creator()
62 cache.set(somekey, value)
63 finally:
64 mutex.release()
65
66 thread = threading.Thread(target=runner)
67 thread.start()
51 68
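dogpile.cache calls an `async_creation_runner` with `(cache, somekey, creator, mutex)` whenever a cached value exists but has expired: the runner must recompute and store the value and release the mutex, while the caller immediately gets the stale value back. A minimal stand-alone sketch of attaching the runner above to a region (the region, memory backend, and `compute` helper are illustrative, not part of this module):

```python
from dogpile.cache import make_region as dogpile_make_region

# Illustrative region; in this module the runner is attached by
# configure_dogpile_cache() when rc_cache.<region>.async_creator is true.
demo_region = dogpile_make_region(async_creation_runner=async_creation_runner)
demo_region.configure('dogpile.cache.memory', expiration_time=5)

@demo_region.cache_on_arguments()
def slow_value(key):
    return compute(key)  # hypothetical expensive call, refreshed in background
```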
52 69
53 70 def configure_dogpile_cache(settings):
54 71 cache_dir = settings.get('cache_dir')
55 72 if cache_dir:
56 73 region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
57 74
58 75 rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
59 76
60 77 # inspect available namespaces
61 78 avail_regions = set()
62 79 for key in rc_cache_data.keys():
63 80 namespace_name = key.split('.', 1)[0]
64 81 if namespace_name in avail_regions:
65 82 continue
66 83
67 84 avail_regions.add(namespace_name)
68 85 log.debug('dogpile: found the following cache regions: %s', namespace_name)
69 86
70 87 new_region = make_region(
71 88 name=namespace_name,
72 function_key_generator=None
89 function_key_generator=None,
90 async_creation_runner=None
73 91 )
74 92
75 new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
93 new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
76 94 new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
95
96 async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
97 if async_creator:
98 log.debug('configuring region %s with async creator', new_region)
99 new_region.async_creation_runner = async_creation_runner
100
77 101 if log.isEnabledFor(logging.DEBUG):
78 region_args = dict(backend=new_region.actual_backend.__class__,
102 region_args = dict(backend=new_region.actual_backend,
79 103 region_invalidator=new_region.region_invalidator.__class__)
80 104 log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
81 105
82 106 region_meta.dogpile_cache_regions[namespace_name] = new_region
83 107
84 108
85 109 def includeme(config):
86 110 configure_dogpile_cache(config.registry.settings)
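For reference, a hedged sketch of the settings dict `configure_dogpile_cache()` consumes; the `rc_cache.` prefix, `cache_dir`, and the per-region `async_creator` flag come from the code above, while the `repo_object` region name and option values are illustrative:

```python
settings = {
    'cache_dir': '/var/cache/rc',  # illustrative path for file-based regions
    'rc_cache.repo_object.backend': 'dogpile.cache.rc.redis_msgpack',
    'rc_cache.repo_object.expiration_time': '300',
    'rc_cache.repo_object.arguments.host': 'localhost',  # illustrative
    'rc_cache.repo_object.async_creator': 'true',  # enables async_creation_runner
}
configure_dogpile_cache(settings)
```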
@@ -1,238 +1,261 @@ vcsserver/lib/rc_cache/backends.py
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import errno
19 19 import fcntl
20 20 import functools
21 21 import logging
22 import os
22 23 import pickle
23 import time
24 #import time
24 25
26 #import gevent
25 27 import msgpack
26 28 import redis
27 29
28 30 flock_org = fcntl.flock
29 31 from typing import Union
30 32
31 33 from dogpile.cache.api import Deserializer, Serializer
32 34 from dogpile.cache.backends import file as file_backend
33 35 from dogpile.cache.backends import memory as memory_backend
34 36 from dogpile.cache.backends import redis as redis_backend
35 37 from dogpile.cache.backends.file import FileLock
36 38 from dogpile.cache.util import memoized_property
37 from pyramid.settings import asbool
38 39
39 40 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
40 41 from vcsserver.str_utils import safe_bytes, safe_str
42 from vcsserver.type_utils import str2bool
41 43
42 44 _default_max_size = 1024
43 45
44 46 log = logging.getLogger(__name__)
45 47
46 48
47 49 class LRUMemoryBackend(memory_backend.MemoryBackend):
48 50 key_prefix = 'lru_mem_backend'
49 51 pickle_values = False
50 52
51 53 def __init__(self, arguments):
52 max_size = arguments.pop('max_size', _default_max_size)
54 self.max_size = arguments.pop('max_size', _default_max_size)
53 55
54 56 LRUDictClass = LRUDict
55 57 if arguments.pop('log_key_count', None):
56 58 LRUDictClass = LRUDictDebug
57 59
58 arguments['cache_dict'] = LRUDictClass(max_size)
60 arguments['cache_dict'] = LRUDictClass(self.max_size)
59 61 super(LRUMemoryBackend, self).__init__(arguments)
60 62
63 def __repr__(self):
64 return f'{self.__class__}(maxsize=`{self.max_size}`)'
65
66 def __str__(self):
67 return self.__repr__()
68
61 69 def delete(self, key):
62 70 try:
63 71 del self._cache[key]
64 72 except KeyError:
65 73 # we don't care if key isn't there at deletion
66 74 pass
67 75
68 76 def delete_multi(self, keys):
69 77 for key in keys:
70 78 self.delete(key)
71 79
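A small usage sketch of the LRU backend above; `max_size` and `log_key_count` are the two options it pops before delegating to dogpile's MemoryBackend (values illustrative):

```python
backend = LRUMemoryBackend({'max_size': 10000, 'log_key_count': True})
backend.set('k', 'v')                    # oldest keys are evicted past max_size
backend.delete_multi(['k', 'missing'])   # missing keys are silently ignored
```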
72 80
73 81 class PickleSerializer:
74 82 serializer: Union[None, Serializer] = staticmethod( # type: ignore
75 83 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
76 84 )
77 85 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
78 86 functools.partial(pickle.loads)
79 87 )
80 88
81 89
82 90 class MsgPackSerializer(object):
83 91 serializer: Union[None, Serializer] = staticmethod( # type: ignore
84 92 msgpack.packb
85 93 )
86 94 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
87 95 functools.partial(msgpack.unpackb, use_list=False)
88 96 )
89 97
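Because the deserializer is built with `use_list=False`, msgpack decodes arrays back as tuples rather than lists; a quick round-trip sketch:

```python
blob = MsgPackSerializer.serializer({'rev': 'abc', 'parents': ['p1', 'p2']})
data = MsgPackSerializer.deserializer(blob)
assert data == {'rev': 'abc', 'parents': ('p1', 'p2')}  # list came back as a tuple
```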
90 98
91 99 class CustomLockFactory(FileLock):
92 100
93 101 pass
94 102
95 103
96 104 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
97 105 key_prefix = 'file_backend'
98 106
99 107 def __init__(self, arguments):
100 108 arguments['lock_factory'] = CustomLockFactory
101 109 db_file = arguments.get('filename')
102 110
103 log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
111 log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
112 db_file_dir = os.path.dirname(db_file)
113 if not os.path.isdir(db_file_dir):
114 os.makedirs(db_file_dir)
115
104 116 try:
105 117 super(FileNamespaceBackend, self).__init__(arguments)
106 118 except Exception:
107 119 log.exception('Failed to initialize db at: %s', db_file)
108 120 raise
109 121
110 122 def __repr__(self):
111 return '{} `{}`'.format(self.__class__, self.filename)
123 return f'{self.__class__}(file=`{self.filename}`)'
124
125 def __str__(self):
126 return self.__repr__()
112 127
113 128 def list_keys(self, prefix: bytes = b''):
114 129 prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
115 130
116 131 def cond(dbm_key: bytes):
117 132 if not prefix:
118 133 return True
119 134
120 135 if dbm_key.startswith(prefix):
121 136 return True
122 137 return False
123 138
124 139 with self._dbm_file(True) as dbm:
125 140 try:
126 141 return list(filter(cond, dbm.keys()))
127 142 except Exception:
128 143 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
129 144 raise
130 145
131 146 def get_store(self):
132 147 return self.filename
133 148
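A hedged sketch of constructing the backend directly and inspecting it; the path is illustrative, and `list_keys` takes a bytes prefix because DBM keys are stored as bytes:

```python
backend = FileNamespaceBackend({'filename': '/tmp/rc/repo_object.cache_db'})
keys = backend.list_keys(prefix=b'repo_id_1')  # e.g. [b'file_backend:repo_id_1:...']
print(backend.get_store())                     # path of the underlying DBM file
```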
134 149
135 150 class BaseRedisBackend(redis_backend.RedisBackend):
136 151 key_prefix = ''
137 152
138 153 def __init__(self, arguments):
154 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
139 155 super(BaseRedisBackend, self).__init__(arguments)
156
140 157 self._lock_timeout = self.lock_timeout
141 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
158 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
142 159
143 160 if self._lock_auto_renewal and not self._lock_timeout:
144 161 # set default timeout for auto_renewal
145 162 self._lock_timeout = 30
146 163
164 def __repr__(self):
165 return f'{self.__class__}(conn=`{self.db_conn}`)'
166
167 def __str__(self):
168 return self.__repr__()
169
147 170 def _create_client(self):
148 171 args = {}
149 172
150 173 if self.url is not None:
151 174 args.update(url=self.url)
152 175
153 176 else:
154 177 args.update(
155 178 host=self.host, password=self.password,
156 179 port=self.port, db=self.db
157 180 )
158 181
159 182 connection_pool = redis.ConnectionPool(**args)
160 183 self.writer_client = redis.StrictRedis(
161 184 connection_pool=connection_pool
162 185 )
163 186 self.reader_client = self.writer_client
164 187
165 188 def list_keys(self, prefix=''):
166 prefix = '{}:{}*'.format(self.key_prefix, prefix)
189 prefix = f'{self.key_prefix}:{prefix}*'
167 190 return self.reader_client.keys(prefix)
168 191
169 192 def get_store(self):
170 193 return self.reader_client.connection_pool
171 194
172 195 def get_mutex(self, key):
173 196 if self.distributed_lock:
174 lock_key = '_lock_{0}'.format(safe_str(key))
197 lock_key = f'_lock_{safe_str(key)}'
175 198 return get_mutex_lock(
176 199 self.writer_client, lock_key,
177 200 self._lock_timeout,
178 201 auto_renewal=self._lock_auto_renewal
179 202 )
180 203 else:
181 204 return None
182 205
183 206
184 207 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
185 208 key_prefix = 'redis_pickle_backend'
186 209 pass
187 210
188 211
189 212 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
190 213 key_prefix = 'redis_msgpack_backend'
191 214 pass
192 215
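The registered names `dogpile.cache.rc.redis` and `dogpile.cache.rc.redis_msgpack` from `__init__.py` map to these two classes. A sketch of configuring one through the package's `make_region` (connection details illustrative; `lock_auto_renewal` is consumed by `BaseRedisBackend.__init__` above):

```python
from vcsserver.lib.rc_cache.utils import backend_key_generator, make_region

region = make_region(name='repo_object', function_key_generator=None)
region.configure(
    backend='dogpile.cache.rc.redis_msgpack',  # registered by the package __init__
    expiration_time=300,                       # illustrative
    arguments={
        'host': 'localhost', 'port': 6379, 'db': 0,  # illustrative connection
        'distributed_lock': True,   # route locking through get_mutex()
        'lock_auto_renewal': True,  # keep the redis lock alive while held
    },
)
region.function_key_generator = backend_key_generator(region.actual_backend)
```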
193 216
194 217 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
195 218 from vcsserver.lib._vendor import redis_lock
196 219
197 220 class _RedisLockWrapper(object):
198 221 """LockWrapper for redis_lock"""
199 222
200 223 @classmethod
201 224 def get_lock(cls):
202 225 return redis_lock.Lock(
203 226 redis_client=client,
204 227 name=lock_key,
205 228 expire=lock_timeout,
206 229 auto_renewal=auto_renewal,
207 230 strict=True,
208 231 )
209 232
210 233 def __repr__(self):
211 return "{}:{}".format(self.__class__.__name__, lock_key)
234 return f"{self.__class__.__name__}:{lock_key}"
212 235
213 236 def __str__(self):
214 return "{}:{}".format(self.__class__.__name__, lock_key)
237 return f"{self.__class__.__name__}:{lock_key}"
215 238
216 239 def __init__(self):
217 240 self.lock = self.get_lock()
218 241 self.lock_key = lock_key
219 242
220 243 def acquire(self, wait=True):
221 244 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
222 245 try:
223 246 acquired = self.lock.acquire(wait)
224 247 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
225 248 return acquired
226 249 except redis_lock.AlreadyAcquired:
227 250 return False
228 251 except redis_lock.AlreadyStarted:
229 252 # refresh thread exists, but it also means we acquired the lock
230 253 return True
231 254
232 255 def release(self):
233 256 try:
234 257 self.lock.release()
235 258 except redis_lock.NotAcquired:
236 259 pass
237 260
238 261 return _RedisLockWrapper()
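A usage sketch of the wrapper returned above; `redis_client` is assumed to be a connected `StrictRedis` instance:

```python
mutex = get_mutex_lock(redis_client, '_lock_repo_object', lock_timeout=30,
                       auto_renewal=True)
if mutex.acquire(wait=True):
    try:
        pass  # compute and store the value guarded by the lock
    finally:
        mutex.release()  # NotAcquired is swallowed by the wrapper
```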
@@ -1,207 +1,232 @@ vcsserver/lib/rc_cache/utils.py
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2020 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import functools
19 19 import logging
20 20 import os
21 import threading
21 22 import time
22 23
23 24 import decorator
24 25 from dogpile.cache import CacheRegion
25 26
26 27 from vcsserver.lib.rc_cache import region_meta
27 28 from vcsserver.str_utils import safe_bytes
29 from vcsserver.type_utils import str2bool
28 30 from vcsserver.utils import sha1
29 31
30 32 log = logging.getLogger(__name__)
31 33
32 34
33 35 class RhodeCodeCacheRegion(CacheRegion):
34 36
37 def __repr__(self):
38 return f'{self.__class__}(name={self.name})'
39
35 40 def conditional_cache_on_arguments(
36 41 self, namespace=None,
37 42 expiration_time=None,
38 43 should_cache_fn=None,
39 44 to_str=str,
40 45 function_key_generator=None,
41 46 condition=True):
42 47 """
43 48 Custom conditional decorator that will not touch any dogpile internals if
44 49 the condition isn't met. This works a bit differently from should_cache_fn,
45 50 and it's faster in cases where we never want to compute cached values.
46 51 """
47 52 expiration_time_is_callable = callable(expiration_time)
48 53
49 54 if function_key_generator is None:
50 55 function_key_generator = self.function_key_generator
51 56
52 57 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
53 58
54 59 if not condition:
55 60 log.debug('Calling un-cached method:%s', user_func.__name__)
56 61 start = time.time()
57 62 result = user_func(*arg, **kw)
58 63 total = time.time() - start
59 64 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
60 65 return result
61 66
62 67 key = key_generator(*arg, **kw)
63 68
64 69 timeout = expiration_time() if expiration_time_is_callable \
65 70 else expiration_time
66 71
67 72 log.debug('Calling cached method:`%s`', user_func.__name__)
68 73 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
69 74
70 75 def cache_decorator(user_func):
71 76 if to_str is str:
72 77 # backwards compatible
73 78 key_generator = function_key_generator(namespace, user_func)
74 79 else:
75 80 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
76 81
77 82 def refresh(*arg, **kw):
78 83 """
79 84 Like invalidate, but regenerates the value instead
80 85 """
81 86 key = key_generator(*arg, **kw)
82 87 value = user_func(*arg, **kw)
83 88 self.set(key, value)
84 89 return value
85 90
86 91 def invalidate(*arg, **kw):
87 92 key = key_generator(*arg, **kw)
88 93 self.delete(key)
89 94
90 95 def set_(value, *arg, **kw):
91 96 key = key_generator(*arg, **kw)
92 97 self.set(key, value)
93 98
94 99 def get(*arg, **kw):
95 100 key = key_generator(*arg, **kw)
96 101 return self.get(key)
97 102
98 103 user_func.set = set_
99 104 user_func.invalidate = invalidate
100 105 user_func.get = get
101 106 user_func.refresh = refresh
102 107 user_func.key_generator = key_generator
103 108 user_func.original = user_func
104 109
105 110 # Use `decorate` to preserve the signature of :param:`user_func`.
106 111 return decorator.decorate(user_func, functools.partial(
107 112 get_or_create_for_user_func, key_generator))
108 113
109 114 return cache_decorator
110 115
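A usage sketch of the decorator; the region, namespace, and `expensive_stats` helper are illustrative, while the attached `get`/`set`/`invalidate`/`refresh` helpers are defined above:

```python
region = get_or_create_region('repo_object', 'repo_id_1')

@region.conditional_cache_on_arguments(namespace='repo_id_1', condition=True)
def compute_stats(repo_id):
    return expensive_stats(repo_id)  # hypothetical expensive call

compute_stats(1)             # computed once, then served from cache
compute_stats.invalidate(1)  # drop the cached entry
compute_stats.refresh(1)     # recompute and store without waiting for expiry
```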
111 116
112 117 def make_region(*arg, **kw):
113 118 return RhodeCodeCacheRegion(*arg, **kw)
114 119
115 120
116 121 def get_default_cache_settings(settings, prefixes=None):
117 122 prefixes = prefixes or []
118 123 cache_settings = {}
119 124 for key in settings.keys():
120 125 for prefix in prefixes:
121 126 if key.startswith(prefix):
122 127 name = key.split(prefix)[1].strip()
123 128 val = settings[key]
124 129 if isinstance(val, str):
125 130 val = val.strip()
126 131 cache_settings[name] = val
127 132 return cache_settings
128 133
129 134
130 135 def compute_key_from_params(*args):
131 136 """
132 137 Helper to compute key from given params to be used in cache manager
133 138 """
134 139 return sha1(safe_bytes("_".join(map(str, args))))
135 140
136 141
137 142 def backend_key_generator(backend):
138 143 """
139 144 Special wrapper that also sends over the backend to the key generator
140 145 """
141 146 def wrapper(namespace, fn):
142 147 return key_generator(backend, namespace, fn)
143 148 return wrapper
144 149
145 150
146 151 def key_generator(backend, namespace, fn):
147 fname = fn.__name__
152 func_name = fn.__name__
148 153
149 154 def generate_key(*args):
150 155 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 156 namespace_pref = namespace or 'default_namespace'
152 157 arg_key = compute_key_from_params(*args)
153 final_key = f"{backend_prefix}:{namespace_pref}:{fname}_{arg_key}"
158 final_key = f"{backend_prefix}:{namespace_pref}:{func_name}_{arg_key}"
154 159
155 160 return final_key
156 161
157 162 return generate_key
158 163
159 164
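Putting the two helpers together, cache keys take the shape `<backend_prefix>:<namespace>:<func_name>_<sha1-of-args>`; for example (backend and function objects illustrative):

```python
gen = key_generator(backend, 'repo_id_1', compute_stats)  # illustrative objects
gen(1)  # -> 'redis_msgpack_backend:repo_id_1:compute_stats_<sha1 hex of args>'
```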
160 def get_or_create_region(region_name, region_namespace=None):
165 def get_or_create_region(region_name, region_namespace: str = None):
161 166 from vcsserver.lib.rc_cache.backends import FileNamespaceBackend
167
162 168 region_obj = region_meta.dogpile_cache_regions.get(region_name)
163 169 if not region_obj:
164 170 reg_keys = list(region_meta.dogpile_cache_regions.keys())
165 171 raise EnvironmentError(f'Region `{region_name}` not in configured regions: {reg_keys}.')
166 172
167 173 region_uid_name = f'{region_name}:{region_namespace}'
174
168 175 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
176 if not region_namespace:
177 raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')
178
169 179 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
170 180 if region_exist:
171 181 log.debug('Using already configured region: %s', region_namespace)
172 182 return region_exist
173 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
183
174 184 expiration_time = region_obj.expiration_time
175 185
176 if not os.path.isdir(cache_dir):
177 os.makedirs(cache_dir)
186 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
187 namespace_cache_dir = cache_dir
188
189 # We default namespace_cache_dir to our default cache dir.
190 # However, if this backend is configured with a filename= param, we prioritize that,
191 # so all caches within that particular region, even namespaced ones, end up in the same path.
192 if region_obj.actual_backend.filename:
193 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
194
195 if not os.path.isdir(namespace_cache_dir):
196 os.makedirs(namespace_cache_dir)
178 197 new_region = make_region(
179 198 name=region_uid_name,
180 199 function_key_generator=backend_key_generator(region_obj.actual_backend)
181 200 )
201
182 202 namespace_filename = os.path.join(
183 cache_dir, f"{region_namespace}.cache.dbm")
203 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
184 204 # special type that allows one db file per namespace
185 205 new_region.configure(
186 206 backend='dogpile.cache.rc.file_namespace',
187 207 expiration_time=expiration_time,
188 208 arguments={"filename": namespace_filename}
189 209 )
190 210
191 211 # create and save in region caches
192 212 log.debug('configuring new region: %s', region_uid_name)
193 213 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
194 214
195 215 return region_obj
196 216
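Usage sketch: for regions backed by `FileNamespaceBackend` the namespace is now mandatory and selects a dedicated per-namespace DB file; other backends simply return the already-configured region (names illustrative):

```python
# Raises ValueError for file-based regions when region_namespace is omitted.
region = get_or_create_region('repo_object', region_namespace='repo_id_1')
```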
197 217
198 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
199 region = get_or_create_region(cache_region, cache_namespace_uid)
200 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
218 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, invalidate: bool = False, hard: bool = False):
219 if not isinstance(cache_region, RhodeCodeCacheRegion):
220 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
221
222 cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
201 223 num_delete_keys = len(cache_keys)
202 224 if invalidate:
203 region.invalidate(hard=False)
225 # NOTE: The CacheRegion.invalidate() method’s default mode of
226 # operation is to set a timestamp local to this CacheRegion in this Python process only.
227 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
228 cache_region.invalidate(hard=hard)
204 229 else:
205 230 if num_delete_keys:
206 region.delete_multi(cache_keys)
231 cache_region.delete_multi(cache_keys)
207 232 return num_delete_keys
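Usage sketch: the first argument may be a configured region or a region name; with `invalidate=True` the keys stay in the store but the region treats them as expired (process-local, per the note above), while the default path deletes the keys outright:

```python
deleted = clear_cache_namespace('repo_object', 'repo_id_1')         # delete keys
clear_cache_namespace('repo_object', 'repo_id_1', invalidate=True)  # soft expire
```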