##// END OF EJS Templates
caches: use a better logic to clear cache keys
super-admin -
r4845:58a1f157 default
parent child Browse files
Show More
@@ -1,45 +1,51 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
import os
import atexit
import logging
import signal

log = logging.getLogger(__name__)

# Registry of 'proc:<instance_id>' prefixes for which this process created
# CacheKey database rows; consumed by free_cache_keys() on shutdown.
# A set avoids storing the same process prefix more than once.
cache_keys_by_pid = set()


def free_cache_keys(*args):
    """
    Delete all CacheKey database rows whose key starts with one of the
    prefixes this process registered in ``cache_keys_by_pid``.

    Registered below both as an atexit hook and as a SIGTERM/SIGINT handler,
    so ``*args`` absorbs the ``(signum, frame)`` pair a signal handler gets.
    """
    ssh_cmd = os.environ.get('RC_CMD_SSH_WRAPPER')
    if ssh_cmd:
        # short-lived ssh-wrapper invocations must not clear cache keys that
        # belong to the long-running server process
        return

    # imported lazily so this module can be imported without a configured DB
    from rhodecode.model.db import Session, CacheKey
    log.info('Clearing %s cache keys', len(cache_keys_by_pid))

    if cache_keys_by_pid:
        try:
            for cache_proc in cache_keys_by_pid:
                CacheKey.query().filter(CacheKey.cache_key.startswith(cache_proc)).delete()
            Session().commit()
            # clear so a second invocation (signal + atexit) is a no-op
            cache_keys_by_pid.clear()
        except Exception:
            # best-effort cleanup on shutdown; never raise from an exit hook.
            # NOTE: log.warn is deprecated -> use log.warning
            log.warning('Failed to clear keys, exiting gracefully')


atexit.register(free_cache_keys)
# NOTE(review): this replaces the default SIGTERM/SIGINT handlers and does not
# re-raise or exit afterwards; assumes the hosting server (e.g. gunicorn
# worker) manages actual process termination -- TODO confirm
signal.signal(signal.SIGTERM, free_cache_keys)
signal.signal(signal.SIGINT, free_cache_keys)
@@ -1,422 +1,423 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2020 RhodeCode GmbH
3 # Copyright (C) 2015-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import threading
24 import threading
25
25
26 from dogpile.cache import CacheRegion
26 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from rhodecode.lib.rc_cache import cache_key_meta
34 from rhodecode.lib.rc_cache import cache_key_meta
35 from rhodecode.lib.rc_cache import region_meta
35 from rhodecode.lib.rc_cache import region_meta
36
36
37 log = logging.getLogger(__name__)
37 log = logging.getLogger(__name__)
38
38
39
39
def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    cython_type_name = 'cython_function_or_method'
    return func.__class__.__name__ == cython_type_name
