caches: updated rc_cache with new archive cache module and python3 changes
super-admin
r5067:bf4fcdb7 default
@@ -0,0 +1,88 @@
+# Copyright (C) 2015-2020 RhodeCode GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3
+# (only), as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# This program is dual-licensed. If you wish to learn more about the
+# RhodeCode Enterprise Edition, including its added features, Support services,
+# and proprietary license terms, please see https://rhodecode.com/licenses/
+
+import logging
+import os
+import diskcache
+from diskcache import RLock
+
+log = logging.getLogger(__name__)
+
+cache_meta = None
+
+
+class ReentrantLock(RLock):
+    def __enter__(self):
+        reentrant_lock_key = self._key
+
+        log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
+        #self.acquire()
+        log.debug('Lock for key=%s acquired', reentrant_lock_key)
+
+    def __exit__(self, *exc_info):
+        #self.release()
+        pass
+
+
+def get_archival_config(config):
+
+    final_config = {
+        'archive_cache.eviction_policy': 'least-frequently-used'
+    }
+
+    for k, v in config.items():
+        if k.startswith('archive_cache'):
+            final_config[k] = v
+
+    return final_config
+
+
+def get_archival_cache_store(config):
+
+    global cache_meta
+    if cache_meta is not None:
+        return cache_meta
+
+    config = get_archival_config(config)
+
+    archive_cache_dir = config['archive_cache.store_dir']
+    archive_cache_size_gb = config['archive_cache.cache_size_gb']
+    archive_cache_shards = config['archive_cache.cache_shards']
+    archive_cache_eviction_policy = config['archive_cache.eviction_policy']
+
+    log.debug('Initializing archival cache instance under %s', archive_cache_dir)
+
+    # check if it's ok to write, and re-create the archive cache
+    if not os.path.isdir(archive_cache_dir):
+        os.makedirs(archive_cache_dir, exist_ok=True)
+
+    d_cache = diskcache.FanoutCache(
+        archive_cache_dir, shards=archive_cache_shards,
+        cull_limit=0,  # manual eviction required
+        size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
+        eviction_policy=archive_cache_eviction_policy,
+        timeout=30
+    )
+    cache_meta = d_cache
+    return cache_meta
+
+
+def includeme(config):
+    # init our cache at start
+    settings = config.get_settings()
+    get_archival_cache_store(settings)
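The new module keeps a single process-wide `cache_meta` handle. A minimal usage sketch, assuming hypothetical `archive_cache.*` settings values and a hypothetical archive key (only `get_archival_cache_store` and the config key names come from the code above; `FanoutCache`'s get/set/contains calls are standard diskcache API):

    # minimal sketch, not part of the commit
    settings = {
        'archive_cache.store_dir': '/tmp/rc_archive_cache',  # hypothetical path
        'archive_cache.cache_size_gb': 1,
        'archive_cache.cache_shards': 8,
    }
    d_cache = get_archival_cache_store(settings)

    archive_key = 'repo1/commit-abc.zip'  # hypothetical key
    if archive_key not in d_cache:
        d_cache.set(archive_key, b'...archive bytes...')
    data = d_cache.get(archive_key)

Repeated calls reuse the module-level `cache_meta` singleton, so the FanoutCache is only constructed once per process.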
@@ -1,5 +1,3 @@
-
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -19,7 +17,23 @@
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
 import logging
+import threading
+
 from dogpile.cache import register_backend
+
+from . import region_meta
+from .utils import (
+    ActiveRegionCache,
+    FreshRegionCache,
+    InvalidationContext,
+    backend_key_generator,
+    clear_cache_namespace,
+    get_default_cache_settings,
+    get_or_create_region,
+    make_region,
+    str2bool,
+)
 
 module_name = 'rhodecode'
 
 register_backend(
@@ -41,18 +55,28 @@ register_backend(
 
 log = logging.getLogger(__name__)
 
-from . import region_meta
-from .utils import (
-    get_default_cache_settings, backend_key_generator, get_or_create_region,
-    clear_cache_namespace, make_region, InvalidationContext,
-    FreshRegionCache, ActiveRegionCache
-)
-
 
 FILE_TREE_CACHE_VER = 'v4'
 LICENSE_CACHE_VER = 'v2'
 
 
+CLEAR_DELETE = 'delete'
+CLEAR_INVALIDATE = 'invalidate'
+
+
+def async_creation_runner(cache, somekey, creator, mutex):
+
+    def runner():
+        try:
+            value = creator()
+            cache.set(somekey, value)
+        finally:
+            mutex.release()
+
+    thread = threading.Thread(target=runner)
+    thread.start()
+
+
 def configure_dogpile_cache(settings):
     cache_dir = settings.get('cache_dir')
     if cache_dir:
@@ -72,13 +96,20 @@ def configure_dogpile_cache(settings):
 
     new_region = make_region(
         name=namespace_name,
-        function_key_generator=None
+        function_key_generator=None,
+        async_creation_runner=None
     )
 
-    new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
+    new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
     new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
+
+    async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
+    if async_creator:
+        log.debug('configuring region %s with async creator', new_region)
+        new_region.async_creation_runner = async_creation_runner
+
     if log.isEnabledFor(logging.DEBUG):
-        region_args = dict(backend=new_region.actual_backend.__class__,
+        region_args = dict(backend=new_region.actual_backend,
                            region_invalidator=new_region.region_invalidator.__class__)
         log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
 
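The `async_creator` switch wires up dogpile.cache's documented async runner contract: when a cached value is merely stale (not missing), dogpile hands the already-acquired mutex to `async_creation_runner(cache, somekey, creator, mutex)`, serves the stale value to the caller, and expects the runner to recompute the value and release the mutex — which the thread started in the helper above does. A minimal standalone sketch of the same wiring (region backend and the decorated function are hypothetical):

    import time
    from dogpile.cache import make_region

    region = make_region(
        async_creation_runner=async_creation_runner,  # the helper from this commit
    ).configure('dogpile.cache.memory', expiration_time=5)

    @region.cache_on_arguments()
    def expensive_fn(arg):
        time.sleep(1)  # simulate slow work
        return arg * 2

    expensive_fn(21)  # first call computes synchronously
    # once expired, callers get the stale value while a background thread refreshes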
@@ -1,4 +1,3 @@
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -17,31 +16,31 @@
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
-import time
 import errno
+import fcntl
+import functools
 import logging
-import functools
+import os
+import pickle
+import time
 
+import gevent
 import msgpack
 import redis
-import gevent
-import pickle
-import fcntl
+
 flock_org = fcntl.flock
 from typing import Union
 
+from dogpile.cache.api import Deserializer, Serializer
+from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import memory as memory_backend
-from dogpile.cache.backends import file as file_backend
 from dogpile.cache.backends import redis as redis_backend
 from dogpile.cache.backends.file import FileLock
 from dogpile.cache.util import memoized_property
-from dogpile.cache.api import Serializer, Deserializer
-
-from pyramid.settings import asbool
 
 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
-from rhodecode.lib.str_utils import safe_str, safe_bytes
-
+from rhodecode.lib.str_utils import safe_bytes, safe_str
+from rhodecode.lib.type_utils import str2bool
 
 _default_max_size = 1024
 
@@ -53,14 +52,20 @@ class LRUMemoryBackend(memory_backend.MemoryBackend):
     pickle_values = False
 
     def __init__(self, arguments):
-        max_size = arguments.pop('max_size', _default_max_size)
+        self.max_size = arguments.pop('max_size', _default_max_size)
 
         LRUDictClass = LRUDict
         if arguments.pop('log_key_count', None):
             LRUDictClass = LRUDictDebug
 
-        arguments['cache_dict'] = LRUDictClass(max_size)
-        super(LRUMemoryBackend, self).__init__(arguments)
+        arguments['cache_dict'] = LRUDictClass(self.max_size)
+        super().__init__(arguments)
+
+    def __repr__(self):
+        return f'{self.__class__}(maxsize=`{self.max_size}`)'
+
+    def __str__(self):
+        return self.__repr__()
 
     def delete(self, key):
         try:
@@ -75,19 +80,19 @@ class LRUMemoryBackend(memory_backend.MemoryBackend):
 
 
 class PickleSerializer:
-    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+    serializer: None | Serializer = staticmethod(  # type: ignore
         functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
     )
-    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+    deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(pickle.loads)
     )
 
 
 class MsgPackSerializer(object):
-    serializer: Union[None, Serializer] = staticmethod(  # type: ignore
+    serializer: None | Serializer = staticmethod(  # type: ignore
         msgpack.packb
     )
-    deserializer: Union[None, Deserializer] = staticmethod(  # type: ignore
+    deserializer: None | Deserializer = staticmethod(  # type: ignore
         functools.partial(msgpack.unpackb, use_list=False)
    )
 
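The `use_list=False` flag on the msgpack deserializer makes packed sequences decode back to tuples rather than lists, which matters for keys and other hashable values. A quick standalone illustration (not part of the commit):

    import functools
    import msgpack

    packed = msgpack.packb({'key': (1, 2, 3)})
    unpack = functools.partial(msgpack.unpackb, use_list=False)

    assert unpack(packed) == {'key': (1, 2, 3)}           # sequences come back as tuples
    assert msgpack.unpackb(packed) == {'key': [1, 2, 3]}  # default decodes to lists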
@@ -135,18 +140,28 @@ class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
         arguments['lock_factory'] = CustomLockFactory
         db_file = arguments.get('filename')
 
-        log.debug('initialing %s DB in %s', self.__class__.__name__, db_file)
+        log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
+        db_file_dir = os.path.dirname(db_file)
+        if not os.path.isdir(db_file_dir):
+            os.makedirs(db_file_dir)
+
         try:
-            super(FileNamespaceBackend, self).__init__(arguments)
+            super().__init__(arguments)
         except Exception:
             log.exception('Failed to initialize db at: %s', db_file)
             raise
 
     def __repr__(self):
-        return '{} `{}`'.format(self.__class__, self.filename)
+        return f'{self.__class__}(file=`{self.filename}`)'
+
+    def __str__(self):
+        return self.__repr__()
+
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
 
     def list_keys(self, prefix: bytes = b''):
-        prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+        prefix = self._get_keys_pattern(prefix)
 
         def cond(dbm_key: bytes):
             if not prefix:
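The extracted `_get_keys_pattern` helper just centralizes the byte-prefix construction that `list_keys` previously built inline. For example (the `file_backend` prefix value is an assumption for illustration):

    # illustration only; b'file_backend' is an assumed key_prefix value
    key_prefix = b'file_backend'
    pattern = b'%b:%b' % (key_prefix, b'repo_1')
    assert pattern == b'file_backend:repo_1'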
@@ -171,14 +186,22 @@ class BaseRedisBackend(redis_backend.RedisBackend):
 
     key_prefix = ''
     def __init__(self, arguments):
-        super(BaseRedisBackend, self).__init__(arguments)
+        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
+        super().__init__(arguments)
+
         self._lock_timeout = self.lock_timeout
-        self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
+        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
 
         if self._lock_auto_renewal and not self._lock_timeout:
             # set default timeout for auto_renewal
             self._lock_timeout = 30
 
+    def __repr__(self):
+        return f'{self.__class__}(conn=`{self.db_conn}`)'
+
+    def __str__(self):
+        return self.__repr__()
+
     def _create_client(self):
         args = {}
 
@@ -197,8 +220,11 @@ class BaseRedisBackend(redis_backend.RedisBackend):
         )
         self.reader_client = self.writer_client
 
-    def list_keys(self, prefix=''):
-        prefix = '{}:{}*'.format(self.key_prefix, prefix)
+    def _get_keys_pattern(self, prefix: bytes = b''):
+        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
+
+    def list_keys(self, prefix: bytes = b''):
+        prefix = self._get_keys_pattern(prefix)
         return self.reader_client.keys(prefix)
 
     def get_store(self):
@@ -206,7 +232,7 @@ class BaseRedisBackend(redis_backend.RedisBackend):
 
     def get_mutex(self, key):
         if self.distributed_lock:
-            lock_key = '_lock_{0}'.format(safe_str(key))
+            lock_key = f'_lock_{safe_str(key)}'
             return get_mutex_lock(
                 self.writer_client, lock_key,
                 self._lock_timeout,
@@ -243,10 +269,10 @@ def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
             )
 
         def __repr__(self):
-            return "{}:{}".format(self.__class__.__name__, lock_key)
+            return f"{self.__class__.__name__}:{lock_key}"
 
         def __str__(self):
-            return "{}:{}".format(self.__class__.__name__, lock_key)
+            return f"{self.__class__.__name__}:{lock_key}"
 
         def __init__(self):
             self.lock = self.get_lock()
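For context, `get_mutex_lock` appears to wrap the python-redis-lock package; the f-string changes above only touch its repr. A rough standalone sketch of the lock it builds, assuming that package (connection details hypothetical):

    import redis
    import redis_lock

    client = redis.StrictRedis(host='localhost')  # hypothetical connection
    lock = redis_lock.Lock(
        client, '_lock_some_cache_key',
        expire=30,          # mirrors _lock_timeout
        auto_renewal=True,  # mirrors lock_auto_renewal
    )
    if lock.acquire(blocking=True):
        try:
            pass  # compute the cached value here
        finally:
            lock.release()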
@@ -18,11 +18,12 @@
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 
-import os
-import sys
 import atexit
 import logging
+import os
 import signal
+import sys
+
 import rhodecode
 
 log = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ def sigHandler(signo, frame):
 
 
 def free_cache_keys(*args):
-    from rhodecode.model.db import Session, CacheKey
+    from rhodecode.model.db import CacheKey, Session
 
     if rhodecode.is_test:
         return
@@ -1,5 +1,3 @@
-
-
 # Copyright (C) 2015-2020 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
@@ -17,6 +15,7 @@
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
+
 import os
 import tempfile
 
@@ -15,22 +15,22 @@
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
-import os
-import time
-import logging
+
 import functools
-import decorator
+import logging
+import os
 import threading
+import time
 
+import decorator
 from dogpile.cache import CacheRegion
 
 import rhodecode
 from rhodecode.lib.hash_utils import sha1
+from rhodecode.lib.str_utils import safe_bytes
 from rhodecode.lib.type_utils import str2bool
-from rhodecode.lib.str_utils import safe_bytes
 
-from rhodecode.lib.rc_cache import cache_key_meta
-from rhodecode.lib.rc_cache import region_meta
+from . import region_meta, cache_key_meta
 
 log = logging.getLogger(__name__)
 
@@ -44,6 +44,9 @@ def isCython(func):
 
 class RhodeCodeCacheRegion(CacheRegion):
 
+    def __repr__(self):
+        return f'{self.__class__}(name={self.name})'
+
     def conditional_cache_on_arguments(
             self, namespace=None,
             expiration_time=None,
@@ -53,15 +56,17 @@ class RhodeCodeCacheRegion(CacheRegion):
             condition=True):
         """
         Custom conditional decorator, that will not touch any dogpile internals if
-        condition isn't meet. This works a bit different than should_cache_fn
+        condition isn't met. This works a bit differently from should_cache_fn
         And it's faster in cases we don't ever want to compute cached values
         """
         expiration_time_is_callable = callable(expiration_time)
+        if not namespace:
+            namespace = getattr(self, '_default_namespace', None)
 
         if function_key_generator is None:
             function_key_generator = self.function_key_generator
 
-        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
+        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
 
             if not condition:
                 log.debug('Calling un-cached method:%s', user_func.__name__)
@@ -71,7 +76,7 @@ class RhodeCodeCacheRegion(CacheRegion):
                 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                 return result
 
-            key = key_generator(*arg, **kw)
+            key = func_key_generator(*arg, **kw)
 
             timeout = expiration_time() if expiration_time_is_callable \
                 else expiration_time
@@ -146,54 +151,69 @@ def compute_key_from_params(*args):
     return sha1(safe_bytes("_".join(map(str, args))))
 
 
-def backend_key_generator(backend):
-    """
-    Special wrapper that also sends over the backend to the key generator
-    """
-    def wrapper(namespace, fn):
-        return key_generator(backend, namespace, fn)
-    return wrapper
-
-
-def key_generator(backend, namespace, fn):
-    fname = fn.__name__
+def custom_key_generator(backend, namespace, fn):
+    func_name = fn.__name__
 
     def generate_key(*args):
-        backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
+        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
         namespace_pref = namespace or 'default_namespace'
         arg_key = compute_key_from_params(*args)
-        final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
+        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
 
         return final_key
 
     return generate_key
 
 
-def get_or_create_region(region_name, region_namespace=None):
-    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
+def backend_key_generator(backend):
+    """
+    Special wrapper that also sends over the backend to the key generator
+    """
+    def wrapper(namespace, fn):
+        return custom_key_generator(backend, namespace, fn)
+    return wrapper
+
+
+def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
+    from .backends import FileNamespaceBackend
+    from . import async_creation_runner
+
     region_obj = region_meta.dogpile_cache_regions.get(region_name)
     if not region_obj:
-        raise EnvironmentError(
-            'Region `{}` not in configured: {}.'.format(
-                region_name, list(region_meta.dogpile_cache_regions.keys())))
+        reg_keys = list(region_meta.dogpile_cache_regions.keys())
+        raise EnvironmentError(f'Region `{region_name}` not in configured: {reg_keys}.')
+
+    region_uid_name = f'{region_name}:{region_namespace}'
 
-    region_uid_name = '{}:{}'.format(region_name, region_namespace)
     if isinstance(region_obj.actual_backend, FileNamespaceBackend):
+        if not region_namespace:
+            raise ValueError(f'{FileNamespaceBackend} requires specifying the region_namespace param')
+
         region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
         if region_exist:
             log.debug('Using already configured region: %s', region_namespace)
             return region_exist
-        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
+
         expiration_time = region_obj.expiration_time
 
-        if not os.path.isdir(cache_dir):
-            os.makedirs(cache_dir)
+        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
+        namespace_cache_dir = cache_dir
+
+        # we default the namespace_cache_dir to our default cache dir.
+        # however if this backend is configured with a filename= param, we prioritize that,
+        # so all caches within that particular region, even those namespaced, end up in the same path
+        if region_obj.actual_backend.filename:
+            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
+
+        if not os.path.isdir(namespace_cache_dir):
+            os.makedirs(namespace_cache_dir)
         new_region = make_region(
             name=region_uid_name,
             function_key_generator=backend_key_generator(region_obj.actual_backend)
         )
+
         namespace_filename = os.path.join(
-            cache_dir, "{}.cache.dbm".format(region_namespace))
+            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
         # special type that allows 1db per namespace
         new_region.configure(
             backend='dogpile.cache.rc.file_namespace',
@@ -205,19 +225,34 @@ def get_or_create_region(region_name, region_namespace=None):
         log.debug('configuring new region: %s', region_uid_name)
         region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
 
+    region_obj._default_namespace = region_namespace
+    if use_async_runner:
+        region_obj.async_creation_runner = async_creation_runner
     return region_obj
 
 
-def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
-    region = get_or_create_region(cache_region, cache_namespace_uid)
-    cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
-    num_delete_keys = len(cache_keys)
-    if invalidate:
-        region.invalidate(hard=False)
-    else:
-        if num_delete_keys:
-            region.delete_multi(cache_keys)
-    return num_delete_keys
+def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
+    from . import CLEAR_DELETE, CLEAR_INVALIDATE
+
+    if not isinstance(cache_region, RhodeCodeCacheRegion):
+        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
+    log.debug('clearing cache region: %s with method=%s', cache_region, method)
+
+    num_affected_keys = None
+
+    if method == CLEAR_INVALIDATE:
+        # NOTE: The CacheRegion.invalidate() method's default mode of
+        # operation is to set a timestamp local to this CacheRegion in this Python process only.
+        # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
+        cache_region.invalidate(hard=True)
+
+    if method == CLEAR_DELETE:
+        cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
+        num_affected_keys = len(cache_keys)
+        if num_affected_keys:
+            cache_region.delete_multi(cache_keys)
+
+    return num_affected_keys
 
 
 class ActiveRegionCache(object):
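Callers now choose an explicit method instead of the old `invalidate=False` flag; `CLEAR_DELETE` and `CLEAR_INVALIDATE` come from the rc_cache package `__init__` shown in an earlier hunk. A minimal usage sketch (region and namespace names are hypothetical):

    from rhodecode.lib import rc_cache

    # per-key removal in the backend; returns the number of deleted keys
    removed = rc_cache.clear_cache_namespace(
        'cache_repo', 'some_namespace_uid', method=rc_cache.CLEAR_DELETE)

    # or mark the region stale; per-process only, as the NOTE above explains
    rc_cache.clear_cache_namespace(
        'cache_repo', 'some_namespace_uid', method=rc_cache.CLEAR_INVALIDATE)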
@@ -286,7 +321,7 @@ class InvalidationContext(object):
 
         if thread_scoped is None:
             # if we set "default" we can override this via .ini settings
-            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
+            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
 
         # Append the thread id to the cache key if this invalidation context
         # should be scoped to the current thread.
@@ -346,7 +381,7 @@ class InvalidationContext(object):
         return FreshRegionCache(context=self, cache_data=cache_data)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        from rhodecode.model.db import Session, IntegrityError
+        from rhodecode.model.db import IntegrityError, Session
 
         # save compute time
         self.compute_time = time.time() - self._start_time