caches: new updated rc_cache with archive cache module and python3 changes
super-admin | r5067:bf4fcdb7 | default
@@ -0,0 +1,88 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import logging
import os

import diskcache
from diskcache import RLock

log = logging.getLogger(__name__)

cache_meta = None


class ReentrantLock(RLock):
    def __enter__(self):
        reentrant_lock_key = self._key

        log.debug('Acquire ReentrantLock(key=%s) for archive cache generation...', reentrant_lock_key)
        # self.acquire()
        log.debug('Lock for key=%s acquired', reentrant_lock_key)

    def __exit__(self, *exc_info):
        # self.release()
        pass


def get_archival_config(config):

    final_config = {
        'archive_cache.eviction_policy': 'least-frequently-used'
    }

    for k, v in config.items():
        if k.startswith('archive_cache'):
            final_config[k] = v

    return final_config


def get_archival_cache_store(config):

    global cache_meta
    if cache_meta is not None:
        return cache_meta

    config = get_archival_config(config)

    archive_cache_dir = config['archive_cache.store_dir']
    archive_cache_size_gb = config['archive_cache.cache_size_gb']
    archive_cache_shards = config['archive_cache.cache_shards']
    archive_cache_eviction_policy = config['archive_cache.eviction_policy']

    log.debug('Initializing archival cache instance under %s', archive_cache_dir)

    # check if it's ok to write, and re-create the archive cache
    if not os.path.isdir(archive_cache_dir):
        os.makedirs(archive_cache_dir, exist_ok=True)

    d_cache = diskcache.FanoutCache(
        archive_cache_dir, shards=archive_cache_shards,
        cull_limit=0,  # manual eviction required
        size_limit=archive_cache_size_gb * 1024 * 1024 * 1024,
        eviction_policy=archive_cache_eviction_policy,
        timeout=30
    )
    cache_meta = d_cache
    return cache_meta


def includeme(config):
    # init our cache at start
    settings = config.get_settings()
    get_archival_cache_store(settings)
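
# --- usage sketch (illustrative, not part of this commit) -----------------
# A minimal way this module could be exercised. The config keys mirror the
# ones read by get_archival_cache_store() above; the store path and the
# archive key below are assumptions for the example.
settings = {
    'archive_cache.store_dir': '/tmp/rc_archive_cache',
    'archive_cache.cache_size_gb': 1,
    'archive_cache.cache_shards': 8,
}
d_cache = get_archival_cache_store(settings)
# store a generated archive blob, then read it back
d_cache.set('repo-archive:abcd1234.zip', b'<zip bytes>')
assert d_cache.get('repo-archive:abcd1234.zip') == b'<zip bytes>'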
@@ -1,89 +1,120 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import logging
import threading

from dogpile.cache import register_backend

from . import region_meta
from .utils import (
    ActiveRegionCache,
    FreshRegionCache,
    InvalidationContext,
    backend_key_generator,
    clear_cache_namespace,
    get_default_cache_settings,
    get_or_create_region,
    make_region,
    str2bool,
)

module_name = 'rhodecode'

register_backend(
    "dogpile.cache.rc.memory_lru", f"{module_name}.lib.rc_cache.backends",
    "LRUMemoryBackend")

register_backend(
    "dogpile.cache.rc.file_namespace", f"{module_name}.lib.rc_cache.backends",
    "FileNamespaceBackend")

register_backend(
    "dogpile.cache.rc.redis", f"{module_name}.lib.rc_cache.backends",
    "RedisPickleBackend")

register_backend(
    "dogpile.cache.rc.redis_msgpack", f"{module_name}.lib.rc_cache.backends",
    "RedisMsgPackBackend")


log = logging.getLogger(__name__)


FILE_TREE_CACHE_VER = 'v4'
LICENSE_CACHE_VER = 'v2'


CLEAR_DELETE = 'delete'
CLEAR_INVALIDATE = 'invalidate'


def async_creation_runner(cache, somekey, creator, mutex):

    def runner():
        try:
            value = creator()
            cache.set(somekey, value)
        finally:
            mutex.release()

    thread = threading.Thread(target=runner)
    thread.start()

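# --- background-refresh sketch (illustrative, not part of this commit) ----
# async_creation_runner() follows dogpile's async_creation_runner contract:
# when a cached value is stale, dogpile serves the stale value and hands the
# creation mutex to the runner, which must release it after storing the
# fresh value. The throwaway memory region below wires up the runner above.
from dogpile.cache import make_region as dogpile_make_region

demo_region = dogpile_make_region(async_creation_runner=async_creation_runner)
demo_region.configure('dogpile.cache.memory', expiration_time=5)

@demo_region.cache_on_arguments()
def load_data(arg):
    return arg * 2  # stands in for an expensive computation

load_data(21)  # computed synchronously once; later refreshes run in a thread
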
def configure_dogpile_cache(settings):
    cache_dir = settings.get('cache_dir')
    if cache_dir:
        region_meta.dogpile_config_defaults['cache_dir'] = cache_dir

    rc_cache_data = get_default_cache_settings(settings, prefixes=['rc_cache.'])

    # inspect available namespaces
    avail_regions = set()
    for key in rc_cache_data.keys():
        namespace_name = key.split('.', 1)[0]
        if namespace_name in avail_regions:
            continue

        avail_regions.add(namespace_name)
        log.debug('dogpile: found following cache regions: %s', namespace_name)

        new_region = make_region(
            name=namespace_name,
            function_key_generator=None,
            async_creation_runner=None
        )

        new_region.configure_from_config(settings, f'rc_cache.{namespace_name}.')
        new_region.function_key_generator = backend_key_generator(new_region.actual_backend)

        async_creator = str2bool(settings.pop(f'rc_cache.{namespace_name}.async_creator', 'false'))
        if async_creator:
            log.debug('configuring region %s with async creator', new_region)
            new_region.async_creation_runner = async_creation_runner

        if log.isEnabledFor(logging.DEBUG):
            region_args = dict(backend=new_region.actual_backend,
                               region_invalidator=new_region.region_invalidator.__class__)
            log.debug('dogpile: registering a new region `%s` %s', namespace_name, region_args)

        region_meta.dogpile_cache_regions[namespace_name] = new_region


def includeme(config):
    configure_dogpile_cache(config.registry.settings)
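
# --- configuration sketch (illustrative, not part of this commit) ---------
# The flat mapping configure_dogpile_cache() expects; in production these
# come from the .ini file. The region name and values are assumptions, but
# the key shapes, including the new `async_creator` flag consumed above,
# match the parsing logic.
example_settings = {
    'cache_dir': '/tmp/rc_cache',
    'rc_cache.cache_perms.backend': 'dogpile.cache.rc.file_namespace',
    'rc_cache.cache_perms.expiration_time': '300',
    'rc_cache.cache_perms.arguments.filename': '/tmp/rc_cache/cache_perms.db',
    'rc_cache.cache_perms.async_creator': 'true',  # enables the thread runner
}
configure_dogpile_cache(example_settings)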
@@ -1,273 +1,299 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import errno
import fcntl
import functools
import logging
import os
import pickle
import time

import gevent
import msgpack
import redis

flock_org = fcntl.flock
from typing import Union

from dogpile.cache.api import Deserializer, Serializer
from dogpile.cache.backends import file as file_backend
from dogpile.cache.backends import memory as memory_backend
from dogpile.cache.backends import redis as redis_backend
from dogpile.cache.backends.file import FileLock
from dogpile.cache.util import memoized_property

from rhodecode.lib.memory_lru_dict import LRUDict, LRUDictDebug
from rhodecode.lib.str_utils import safe_bytes, safe_str
from rhodecode.lib.type_utils import str2bool

_default_max_size = 1024

log = logging.getLogger(__name__)


class LRUMemoryBackend(memory_backend.MemoryBackend):
    key_prefix = 'lru_mem_backend'
    pickle_values = False

    def __init__(self, arguments):
        self.max_size = arguments.pop('max_size', _default_max_size)

        LRUDictClass = LRUDict
        if arguments.pop('log_key_count', None):
            LRUDictClass = LRUDictDebug

        arguments['cache_dict'] = LRUDictClass(self.max_size)
        super().__init__(arguments)

    def __repr__(self):
        return f'{self.__class__}(maxsize=`{self.max_size}`)'

    def __str__(self):
        return self.__repr__()

    def delete(self, key):
        try:
            del self._cache[key]
        except KeyError:
            # we don't care if key isn't there at deletion
            pass

    def delete_multi(self, keys):
        for key in keys:
            self.delete(key)

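# --- usage sketch (illustrative, not part of this commit) -----------------
# Configuring this backend directly through dogpile, via the alias
# registered with register_backend() in the package __init__; max_size is
# an assumed value and bounds the LRU dict created above.
from dogpile.cache import make_region as dogpile_make_region

lru_region = dogpile_make_region().configure(
    'dogpile.cache.rc.memory_lru',
    expiration_time=60,
    arguments={'max_size': 10000, 'log_key_count': False},
)
lru_region.set('k', 'v')
assert lru_region.get('k') == 'v'
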
class PickleSerializer:
    serializer: None | Serializer = staticmethod(  # type: ignore
        functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    )
    deserializer: None | Deserializer = staticmethod(  # type: ignore
        functools.partial(pickle.loads)
    )


class MsgPackSerializer(object):
    serializer: None | Serializer = staticmethod(  # type: ignore
        msgpack.packb
    )
    deserializer: None | Deserializer = staticmethod(  # type: ignore
        functools.partial(msgpack.unpackb, use_list=False)
    )

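# --- round-trip sketch (illustrative, not part of this commit) ------------
# Both mixins only plug dogpile's serializer hooks. Note use_list=False
# above: msgpack restores arrays as tuples, so tuple payloads round-trip.
payload = {'commit': 'abcd1234', 'files': ('a.py', 'b.py')}
assert MsgPackSerializer.deserializer(MsgPackSerializer.serializer(payload)) == payload
assert PickleSerializer.deserializer(PickleSerializer.serializer(payload)) == payload
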
class CustomLockFactory(FileLock):

    @memoized_property
    def _module(self):

        def gevent_flock(fd, operation):
            """
            Gevent compatible flock
            """
            # set non-blocking, this will cause an exception if we cannot acquire a lock
            operation |= fcntl.LOCK_NB
            start_lock_time = time.time()
            timeout = 60 * 15  # 15min
            while True:
                try:
                    flock_org(fd, operation)
                    # lock has been acquired
                    break
                except (OSError, IOError) as e:
                    # raise on other errors than Resource temporarily unavailable
                    if e.errno != errno.EAGAIN:
                        raise
                    elif (time.time() - start_lock_time) > timeout:
                        # waited too long on a lock; better to fail than loop forever
                        log.error('Failed to acquire lock on `%s` after waiting %ss',
                                  self.filename, timeout)
                        raise
                    wait_timeout = 0.03
                    log.debug('Failed to acquire lock on `%s`, retry in %ss',
                              self.filename, wait_timeout)
                    gevent.sleep(wait_timeout)

        fcntl.flock = gevent_flock
        return fcntl


class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
    key_prefix = 'file_backend'

    def __init__(self, arguments):
        arguments['lock_factory'] = CustomLockFactory
        db_file = arguments.get('filename')

        log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
        db_file_dir = os.path.dirname(db_file)
        if not os.path.isdir(db_file_dir):
            os.makedirs(db_file_dir)

        try:
            super().__init__(arguments)
        except Exception:
            log.exception('Failed to initialize db at: %s', db_file)
            raise

    def __repr__(self):
        return f'{self.__class__}(file=`{self.filename}`)'

    def __str__(self):
        return self.__repr__()

    def _get_keys_pattern(self, prefix: bytes = b''):
        return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        prefix = self._get_keys_pattern(prefix)

        def cond(dbm_key: bytes):
            if not prefix:
                return True

            if dbm_key.startswith(prefix):
                return True
            return False

        with self._dbm_file(True) as dbm:
            try:
                return list(filter(cond, dbm.keys()))
            except Exception:
                log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
                raise

    def get_store(self):
        return self.filename

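# --- usage sketch (illustrative, not part of this commit) -----------------
# One DBM file per namespace; the path is an assumption. Keys written by
# the rc key generator carry the `file_backend:<namespace>:` prefix that
# _get_keys_pattern() above filters on, which the direct set() mimics here.
from dogpile.cache import make_region as dogpile_make_region

file_region = dogpile_make_region().configure(
    'dogpile.cache.rc.file_namespace',
    expiration_time=300,
    arguments={'filename': '/tmp/rc_cache/demo_namespace.cache_db'},
)
file_region.set('file_backend:demo_namespace:some_key', {'data': 1})
file_region.backend.list_keys(prefix=b'demo_namespace')  # -> [b'file_backend:demo_namespace:some_key']
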
class BaseRedisBackend(redis_backend.RedisBackend):
    key_prefix = ''

    def __init__(self, arguments):
        self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
        super().__init__(arguments)

        self._lock_timeout = self.lock_timeout
        self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))

        if self._lock_auto_renewal and not self._lock_timeout:
            # set default timeout for auto_renewal
            self._lock_timeout = 30

    def __repr__(self):
        return f'{self.__class__}(conn=`{self.db_conn}`)'

    def __str__(self):
        return self.__repr__()

    def _create_client(self):
        args = {}

        if self.url is not None:
            args.update(url=self.url)

        else:
            args.update(
                host=self.host, password=self.password,
                port=self.port, db=self.db
            )

        connection_pool = redis.ConnectionPool(**args)
        self.writer_client = redis.StrictRedis(
            connection_pool=connection_pool
        )
        self.reader_client = self.writer_client

    def _get_keys_pattern(self, prefix: bytes = b''):
        return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))

    def list_keys(self, prefix: bytes = b''):
        prefix = self._get_keys_pattern(prefix)
        return self.reader_client.keys(prefix)

    def get_store(self):
        return self.reader_client.connection_pool

    def get_mutex(self, key):
        if self.distributed_lock:
            lock_key = f'_lock_{safe_str(key)}'
            return get_mutex_lock(
                self.writer_client, lock_key,
                self._lock_timeout,
                auto_renewal=self._lock_auto_renewal
            )
        else:
            return None

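# --- configuration sketch (illustrative, not part of this commit) ---------
# Assumed local Redis. distributed_lock makes get_mutex() return the
# redis_lock wrapper below, and lock_auto_renewal (now parsed via str2bool
# above) keeps the lock's TTL refreshed while a value is being computed.
from dogpile.cache import make_region as dogpile_make_region

redis_region = dogpile_make_region().configure(
    'dogpile.cache.rc.redis_msgpack',
    expiration_time=300,
    arguments={
        'host': 'localhost', 'port': 6379, 'db': 0,
        'distributed_lock': True,
        'lock_auto_renewal': True,
    },
)
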
class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
    key_prefix = 'redis_pickle_backend'
    pass


class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
    key_prefix = 'redis_msgpack_backend'
    pass


def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
    from rhodecode.lib._vendor import redis_lock

    class _RedisLockWrapper(object):
        """LockWrapper for redis_lock"""

        @classmethod
        def get_lock(cls):
            return redis_lock.Lock(
                redis_client=client,
                name=lock_key,
                expire=lock_timeout,
                auto_renewal=auto_renewal,
                strict=True,
            )

        def __repr__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __str__(self):
            return f"{self.__class__.__name__}:{lock_key}"

        def __init__(self):
            self.lock = self.get_lock()
            self.lock_key = lock_key

        def acquire(self, wait=True):
            log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
            try:
                acquired = self.lock.acquire(wait)
                log.debug('Got lock for key %s, %s', self.lock_key, acquired)
                return acquired
            except redis_lock.AlreadyAcquired:
                return False
            except redis_lock.AlreadyStarted:
                # refresh thread exists, but it also means we acquired the lock
                return True

        def release(self):
            try:
                self.lock.release()
            except redis_lock.NotAcquired:
                pass

    return _RedisLockWrapper()
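
# --- standalone lock sketch (illustrative, not part of this commit) -------
# The wrapper exposes dogpile's mutex protocol (acquire/release), so it can
# guard any critical section; the Redis connection details are assumed.
client = redis.StrictRedis(host='localhost', port=6379)
mutex = get_mutex_lock(client, '_lock_repo1', lock_timeout=30, auto_renewal=True)
if mutex.acquire(wait=True):
    try:
        pass  # guarded computation goes here
    finally:
        mutex.release()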
@@ -1,70 +1,71 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import atexit
import logging
import os
import signal
import sys

import rhodecode

log = logging.getLogger(__name__)

cache_keys_by_pid = set()


def sigHandler(signo, frame):
    """
    Signals trigger sys.exit(), so there's a single handler to clean up the code.
    """
    if rhodecode.is_test:
        return

    sys.exit(0)


def free_cache_keys(*args):
    from rhodecode.model.db import CacheKey, Session

    if rhodecode.is_test:
        return

    ssh_cmd = os.environ.get('RC_CMD_SSH_WRAPPER')
    if ssh_cmd:
        return

    if cache_keys_by_pid:
        try:
            for cache_proc in set(cache_keys_by_pid):
                like_expression = '{}%'.format(cache_proc)
                qry = CacheKey.query().filter(CacheKey.cache_key.like(like_expression))
                count = qry.count()
                log.info('Clearing %s: %s cache keys, total: %s', cache_proc, len(cache_keys_by_pid), count)
                qry.delete(synchronize_session='fetch')
                cache_keys_by_pid.remove(cache_proc)
            Session().commit()
        except Exception:
            log.exception('Failed to clear keys, exiting gracefully')

atexit.register(free_cache_keys)

signal.signal(signal.SIGTERM, sigHandler)
signal.signal(signal.SIGINT, sigHandler)
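
# --- lifecycle sketch (illustrative, not part of this commit) -------------
# The module is import-time wiring: anything added to cache_keys_by_pid is
# treated as a key prefix, and free_cache_keys() deletes all matching
# CacheKey rows at process shutdown. InvalidationContext registers its proc
# key the same way; the id below is an assumption.
cache_keys_by_pid.add('proc:instance-42')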
@@ -1,28 +1,27 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import tempfile

dogpile_config_defaults = {
    'cache_dir': os.path.join(tempfile.gettempdir(), 'rc_cache')
}

# GLOBAL TO STORE ALL REGISTERED REGIONS
dogpile_cache_regions = {}
@@ -1,370 +1,405 b''
# Copyright (C) 2015-2020 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import functools
import logging
import os
import threading
import time

import decorator
from dogpile.cache import CacheRegion

import rhodecode
from rhodecode.lib.hash_utils import sha1
from rhodecode.lib.str_utils import safe_bytes
from rhodecode.lib.type_utils import str2bool

from . import region_meta, cache_key_meta

log = logging.getLogger(__name__)


def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    return func.__class__.__name__ == 'cython_function_or_method'


class RhodeCodeCacheRegion(CacheRegion):

    def __repr__(self):
        return f'{self.__class__}(name={self.name})'

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=str,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator that will not touch any dogpile internals
        if the condition isn't met. This works a bit differently from
        should_cache_fn, and it's faster in cases where we never want to
        compute cached values.
        """
        expiration_time_is_callable = callable(expiration_time)
        if not namespace:
            namespace = getattr(self, '_default_namespace', None)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):

            if not condition:
                log.debug('Calling un-cached method:%s', user_func.__name__)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
                return result

            key = func_key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.__name__)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is str:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator


def make_region(*arg, **kw):
    return RhodeCodeCacheRegion(*arg, **kw)

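# --- decorator sketch (illustrative, not part of this commit) -------------
# Besides caching, conditional_cache_on_arguments() attaches get/set/
# refresh/invalidate helpers to the wrapped function, and condition=False
# bypasses dogpile entirely. Region and namespace names are assumptions.
demo_region = make_region(name='demo')
demo_region.configure('dogpile.cache.memory', expiration_time=60)

@demo_region.conditional_cache_on_arguments(namespace='demo_namespace', condition=True)
def compute(user_id):
    return {'user_id': user_id}

compute(1)             # computed once, then served from the cache
compute.invalidate(1)  # drop the cached entry
compute.refresh(1)     # recompute and store, ignoring the cached value
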
def get_default_cache_settings(settings, prefixes=None):
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                name = key.split(prefix)[1].strip()
                val = settings[key]
                if isinstance(val, str):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings


def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    return sha1(safe_bytes("_".join(map(str, args))))


def custom_key_generator(backend, namespace, fn):
    func_name = fn.__name__

    def generate_key(*args):
        backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        namespace_pref = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"

        return final_key

    return generate_key


def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def wrapper(namespace, fn):
        return custom_key_generator(backend, namespace, fn)
    return wrapper

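# --- key shape sketch (illustrative, not part of this commit) -------------
# Generated keys look like `<backend.key_prefix>:<namespace>:<funcname>_<sha1-of-args>`.
# The stub backend below stands in for a real rc backend.
class _StubBackend:
    key_prefix = 'file_backend'

def fetch(repo_id):
    return repo_id

demo_key_gen = backend_key_generator(_StubBackend())('demo_namespace', fetch)
demo_key_gen(42)  # -> 'file_backend:demo_namespace:fetch_<sha1 of "42">'
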
def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
    from .backends import FileNamespaceBackend
    from . import async_creation_runner

    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        reg_keys = list(region_meta.dogpile_cache_regions.keys())
        raise EnvironmentError(f'Region `{region_name}` not found in configured regions: {reg_keys}.')

    region_uid_name = f'{region_name}:{region_namespace}'

    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        if not region_namespace:
            raise ValueError(f'{FileNamespaceBackend} requires a region_namespace param')

        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist

        expiration_time = region_obj.expiration_time

        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        namespace_cache_dir = cache_dir

        # we default the namespace_cache_dir to our default cache dir.
        # however, if this backend is configured with a filename= param, we prioritize that,
        # so all caches within that particular region, even namespaced ones, end up in the same path
        if region_obj.actual_backend.filename:
            namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)

        if not os.path.isdir(namespace_cache_dir):
            os.makedirs(namespace_cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )

        namespace_filename = os.path.join(
            namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    region_obj._default_namespace = region_namespace
    if use_async_runner:
        region_obj.async_creation_runner = async_creation_runner
    return region_obj

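# --- usage sketch (illustrative, not part of this commit) -----------------
# For file-namespace regions the namespace is mandatory and selects a
# dedicated <region>_<namespace>.cache_db file; use_async_runner opts the
# per-namespace region into the background refresh runner from __init__.
# The new _default_namespace also lets decorated functions omit the
# namespace argument. Region and namespace names are assumptions.
repo_region = get_or_create_region('cache_repo', region_namespace='repo:42', use_async_runner=True)

@repo_region.conditional_cache_on_arguments(condition=True)
def file_tree(commit_id):
    return {'commit': commit_id}
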
def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str):
    from . import CLEAR_DELETE, CLEAR_INVALIDATE

    if not isinstance(cache_region, RhodeCodeCacheRegion):
        cache_region = get_or_create_region(cache_region, cache_namespace_uid)
    log.debug('clearing cache region: %s with method=%s', cache_region, method)

    num_affected_keys = None

    if method == CLEAR_INVALIDATE:
        # NOTE: CacheRegion.invalidate() in its default mode sets a timestamp
        # local to this CacheRegion in this Python process only. It does not
        # impact other Python processes or regions, as the timestamp is only
        # stored locally in memory.
        cache_region.invalidate(hard=True)

    if method == CLEAR_DELETE:
        cache_keys = cache_region.backend.list_keys(prefix=cache_namespace_uid)
        num_affected_keys = len(cache_keys)
        if num_affected_keys:
            cache_region.delete_multi(cache_keys)

    return num_affected_keys

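# --- clearing sketch (illustrative, not part of this commit) --------------
# The two modes come from the CLEAR_* constants in __init__; region and
# namespace names below are assumptions. CLEAR_DELETE removes the keys and
# reports how many; CLEAR_INVALIDATE only marks the region stale in-process.
from rhodecode.lib import rc_cache

removed = rc_cache.clear_cache_namespace('cache_repo', 'repo:42', method=rc_cache.CLEAR_DELETE)
rc_cache.clear_cache_namespace('cache_repo', 'repo:42', method=rc_cache.CLEAR_INVALIDATE)
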
class ActiveRegionCache(object):
    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        return False


class FreshRegionCache(object):
    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        return True


class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

        compute_time = inv_context_manager.compute_time
        log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return f'<InvalidationContext:{self.cache_key}[{self.uid}]>'

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = rhodecode.CONFIG.get('instance_id') or 'DEFAULT'
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = rhodecode.ConfigGet().get_bool('cache_thread_scoped')

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.proc_key = 'proc:{}'.format(self.proc_id)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        from rhodecode.model.db import CacheKey

        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)

        if not cache_obj:
            new_cache_args = invalidation_namespace
            first_cache_obj = next(iter(cache_obj_map.values())) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
            cache_key_meta.cache_keys_by_pid.add(self.proc_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj exists and is marked active, i.e. its
            # cache is not outdated; we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key either doesn't exist or is set to False, so we return
        # the real invalidator which re-computes the value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        from rhodecode.model.db import IntegrityError, Session

        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch an integrity error, it means we inserted this object
            # concurrently; the assumption is that's really an edge race-condition
            # case and it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise