caches: updated rc_cache with new archive cache module and python3 changes
super-admin · r5067:bf4fcdb7 default
@@ -0,0 +1,88 b''
1 # Copyright (C) 2015-2020 RhodeCode GmbH
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
11 #
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
19 import logging
20 import os
21 import diskcache
22 from diskcache import RLock
23
24 log = logging.getLogger(__name__)
25
26 cache_meta = None
27
28
29 class ReentrantLock(RLock):
30 def __enter__(self):
31 reentrant_lock_key = self._key
32
33 log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
34 #self.acquire()
35 log.debug('Lock for key=%s acquired', reentrant_lock_key)
36
37 def __exit__(self, *exc_info):
38 #self.release()
39 pass
40
41
42 def get_archival_config(config):
43
44 final_config = {
45 'archive_cache.eviction_policy': 'least-frequently-used'
46 }
47
48 for k, v in config.items():
49 if k.startswith('archive_cache'):
50 final_config[k] = v
51
52 return final_config
53
54
55 def get_archival_cache_store(config):
56
57 global cache_meta
58 if cache_meta is not None:
59 return cache_meta
60
61 config = get_archival_config(config)
62
63 archive_cache_dir = config['archive_cache.store_dir']
64 archive_cache_size_gb = config['archive_cache.cache_size_gb']
65 archive_cache_shards = config['archive_cache.cache_shards']
66 archive_cache_eviction_policy = config['archive_cache.eviction_policy']
67
68 log.debug('Initializing archival cache instance under %s', archive_cache_dir)
69
70	 # check that it's ok to write, and create the archive cache dir if it's missing
71 if not os.path.isdir(archive_cache_dir):
72 os.makedirs(archive_cache_dir, exist_ok=True)
73
74 d_cache = diskcache.FanoutCache(
75 archive_cache_dir, shards=archive_cache_shards,
76 cull_limit=0, # manual eviction required
77 size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
78 eviction_policy=archive_cache_eviction_policy,
79 timeout=30
80 )
81 cache_meta = d_cache
82 return cache_meta
83
84
85 def includeme(config):
86 # init our cache at start
87 settings = config.get_settings()
88 get_archival_cache_store(settings)
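A minimal usage sketch of the archive cache module above, assuming the config keys read by `get_archival_cache_store`; the store path and the cache key are illustrative:

```python
# Hypothetical config; the keys mirror the ones read by get_archival_cache_store().
config = {
    'archive_cache.store_dir': '/tmp/rc_archive_cache',  # assumed path
    'archive_cache.cache_size_gb': 1,
    'archive_cache.cache_shards': 8,
}

d_cache = get_archival_cache_store(config)  # a diskcache.FanoutCache

archive_key = 'repo-1/commit-abcd.tar.gz'  # illustrative key
if archive_key not in d_cache:
    # values are plain bytes; diskcache handles sharding and the size limit
    d_cache.set(archive_key, b'...archive bytes...')

data = d_cache.get(archive_key)
```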
@@ -1,89 +1,120 b''
1
2
3 1 # Copyright (C) 2015-2020 RhodeCode GmbH
4 2 #
5 3 # This program is free software: you can redistribute it and/or modify
6 4 # it under the terms of the GNU Affero General Public License, version 3
7 5 # (only), as published by the Free Software Foundation.
8 6 #
9 7 # This program is distributed in the hope that it will be useful,
10 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 10 # GNU General Public License for more details.
13 11 #
14 12 # You should have received a copy of the GNU Affero General Public License
15 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 14 #
17 15 # This program is dual-licensed. If you wish to learn more about the
18 16 # RhodeCode Enterprise Edition, including its added features, Support services,
19 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 18
21 19 import logging
20 import threading
21
22 22 from dogpile.cache import register_backend
23
24 from . import region_meta
25 from .utils import (
26 ActiveRegionCache,
27 FreshRegionCache,
28 InvalidationContext,
29 backend_key_generator,
30 clear_cache_namespace,
31 get_default_cache_settings,
32 get_or_create_region,
33 make_region,
34 str2bool,
35 )
36
23 37 module_name = 'rhodecode'
24 38
25 39 register_backend(
26 40 "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
27 41 "LRUMemoryBackend")
28 42
29 43 register_backend(
30 44 "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
31 45 "FileNamespaceBackend")
32 46
33 47 register_backend(
34 48 "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
35 49 "RedisPickleBackend")
36 50
37 51 register_backend(
38 52 "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
39 53 "RedisMsgPackBackend")
40 54
41 55
42 56 log = logging.getLogger(__name__)
43 57
44 from . import region_meta
45 from .utils import (
46 get_default_cache_settings, backend_key_generator, get_or_create_region,
47 clear_cache_namespace, make_region, InvalidationContext,
48 FreshRegionCache, ActiveRegionCache
49 )
50
51 58
52 59 FILE_TREE_CACHE_VER = 'v4'
53 60 LICENSE_CACHE_VER = 'v2'
54 61
55 62
63 CLEAR_DELETE = 'delete'
64 CLEAR_INVALIDATE = 'invalidate'
65
66
67 def async_creation_runner(cache, somekey, creator, mutex):
68
69 def runner():
70 try:
71 value = creator()
72 cache.set(somekey, value)
73 finally:
74 mutex.release()
75
76 thread = threading.Thread(target=runner)
77 thread.start()
78
79
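For context, dogpile calls `async_creation_runner(cache, somekey, creator, mutex)` with the mutex already acquired when a cached value exists but is stale: the stale value is served immediately while the runner refreshes in the background, and the runner must release the mutex when done. A standalone sketch wiring a plain dogpile region the same way (backend and timings are illustrative):

```python
from dogpile.cache import make_region as dp_make_region

region = dp_make_region(async_creation_runner=async_creation_runner).configure(
    'dogpile.cache.memory', expiration_time=5)

@region.cache_on_arguments()
def heavy(x):
    return x * 2

heavy(21)  # first call computes synchronously; once the value goes stale,
           # later calls return the old value while a thread refreshes it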
56 80 def configure_dogpile_cache(settings):
57 81 cache_dir = settings.get('cache_dir')
58 82 if cache_dir:
59 83 region_meta.dogpile_config_defaults['cache_dir'] = cache_dir
60 84
61 85 rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])
62 86
63 87 # inspect available namespaces
64 88 avail_regions = set()
65 89 for key in rc_cache_data.keys():
66 90 namespace_name = key.split('.', 1)[0]
67 91 if namespace_name in avail_regions:
68 92 continue
69 93
70 94 avail_regions.add(namespace_name)
71 95 log.debug('dogpile: found following cache regions: %s', namespace_name)
72 96
73 97 new_region = make_region(
74 98 name=namespace_name,
75 function_key_generator=None
99 function_key_generator=None,
100 async_creation_runner=None
76 101 )
77 102
78 new_region.configure_from_config(settings, 'rc_cache.{}.'.format(namespace_name))
103 new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
79 104 new_region.function_key_generator = backend_key_generator(new_region.actual_backend)
105
106 async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
107 if async_creator:
108 log.debug('configuring region %s with async creator', new_region)
109 new_region.async_creation_runner = async_creation_runner
110
80 111 if log.isEnabledFor(logging.DEBUG):
81 region_args = dict(backend=new_region.actual_backend.__class__,
112 region_args = dict(backend=new_region.actual_backend,
82 113 region_invalidator=new_region.region_invalidator.__class__)
83 114 log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)
84 115
85 116 region_meta.dogpile_cache_regions[namespace_name] = new_region
86 117
87 118
88 119 def includeme(config):
89 120 configure_dogpile_cache(config.registry.settings)
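A hedged sketch of settings that exercise the wiring above; the region name and values are illustrative, and only keys actually read by the code appear (`async_creator` is popped and parsed via `str2bool`):

```python
settings = {
    'rc_cache.cache_repo.backend': 'dogpile.cache.rc.memory_lru',
    'rc_cache.cache_repo.expiration_time': '7200',
    'rc_cache.cache_repo.arguments.max_size': '10000',
    'rc_cache.cache_repo.async_creator': 'true',  # opt this region into the async runner
}
configure_dogpile_cache(settings)
```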
@@ -1,273 +1,299 b''
1
2 1 # Copyright (C) 2015-2020 RhodeCode GmbH
3 2 #
4 3 # This program is free software: you can redistribute it and/or modify
5 4 # it under the terms of the GNU Affero General Public License, version 3
6 5 # (only), as published by the Free Software Foundation.
7 6 #
8 7 # This program is distributed in the hope that it will be useful,
9 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 10 # GNU General Public License for more details.
12 11 #
13 12 # You should have received a copy of the GNU Affero General Public License
14 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 14 #
16 15 # This program is dual-licensed. If you wish to learn more about the
17 16 # RhodeCode Enterprise Edition, including its added features, Support services,
18 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 18
20 import time
21 19 import errno
20 import fcntl
21 import functools
22 22 import logging
23 import functools
23 import os
24 import pickle
25 import time
24 26
27 import gevent
25 28 import msgpack
26 29 import redis
27 import gevent
28 import pickle
29 import fcntl
30
30 31 flock_org = fcntl.flock
31 32 from typing import Union
32 33
34 from dogpile.cache.api import Deserializer, Serializer
35 from dogpile.cache.backends import file as file_backend
33 36 from dogpile.cache.backends import memory as memory_backend
34 from dogpile.cache.backends import file as file_backend
35 37 from dogpile.cache.backends import redis as redis_backend
36 38 from dogpile.cache.backends.file import FileLock
37 39 from dogpile.cache.util import memoized_property
38 from dogpile.cache.api import Serializer, Deserializer
39
40 from pyramid.settings import asbool
41 40
42 41 from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
43 from rhodecode.lib.str_utils import safe_str, safe_bytes
44
42 from rhodecode.lib.str_utils import safe_bytes, safe_str
43 from rhodecode.lib.type_utils import str2bool
45 44
46 45 _default_max_size = 1024
47 46
48 47 log = logging.getLogger(__name__)
49 48
50 49
51 50 class LRUMemoryBackend(memory_backend.MemoryBackend):
52 51 key_prefix = 'lru_mem_backend'
53 52 pickle_values = False
54 53
55 54 def __init__(self, arguments):
56 max_size = arguments.pop('max_size', _default_max_size)
55 self.max_size = arguments.pop('max_size', _default_max_size)
57 56
58 57 LRUDictClass = LRUDict
59 58 if arguments.pop('log_key_count', None):
60 59 LRUDictClass = LRUDictDebug
61 60
62 arguments['cache_dict'] = LRUDictClass(max_size)
63 super(LRUMemoryBackend, self).__init__(arguments)
61 arguments['cache_dict'] = LRUDictClass(self.max_size)
62 super().__init__(arguments)
63
64 def __repr__(self):
65 return f'{self.__class__}(maxsize=`{self.max_size}`)'
66
67 def __str__(self):
68 return self.__repr__()
64 69
65 70 def delete(self, key):
66 71 try:
67 72 del self._cache[key]
68 73 except KeyError:
69	74	 # we don't care if the key isn't there at deletion
70 75 pass
71 76
72 77 def delete_multi(self, keys):
73 78 for key in keys:
74 79 self.delete(key)
75 80
76 81
77 82 class PickleSerializer:
78 serializer: Union[None, Serializer] = staticmethod( # type: ignore
83 serializer: None | Serializer = staticmethod( # type: ignore
79 84 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
80 85 )
81 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
86 deserializer: None | Deserializer = staticmethod( # type: ignore
82 87 functools.partial(pickle.loads)
83 88 )
84 89
85 90
86 91 class MsgPackSerializer(object):
87 serializer: Union[None, Serializer] = staticmethod( # type: ignore
92 serializer: None | Serializer = staticmethod( # type: ignore
88 93 msgpack.packb
89 94 )
90 deserializer: Union[None, Deserializer] = staticmethod( # type: ignore
95 deserializer: None | Deserializer = staticmethod( # type: ignore
91 96 functools.partial(msgpack.unpackb, use_list=False)
92 97 )
93 98
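A quick round-trip sketch of the two serializer mixins defined above (the payload is illustrative):

```python
payload = {'commit_id': 'abcd', 'size': 42}

raw = PickleSerializer.serializer(payload)
assert PickleSerializer.deserializer(raw) == payload

packed = MsgPackSerializer.serializer(payload)
assert MsgPackSerializer.deserializer(packed) == payload
```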
94 99
95 100 class CustomLockFactory(FileLock):
96 101
97 102 @memoized_property
98 103 def _module(self):
99 104
100 105 def gevent_flock(fd, operation):
101 106 """
102	107	 Gevent-compatible flock
103 108 """
104 109 # set non-blocking, this will cause an exception if we cannot acquire a lock
105 110 operation |= fcntl.LOCK_NB
106 111 start_lock_time = time.time()
107 112 timeout = 60 * 15 # 15min
108 113 while True:
109 114 try:
110 115 flock_org(fd, operation)
111 116 # lock has been acquired
112 117 break
113 118 except (OSError, IOError) as e:
114 119 # raise on other errors than Resource temporarily unavailable
115 120 if e.errno != errno.EAGAIN:
116 121 raise
117 122 elif (time.time() - start_lock_time) > timeout:
118 123 # waited to much time on a lock, better fail than loop for ever
119 124 log.error('Failed to acquire lock on `%s` after waiting %ss',
120 125 self.filename, timeout)
121 126 raise
122 127 wait_timeout = 0.03
123 128 log.debug('Failed to acquire lock on `%s`, retry in %ss',
124 129 self.filename, wait_timeout)
125 130 gevent.sleep(wait_timeout)
126 131
127 132 fcntl.flock = gevent_flock
128 133 return fcntl
129 134
130 135
131 136 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
132 137 key_prefix = 'file_backend'
133 138
134 139 def __init__(self, arguments):
135 140 arguments['lock_factory'] = CustomLockFactory
136 141 db_file = arguments.get('filename')
137 142
138	 log.debug('initializing %s DB in %s', self.__class__.__name__, db_file)
143	 log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
144 db_file_dir = os.path.dirname(db_file)
145 if not os.path.isdir(db_file_dir):
146 os.makedirs(db_file_dir)
147
139 148 try:
140 super(FileNamespaceBackend, self).__init__(arguments)
149 super().__init__(arguments)
141 150 except Exception:
142 151 log.exception('Failed to initialize db at: %s', db_file)
143 152 raise
144 153
145 154 def __repr__(self):
146 return '{} `{}`'.format(self.__class__, self.filename)
155 return f'{self.__class__}(file=`{self.filename}`)'
156
157 def __str__(self):
158 return self.__repr__()
159
160 def _get_keys_pattern(self, prefix: bytes = b''):
161 return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
147 162
148 163 def list_keys(self, prefix: bytes = b''):
149 prefix = b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
164 prefix = self._get_keys_pattern(prefix)
150 165
151 166 def cond(dbm_key: bytes):
152 167 if not prefix:
153 168 return True
154 169
155 170 if dbm_key.startswith(prefix):
156 171 return True
157 172 return False
158 173
159 174 with self._dbm_file(True) as dbm:
160 175 try:
161 176 return list(filter(cond, dbm.keys()))
162 177 except Exception:
163 178 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
164 179 raise
165 180
166 181 def get_store(self):
167 182 return self.filename
168 183
169 184
170 185 class BaseRedisBackend(redis_backend.RedisBackend):
171 186 key_prefix = ''
172 187
173 188 def __init__(self, arguments):
174 super(BaseRedisBackend, self).__init__(arguments)
189 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
190 super().__init__(arguments)
191
175 192 self._lock_timeout = self.lock_timeout
176 self._lock_auto_renewal = asbool(arguments.pop("lock_auto_renewal", True))
193 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
177 194
178 195 if self._lock_auto_renewal and not self._lock_timeout:
179 196 # set default timeout for auto_renewal
180 197 self._lock_timeout = 30
181 198
199 def __repr__(self):
200 return f'{self.__class__}(conn=`{self.db_conn}`)'
201
202 def __str__(self):
203 return self.__repr__()
204
182 205 def _create_client(self):
183 206 args = {}
184 207
185 208 if self.url is not None:
186 209 args.update(url=self.url)
187 210
188 211 else:
189 212 args.update(
190 213 host=self.host, password=self.password,
191 214 port=self.port, db=self.db
192 215 )
193 216
194 217 connection_pool = redis.ConnectionPool(**args)
195 218 self.writer_client = redis.StrictRedis(
196 219 connection_pool=connection_pool
197 220 )
198 221 self.reader_client = self.writer_client
199 222
200 def list_keys(self, prefix=''):
201 prefix = '{}:{}*'.format(self.key_prefix, prefix)
223 def _get_keys_pattern(self, prefix: bytes = b''):
224 return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
225
226 def list_keys(self, prefix: bytes = b''):
227 prefix = self._get_keys_pattern(prefix)
202 228 return self.reader_client.keys(prefix)
203 229
204 230 def get_store(self):
205 231 return self.reader_client.connection_pool
206 232
207 233 def get_mutex(self, key):
208 234 if self.distributed_lock:
209 lock_key = '_lock_{0}'.format(safe_str(key))
235 lock_key = f'_lock_{safe_str(key)}'
210 236 return get_mutex_lock(
211 237 self.writer_client, lock_key,
212 238 self._lock_timeout,
213 239 auto_renewal=self._lock_auto_renewal
214 240 )
215 241 else:
216 242 return None
217 243
218 244
219 245 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
220 246 key_prefix = 'redis_pickle_backend'
221 247 pass
222 248
223 249
224 250 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
225 251 key_prefix = 'redis_msgpack_backend'
226 252 pass
227 253
228 254
229 255 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
230 256 from rhodecode.lib._vendor import redis_lock
231 257
232 258 class _RedisLockWrapper(object):
233 259 """LockWrapper for redis_lock"""
234 260
235 261 @classmethod
236 262 def get_lock(cls):
237 263 return redis_lock.Lock(
238 264 redis_client=client,
239 265 name=lock_key,
240 266 expire=lock_timeout,
241 267 auto_renewal=auto_renewal,
242 268 strict=True,
243 269 )
244 270
245 271 def __repr__(self):
246 return "{}:{}".format(self.__class__.__name__, lock_key)
272 return f"{self.__class__.__name__}:{lock_key}"
247 273
248 274 def __str__(self):
249 return "{}:{}".format(self.__class__.__name__, lock_key)
275 return f"{self.__class__.__name__}:{lock_key}"
250 276
251 277 def __init__(self):
252 278 self.lock = self.get_lock()
253 279 self.lock_key = lock_key
254 280
255 281 def acquire(self, wait=True):
256 282 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
257 283 try:
258 284 acquired = self.lock.acquire(wait)
259 285 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
260 286 return acquired
261 287 except redis_lock.AlreadyAcquired:
262 288 return False
263 289 except redis_lock.AlreadyStarted:
264 290 # refresh thread exists, but it also means we acquired the lock
265 291 return True
266 292
267 293 def release(self):
268 294 try:
269 295 self.lock.release()
270 296 except redis_lock.NotAcquired:
271 297 pass
272 298
273 299 return _RedisLockWrapper()
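A hedged sketch of how the wrapper is used once a redis backend hands it out (the `backend` instance and key are illustrative; `get_mutex` returns `None` when `distributed_lock` is off):

```python
mutex = backend.get_mutex('repo_object:42')  # backend: a configured BaseRedisBackend
if mutex is not None and mutex.acquire(wait=True):
    try:
        pass  # recompute and store the cached value while holding the lock
    finally:
        mutex.release()
```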
@@ -1,70 +1,71 b''
1 1
2 2
3 3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import os
22 import sys
23 21 import atexit
24 22 import logging
23 import os
25 24 import signal
25 import sys
26
26 27 import rhodecode
27 28
28 29 log = logging.getLogger(__name__)
29 30
30 31 cache_keys_by_pid = set()
31 32
32 33
33 34 def sigHandler(signo, frame):
34 35 """
35	36	 signals trigger sys.exit, so there's a single handler to clean up the code.
36 37 """
37 38 if rhodecode.is_test:
38 39 return
39 40
40 41 sys.exit(0)
41 42
42 43
43 44 def free_cache_keys(*args):
44 from rhodecode.model.db import Session, CacheKey
45 from rhodecode.model.db import CacheKey, Session
45 46
46 47 if rhodecode.is_test:
47 48 return
48 49
49 50 ssh_cmd = os.environ.get('RC_CMD_SSH_WRAPPER')
50 51 if ssh_cmd:
51 52 return
52 53
53 54 if cache_keys_by_pid:
54 55 try:
55 56 for cache_proc in set(cache_keys_by_pid):
56 57 like_expression = '{}%'.format(cache_proc)
57 58 qry = CacheKey.query().filter(CacheKey.cache_key.like(like_expression))
58 59 count = qry.count()
59 60 log.info('Clearing %s: %s cache keys, total: %s', cache_proc, len(cache_keys_by_pid), count)
60 61 qry.delete(synchronize_session='fetch')
61 62 cache_keys_by_pid.remove(cache_proc)
62 63 Session().commit()
63 64 except Exception:
64 65 log.exception('Failed to clear keys, exiting gracefully')
65 66
66 67 atexit.register(free_cache_keys)
67 68
68 69 signal.signal(signal.SIGTERM, sigHandler)
69 70 signal.signal(signal.SIGINT, sigHandler)
70 71
@@ -1,28 +1,27 b''
1
2
3 1 # Copyright (C) 2015-2020 RhodeCode GmbH
4 2 #
5 3 # This program is free software: you can redistribute it and/or modify
6 4 # it under the terms of the GNU Affero General Public License, version 3
7 5 # (only), as published by the Free Software Foundation.
8 6 #
9 7 # This program is distributed in the hope that it will be useful,
10 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 10 # GNU General Public License for more details.
13 11 #
14 12 # You should have received a copy of the GNU Affero General Public License
15 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 14 #
17 15 # This program is dual-licensed. If you wish to learn more about the
18 16 # RhodeCode Enterprise Edition, including its added features, Support services,
19 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18
20 19 import os
21 20 import tempfile
22 21
23 22 dogpile_config_defaults = {
24 23 'cache_dir': os.path.join(tempfile.gettempdir(), 'rc_cache')
25 24 }
26 25
27 26 # GLOBAL TO STORE ALL REGISTERED REGIONS
28 27 dogpile_cache_regions = {}
@@ -1,370 +1,405 b''
1 1 # Copyright (C) 2015-2020 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 import os
19 import time
20 import logging
18
21 19 import functools
22 import decorator
20 import logging
21 import os
23 22 import threading
23 import time
24 24
25 import decorator
25 26 from dogpile.cache import CacheRegion
26 27
27 28 import rhodecode
28 29 from rhodecode.lib.hash_utils import sha1
30 from rhodecode.lib.str_utils import safe_bytes
29 31 from rhodecode.lib.type_utils import str2bool
30 from rhodecode.lib.str_utils import safe_bytes
31 32
32 from rhodecode.lib.rc_cache import cache_key_meta
33 from rhodecode.lib.rc_cache import region_meta
33 from . import region_meta, cache_key_meta
34 34
35 35 log = logging.getLogger(__name__)
36 36
37 37
38 38 def isCython(func):
39 39 """
40 40 Private helper that checks if a function is a cython function.
41 41 """
42 42 return func.__class__.__name__ == 'cython_function_or_method'
43 43
44 44
45 45 class RhodeCodeCacheRegion(CacheRegion):
46 46
47 def __repr__(self):
48 return f'{self.__class__}(name={self.name})'
49
47 50 def conditional_cache_on_arguments(
48 51 self, namespace=None,
49 52 expiration_time=None,
50 53 should_cache_fn=None,
51 54 to_str=str,
52 55 function_key_generator=None,
53 56 condition=True):
54 57 """
55	58	 Custom conditional decorator that will not touch any dogpile internals if
56	 the condition isn't met. This works a bit differently than should_cache_fn
59	 the condition isn't met. This works a bit differently from should_cache_fn
57	60	 and it's faster in cases where we never want to compute cached values
58 61 """
59 62 expiration_time_is_callable = callable(expiration_time)
63 if not namespace:
64 namespace = getattr(self, '_default_namespace', None)
60 65
61 66 if function_key_generator is None:
62 67 function_key_generator = self.function_key_generator
63 68
64 def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):
69 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
65 70
66 71 if not condition:
67 72 log.debug('Calling un-cached method:%s', user_func.__name__)
68 73 start = time.time()
69 74 result = user_func(*arg, **kw)
70 75 total = time.time() - start
71 76 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
72 77 return result
73 78
74 key = key_generator(*arg, **kw)
79 key = func_key_generator(*arg, **kw)
75 80
76 81 timeout = expiration_time() if expiration_time_is_callable \
77 82 else expiration_time
78 83
79 84 log.debug('Calling cached method:`%s`', user_func.__name__)
80 85 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
81 86
82 87 def cache_decorator(user_func):
83 88 if to_str is str:
84 89 # backwards compatible
85 90 key_generator = function_key_generator(namespace, user_func)
86 91 else:
87 92 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
88 93
89 94 def refresh(*arg, **kw):
90 95 """
91 96 Like invalidate, but regenerates the value instead
92 97 """
93 98 key = key_generator(*arg, **kw)
94 99 value = user_func(*arg, **kw)
95 100 self.set(key, value)
96 101 return value
97 102
98 103 def invalidate(*arg, **kw):
99 104 key = key_generator(*arg, **kw)
100 105 self.delete(key)
101 106
102 107 def set_(value, *arg, **kw):
103 108 key = key_generator(*arg, **kw)
104 109 self.set(key, value)
105 110
106 111 def get(*arg, **kw):
107 112 key = key_generator(*arg, **kw)
108 113 return self.get(key)
109 114
110 115 user_func.set = set_
111 116 user_func.invalidate = invalidate
112 117 user_func.get = get
113 118 user_func.refresh = refresh
114 119 user_func.key_generator = key_generator
115 120 user_func.original = user_func
116 121
117 122 # Use `decorate` to preserve the signature of :param:`user_func`.
118 123 return decorator.decorate(user_func, functools.partial(
119 124 get_or_create_for_user_func, key_generator))
120 125
121 126 return cache_decorator
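Illustrative use of the decorator, assuming `region` is an already configured RhodeCodeCacheRegion; besides caching, the decorated function gains the helpers attached in `cache_decorator`:

```python
@region.conditional_cache_on_arguments(namespace='repo_object:42', condition=True)
def heavy_compute(param):
    return param * 2

heavy_compute(21)             # cached call (plain call when condition=False)
heavy_compute.refresh(21)     # force re-computation and store the new value
heavy_compute.invalidate(21)  # drop the cached entry for these arguments
```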
122 127
123 128
124 129 def make_region(*arg, **kw):
125 130 return RhodeCodeCacheRegion(*arg, **kw)
126 131
127 132
128 133 def get_default_cache_settings(settings, prefixes=None):
129 134 prefixes = prefixes or []
130 135 cache_settings = {}
131 136 for key in settings.keys():
132 137 for prefix in prefixes:
133 138 if key.startswith(prefix):
134 139 name = key.split(prefix)[1].strip()
135 140 val = settings[key]
136 141 if isinstance(val, str):
137 142 val = val.strip()
138 143 cache_settings[name] = val
139 144 return cache_settings
140 145
141 146
142 147 def compute_key_from_params(*args):
143 148 """
144 149 Helper to compute key from given params to be used in cache manager
145 150 """
146 151 return sha1(safe_bytes("_".join(map(str, args))))
147 152
148 153
149 def backend_key_generator(backend):
150 """
151 Special wrapper that also sends over the backend to the key generator
152 """
153 def wrapper(namespace, fn):
154 return key_generator(backend, namespace, fn)
155 return wrapper
156
157
158 def key_generator(backend, namespace, fn):
159 fname = fn.__name__
154 def custom_key_generator(backend, namespace, fn):
155 func_name = fn.__name__
160 156
161 157 def generate_key(*args):
162 backend_prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
158 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
163 159 namespace_pref = namespace or 'default_namespace'
164 160 arg_key = compute_key_from_params(*args)
165 final_key = "{}:{}:{}_{}".format(backend_prefix, namespace_pref, fname, arg_key)
161 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
166 162
167 163 return final_key
168 164
169 165 return generate_key
170 166
171 167
172 def get_or_create_region(region_name, region_namespace=None):
173 from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
168 def backend_key_generator(backend):
169 """
170 Special wrapper that also sends over the backend to the key generator
171 """
172 def wrapper(namespace, fn):
173 return custom_key_generator(backend, namespace, fn)
174 return wrapper
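Roughly what the generated keys look like (the function and namespace names are hypothetical; `backend_prefix` is the fallback used when the backend defines no `key_prefix`):

```python
def files_tree(repo_id, commit_id):  # hypothetical cached function
    ...

gen = backend_key_generator(None)('repo_object:42', files_tree)
gen(1, 'abcd')  # -> 'backend_prefix:repo_object:42:files_tree_<sha1 of b"1_abcd">'
```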
175
176
177 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
178 from .backends import FileNamespaceBackend
179 from . import async_creation_runner
180
174 181 region_obj = region_meta.dogpile_cache_regions.get(region_name)
175 182 if not region_obj:
176 raise EnvironmentError(
177 'Region `{}` not in configured: {}.'.format(
178 region_name, list(region_meta.dogpile_cache_regions.keys())))
183 reg_keys = list(region_meta.dogpile_cache_regions.keys())
184 raise EnvironmentError(f'Region `{region_name}` not in configured: {reg_keys}.')
185
186 region_uid_name = f'{region_name}:{region_namespace}'
179 187
180 region_uid_name = '{}:{}'.format(region_name, region_namespace)
181 188 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
189 if not region_namespace:
190	 raise ValueError(f'{FileNamespaceBackend} requires the region_namespace param to be specified')
191
182 192 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
183 193 if region_exist:
184 194 log.debug('Using already configured region: %s', region_namespace)
185 195 return region_exist
186 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
196
187 197 expiration_time = region_obj.expiration_time
188 198
189 if not os.path.isdir(cache_dir):
190 os.makedirs(cache_dir)
199 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
200 namespace_cache_dir = cache_dir
201
202 # we default the namespace_cache_dir to our default cache dir.
203	 # however, if this backend is configured with a filename= param, we prioritize that,
204	 # so all caches within that particular region, even namespaced ones, end up in the same path
205 if region_obj.actual_backend.filename:
206 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
207
208 if not os.path.isdir(namespace_cache_dir):
209 os.makedirs(namespace_cache_dir)
191 210 new_region = make_region(
192 211 name=region_uid_name,
193 212 function_key_generator=backend_key_generator(region_obj.actual_backend)
194 213 )
214
195 215 namespace_filename = os.path.join(
196 cache_dir, "{}.cache.dbm".format(region_namespace))
216 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
197	217	 # special type that allows one db file per namespace
198 218 new_region.configure(
199 219 backend='dogpile.cache.rc.file_namespace',
200 220 expiration_time=expiration_time,
201 221 arguments={"filename": namespace_filename}
202 222 )
203 223
204 224 # create and save in region caches
205 225 log.debug('configuring new region: %s', region_uid_name)
206 226 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
207 227
228 region_obj._default_namespace = region_namespace
229 if use_async_runner:
230 region_obj.async_creation_runner = async_creation_runner
208 231 return region_obj
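A hedged usage sketch (region and namespace names are illustrative). File-namespace regions must pass `region_namespace` so each namespace gets its own db file; `use_async_runner` opts the returned region into the async creator:

```python
region = get_or_create_region(
    'cache_repo', region_namespace='repo_object:42', use_async_runner=True)
```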
209 232
210 233
211 def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
212 region = get_or_create_region(cache_region, cache_namespace_uid)
213 cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
214 num_delete_keys = len(cache_keys)
215 if invalidate:
216 region.invalidate(hard=False)
217 else:
218 if num_delete_keys:
219 region.delete_multi(cache_keys)
220 return num_delete_keys
234 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
235 from . import CLEAR_DELETE, CLEAR_INVALIDATE
236
237 if not isinstance(cache_region, RhodeCodeCacheRegion):
238 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
239 log.debug('clearing cache region: %s with method=%s', cache_region, method)
240
241 num_affected_keys = None
242
243 if method == CLEAR_INVALIDATE:
244 # NOTE: The CacheRegion.invalidate() method’s default mode of
245 # operation is to set a timestamp local to this CacheRegion in this Python process only.
246 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
247 cache_region.invalidate(hard=True)
248
249 if method == CLEAR_DELETE:
250 cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
251 num_affected_keys = len(cache_keys)
252 if num_affected_keys:
253 cache_region.delete_multi(cache_keys)
254
255 return num_affected_keys
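Illustrative calls for both clearing modes (region and namespace names are hypothetical): `CLEAR_DELETE` removes the matching keys and returns their count, while `CLEAR_INVALIDATE` only marks the region stale within the current process:

```python
from rhodecode.lib import rc_cache

removed = rc_cache.clear_cache_namespace(
    'cache_repo', 'repo_object:42', method=rc_cache.CLEAR_DELETE)

rc_cache.clear_cache_namespace(
    'cache_repo', 'repo_object:42', method=rc_cache.CLEAR_INVALIDATE)
```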
221 256
222 257
223 258 class ActiveRegionCache(object):
224 259 def __init__(self, context, cache_data):
225 260 self.context = context
226 261 self.cache_data = cache_data
227 262
228 263 def should_invalidate(self):
229 264 return False
230 265
231 266
232 267 class FreshRegionCache(object):
233 268 def __init__(self, context, cache_data):
234 269 self.context = context
235 270 self.cache_data = cache_data
236 271
237 272 def should_invalidate(self):
238 273 return True
239 274
240 275
241 276 class InvalidationContext(object):
242 277 """
243 278 usage::
244 279
245 280 from rhodecode.lib import rc_cache
246 281
247 282 cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
248 283 region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)
249 284
250 285 @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
251 286 def heavy_compute(cache_name, param1, param2):
252 287 print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
253 288
254 289 # invalidation namespace is shared namespace key for all process caches
255 290 # we use it to send a global signal
256 291 invalidation_namespace = 'repo_cache:1'
257 292
258 293 inv_context_manager = rc_cache.InvalidationContext(
259 294 uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
260 295 with inv_context_manager as invalidation_context:
261 296 args = ('one', 'two')
262 297 # re-compute and store cache if we get invalidate signal
263 298 if invalidation_context.should_invalidate():
264 299 result = heavy_compute.refresh(*args)
265 300 else:
266 301 result = heavy_compute(*args)
267 302
268 303 compute_time = inv_context_manager.compute_time
269 304 log.debug('result computed in %.4fs', compute_time)
270 305
271 306 # To send global invalidation signal, simply run
272 307 CacheKey.set_invalidate(invalidation_namespace)
273 308
274 309 """
275 310
276 311 def __repr__(self):
277 312 return f'<InvalidationContext:{self.cache_key}[{self.uid}]>'
278 313
279 314 def __init__(self, uid, invalidation_namespace='',
280 315 raise_exception=False, thread_scoped=None):
281 316 self.uid = uid
282 317 self.invalidation_namespace = invalidation_namespace
283 318 self.raise_exception = raise_exception
284 319 self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT'
285 320 self.thread_id = 'global'
286 321
287 322 if thread_scoped is None:
288 323 # if we set "default" we can override this via .ini settings
289 thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
324 thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')
290 325
291 326 # Append the thread id to the cache key if this invalidation context
292 327 # should be scoped to the current thread.
293 328 if thread_scoped is True:
294 329 self.thread_id = threading.current_thread().ident
295 330
296 331 self.cache_key = compute_key_from_params(uid)
297 332 self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
298 333 self.proc_id, self.thread_id, self.cache_key)
299 334 self.proc_key = 'proc:{}'.format(self.proc_id)
300 335 self.compute_time = 0
301 336
302 337 def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
303 338 from rhodecode.model.db import CacheKey
304 339
305 340 invalidation_namespace = invalidation_namespace or self.invalidation_namespace
306 341 # fetch all cache keys for this namespace and convert them to a map to find if we
307	342	 # have a specific cache_key object registered. We do this because we want a
308	343	 # consistent cache_state_uid for all newly registered objects
309 344 cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
310 345 cache_obj = cache_obj_map.get(self.cache_key)
311 346 log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
312 347
313 348 if not cache_obj:
314 349 new_cache_args = invalidation_namespace
315 350 first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
316 351 cache_state_uid = None
317 352 if first_cache_obj:
318 353 cache_state_uid = first_cache_obj.cache_state_uid
319 354 cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
320 355 cache_state_uid=cache_state_uid)
321 356 cache_key_meta.cache_keys_by_pid.add(self.proc_key)
322 357
323 358 return cache_obj
324 359
325 360 def __enter__(self):
326 361 """
327 362 Test if current object is valid, and return CacheRegion function
328 363 that does invalidation and calculation
329 364 """
330 365 log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
331 366 # register or get a new key based on uid
332 367 self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
333 368 cache_data = self.cache_obj.get_dict()
334 369 self._start_time = time.time()
335 370 if self.cache_obj.cache_active:
336	371	 # means our cache obj exists and is marked as active, i.e. its
337	372	 # cache is not outdated; we return ActiveRegionCache
338 373 self.skip_cache_active_change = True
339 374
340 375 return ActiveRegionCache(context=self, cache_data=cache_data)
341 376
342	377	 # the key either doesn't exist or is set to False; we return
343	378	 # the real invalidator which re-computes the value. We additionally set
344	379	 # the flag to actually update the database objects
345 380 self.skip_cache_active_change = False
346 381 return FreshRegionCache(context=self, cache_data=cache_data)
347 382
348 383 def __exit__(self, exc_type, exc_val, exc_tb):
349 from rhodecode.model.db import Session, IntegrityError
384 from rhodecode.model.db import IntegrityError, Session
350 385
351 386 # save compute time
352 387 self.compute_time = time.time() - self._start_time
353 388
354 389 if self.skip_cache_active_change:
355 390 return
356 391
357 392 try:
358 393 self.cache_obj.cache_active = True
359 394 Session().add(self.cache_obj)
360 395 Session().commit()
361 396 except IntegrityError:
362	397	 # if we catch an integrity error, it means this object was already inserted;
363	398	 # the assumption is that this is really an edge race-condition case and
364	399	 # it's safe to skip it
365 400 Session().rollback()
366 401 except Exception:
367 402 log.exception('Failed to commit on cache key update')
368 403 Session().rollback()
369 404 if self.raise_exception:
370 405 raise