45
45
46
46
class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile CacheRegion subclass that adds a conditional variant of
    ``cache_on_arguments`` which can bypass the cache entirely.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values
        """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        # workaround for py2 and cython problems, this block should be removed
        # once we've migrated to py3
        # NOTE(review): 'cython' == 'cython' is always True, so everything
        # below this branch (get_or_create_for_user_func / cache_decorator)
        # is currently dead code kept for the py3 migration.
        if 'cython' == 'cython':
            def decorator(fn):
                if to_str is compat.string_type:
                    # backwards compatible
                    key_generator = function_key_generator(namespace, fn)
                else:
                    key_generator = function_key_generator(namespace, fn, to_str=to_str)

                @functools.wraps(fn)
                def decorate(*arg, **kw):
                    # compute the cache key from the call arguments
                    key = key_generator(*arg, **kw)

                    @functools.wraps(fn)
                    def creator():
                        return fn(*arg, **kw)

                    # bypass dogpile entirely when caching is disabled
                    if not condition:
                        return creator()

                    timeout = expiration_time() if expiration_time_is_callable \
                        else expiration_time

                    return self.get_or_create(key, creator, timeout, should_cache_fn)

                def invalidate(*arg, **kw):
                    # drop the cached value for these arguments
                    key = key_generator(*arg, **kw)
                    self.delete(key)

                def set_(value, *arg, **kw):
                    # store an explicit value under the computed key
                    key = key_generator(*arg, **kw)
                    self.set(key, value)

                def get(*arg, **kw):
                    # fetch the cached value (or dogpile's NO_VALUE) without computing
                    key = key_generator(*arg, **kw)
                    return self.get(key)

                def refresh(*arg, **kw):
                    # like invalidate, but immediately recomputes and stores the value
                    key = key_generator(*arg, **kw)
                    value = fn(*arg, **kw)
                    self.set(key, value)
                    return value

                # expose cache-management helpers on the wrapped function,
                # mirroring dogpile's cache_on_arguments API
                decorate.set = set_
                decorate.invalidate = invalidate
                decorate.refresh = refresh
                decorate.get = get
                decorate.original = fn
                decorate.key_generator = key_generator
                decorate.__wrapped__ = fn

                return decorate
            return decorator

        # ---- dead code below (see the 'cython' workaround note above) ----

        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):

            if not condition:
                # NOTE(review): user_func.func_name is py2-only; py3 would
                # need __name__ once this path becomes reachable
                log.debug('Calling un-cached method:%s', user_func.func_name)
                start = time.time()
                result = user_func(*arg, **kw)
                total = time.time() - start
                log.debug('un-cached method:%s took %.4fs', user_func.func_name, total)
                return result

            key = key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached method:`%s`', user_func.func_name)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            # NOTE(review): the `decorator` module is not in the visible
            # imports; unreachable today, but would NameError if the
            # workaround branch above were removed -- TODO confirm import
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
179
179
180
180
def make_region(*arg, **kw):
    """Factory returning the RhodeCode flavoured dogpile cache region."""
    return RhodeCodeCacheRegion(*arg, **kw)
183
183
184
184
def get_default_cache_settings(settings, prefixes=None):
    """
    Collect settings entries whose key starts with one of *prefixes*,
    returning them keyed by the remainder of the key (prefix stripped,
    whitespace trimmed). String values are stripped as well.
    """
    prefixes = prefixes or []
    cache_settings = {}
    for full_key in settings.keys():
        for prefix in prefixes:
            if not full_key.startswith(prefix):
                continue
            setting_name = full_key.split(prefix)[1].strip()
            value = settings[full_key]
            if isinstance(value, compat.string_types):
                value = value.strip()
            cache_settings[setting_name] = value
    return cache_settings
197
197
198
198
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined_params = "_".join(safe_str(arg) for arg in args)
    return sha1(joined_params)
204
204
205
205
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def backend_aware_generator(namespace, fn):
        # delegate with the captured backend prepended
        return key_generator(backend, namespace, fn)

    return backend_aware_generator
213
213
214
214
def key_generator(backend, namespace, fn):
    """
    Build a key function for *fn* that combines the backend's key prefix,
    the namespace, the function name and a digest of the call arguments.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        ns = namespace or 'default_namespace'
        args_digest = compute_key_from_params(*args)
        return "{}:{}:{}_{}".format(prefix, ns, func_name, args_digest)

    return generate_key
227
227
228
228
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the configured dogpile region for *region_name*.

    For regions backed by FileNamespaceBackend, lazily create (and register)
    a dedicated region per *region_namespace*, each with its own dbm file.

    :raises EnvironmentError: when *region_name* was never configured.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # per-namespace regions are registered in the same dict as regions;
        # reuse one if it was already configured.
        # NOTE(review): namespaces share the key space with region names here,
        # a clashing region_namespace would shadow a region -- TODO confirm
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
266
266
267
267
def clear_cache_namespace(cache_region, cache_namespace_uid, invalidate=False):
    """
    Remove (or soft-invalidate) all keys of *cache_namespace_uid* in the
    given region and return how many keys matched the namespace prefix.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    matching_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    matched_count = len(matching_keys)
    if invalidate:
        # soft invalidation: entries recompute lazily instead of being deleted
        region.invalidate(hard=False)
    elif matching_keys:
        region.delete_multi(matching_keys)
    return matched_count
278
278
279
279
class ActiveRegionCache(object):
    """
    Result of InvalidationContext.__enter__ when the stored cache key is
    still marked active: the cached value may be served as-is.
    """

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        # active entry -> no recomputation required
        return False
287
287
288
288
class FreshRegionCache(object):
    """
    Result of InvalidationContext.__enter__ when the cache key is missing
    or inactive: the caller must recompute and refresh the cached value.
    """

    def __init__(self, context, cache_data):
        self.context = context
        self.cache_data = cache_data

    def should_invalidate(self):
        # stale/missing entry -> caller should recompute
        return True
296
296
297
297
class InvalidationContext(object):
    """
    Context manager coupling a dogpile cache with CacheKey database rows so
    invalidation can be signalled globally across processes.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        # when True, __exit__ re-raises commit failures instead of only logging
        self.raise_exception = raise_exception
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        # per-process prefix registered in cache_key_meta so shutdown hooks
        # can remove every key this process created with one prefix match
        self.proc_key = 'proc:{}'.format(self.proc_id)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        # NOTE(review): `cache_type` is accepted but not used in this body --
        # TODO confirm whether it is kept for interface compatibility
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            # reuse the cache_state_uid of any existing entry in this namespace
            # so all entries stay consistent
            # NOTE(review): itervalues() is py2-only dict API
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
            # register the process-wide prefix (not the full key) so cleanup
            # can clear all of this process's keys with a single prefix filter
            cache_key_meta.cache_keys_by_pid.add(self.proc_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        # start the clock for compute_time, finalized in __exit__
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        # nothing to persist when the entry was already active
        if self.skip_cache_active_change:
            return

        try:
            # mark the (possibly new) cache key as active in the database
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now