caches: turn off thread-scoped caches by default, and allow a .ini override. Thread-scoped caches are only useful for development when using pserve.
marcink - r2935:47998ee0 default
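The diff below stops hard-coding thread scoping: InvalidationContext's thread_scoped argument now defaults to None and, when left unset, falls back to a 'cache_thread_scoped' value read from rhodecode.CONFIG via str2bool. A minimal sketch of how the override is expected to behave, assuming a configured RhodeCode environment where rhodecode.CONFIG has been populated from the .ini file (the key name comes from the diff; 'cache_demo.1' is a made-up demo uid):

import rhodecode
from rhodecode.lib import rc_cache

# No 'cache_thread_scoped' entry in the .ini: str2bool(None) should resolve to
# False, so the key stays process-wide and uses thread_id='global'.
ctx = rc_cache.InvalidationContext(uid='cache_demo.1')
print(ctx.cache_key)  # e.g. 'proc:DEFAULT_thread:global_<sha1-of-uid>'

# With cache_thread_scoped = true in the .ini (surfaced through rhodecode.CONFIG),
# the same call appends the current thread ident instead of 'global'.
rhodecode.CONFIG['cache_thread_scoped'] = 'true'
ctx = rc_cache.InvalidationContext(uid='cache_demo.1')
print(ctx.cache_key)  # e.g. 'proc:DEFAULT_thread:140234..._<sha1-of-uid>'
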
@@ -1,314 +1,318 @@
 # -*- coding: utf-8 -*-

 # Copyright (C) 2015-2018 RhodeCode GmbH
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License, version 3
 # (only), as published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 # This program is dual-licensed. If you wish to learn more about the
 # RhodeCode Enterprise Edition, including its added features, Support services,
 # and proprietary license terms, please see https://rhodecode.com/licenses/
 import os
 import logging
 import functools
 import threading

 from dogpile.cache import CacheRegion
 from dogpile.cache.util import compat

 import rhodecode
 from rhodecode.lib.utils import safe_str, sha1
-from rhodecode.lib.utils2 import safe_unicode
+from rhodecode.lib.utils2 import safe_unicode, str2bool
 from rhodecode.model.db import Session, CacheKey, IntegrityError

 from . import region_meta

 log = logging.getLogger(__name__)


 class RhodeCodeCacheRegion(CacheRegion):

     def conditional_cache_on_arguments(
             self, namespace=None,
             expiration_time=None,
             should_cache_fn=None,
             to_str=compat.string_type,
             function_key_generator=None,
             condition=True):
         """
         Custom conditional decorator, that will not touch any dogpile internals if
         condition isn't meet. This works a bit different than should_cache_fn
         And it's faster in cases we don't ever want to compute cached values
         """
         expiration_time_is_callable = compat.callable(expiration_time)

         if function_key_generator is None:
             function_key_generator = self.function_key_generator

         def decorator(fn):
             if to_str is compat.string_type:
                 # backwards compatible
                 key_generator = function_key_generator(namespace, fn)
             else:
                 key_generator = function_key_generator(namespace, fn, to_str=to_str)

             @functools.wraps(fn)
             def decorate(*arg, **kw):
                 key = key_generator(*arg, **kw)

                 @functools.wraps(fn)
                 def creator():
                     return fn(*arg, **kw)

                 if not condition:
                     return creator()

                 timeout = expiration_time() if expiration_time_is_callable \
                     else expiration_time

                 return self.get_or_create(key, creator, timeout, should_cache_fn)

             def invalidate(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.delete(key)

             def set_(value, *arg, **kw):
                 key = key_generator(*arg, **kw)
                 self.set(key, value)

             def get(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 return self.get(key)

             def refresh(*arg, **kw):
                 key = key_generator(*arg, **kw)
                 value = fn(*arg, **kw)
                 self.set(key, value)
                 return value

             decorate.set = set_
             decorate.invalidate = invalidate
             decorate.refresh = refresh
             decorate.get = get
             decorate.original = fn
             decorate.key_generator = key_generator

             return decorate

         return decorator


 def make_region(*arg, **kw):
     return RhodeCodeCacheRegion(*arg, **kw)


 def get_default_cache_settings(settings, prefixes=None):
     prefixes = prefixes or []
     cache_settings = {}
     for key in settings.keys():
         for prefix in prefixes:
             if key.startswith(prefix):
                 name = key.split(prefix)[1].strip()
                 val = settings[key]
                 if isinstance(val, basestring):
                     val = val.strip()
                 cache_settings[name] = val
     return cache_settings


 def compute_key_from_params(*args):
     """
     Helper to compute key from given params to be used in cache manager
     """
     return sha1("_".join(map(safe_str, args)))


 def key_generator(namespace, fn):
     fname = fn.__name__

     def generate_key(*args):
         namespace_pref = namespace or 'default'
         arg_key = compute_key_from_params(*args)
         final_key = "{}:{}_{}".format(namespace_pref, fname, arg_key)

         return final_key

     return generate_key


 def get_or_create_region(region_name, region_namespace=None):
     from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
     region_obj = region_meta.dogpile_cache_regions.get(region_name)
     if not region_obj:
         raise EnvironmentError(
             'Region `{}` not in configured: {}.'.format(
                 region_name, region_meta.dogpile_cache_regions.keys()))

     region_uid_name = '{}:{}'.format(region_name, region_namespace)
     if isinstance(region_obj.actual_backend, FileNamespaceBackend):
         region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
         if region_exist:
             log.debug('Using already configured region: %s', region_namespace)
             return region_exist
         cache_dir = region_meta.dogpile_config_defaults['cache_dir']
         expiration_time = region_obj.expiration_time

         if not os.path.isdir(cache_dir):
             os.makedirs(cache_dir)
         new_region = make_region(
             name=region_uid_name, function_key_generator=key_generator
         )
         namespace_filename = os.path.join(
             cache_dir, "{}.cache.dbm".format(region_namespace))
         # special type that allows 1db per namespace
         new_region.configure(
             backend='dogpile.cache.rc.file_namespace',
             expiration_time=expiration_time,
             arguments={"filename": namespace_filename}
         )

         # create and save in region caches
         log.debug('configuring new region: %s', region_uid_name)
         region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

     return region_obj


 def clear_cache_namespace(cache_region, cache_namespace_uid):
     region = get_or_create_region(cache_region, cache_namespace_uid)
     cache_keys = region.backend.list_keys(prefix=cache_namespace_uid)
     region.delete_multi(cache_keys)
     return len(cache_keys)


 class ActiveRegionCache(object):
     def __init__(self, context):
         self.context = context

     def should_invalidate(self):
         return False


 class FreshRegionCache(object):
     def __init__(self, context):
         self.context = context

     def should_invalidate(self):
         return True


 class InvalidationContext(object):
     """
     usage::

         import time
         from rhodecode.lib import rc_cache
         my_id = 1
         cache_namespace_uid = 'cache_demo.{}'.format(my_id)
         invalidation_namespace = 'repo_cache:1'
         region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

         @region.conditional_cache_on_arguments(namespace=cache_namespace_uid,
                                                expiration_time=30,
                                                condition=True)
         def heavy_compute(cache_name, param1, param2):
             print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
             import time
             time.sleep(30)
             return True

         start = time.time()
         inv_context_manager = rc_cache.InvalidationContext(
             uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
         with inv_context_manager as invalidation_context:
             # check for stored invalidation signal, and maybe purge the cache
             # before computing it again
             if invalidation_context.should_invalidate():
                 heavy_compute.invalidate('some_name', 'param1', 'param2')

             result = heavy_compute('some_name', 'param1', 'param2')
             compute_time = time.time() - start
             print(compute_time)

         # To send global invalidation signal, simply run
         CacheKey.set_invalidate(invalidation_namespace)

     """

     def __repr__(self):
         return '<InvalidationContext:{}[{}]>'.format(
             safe_str(self.cache_key), safe_str(self.uid))

     def __init__(self, uid, invalidation_namespace='',
-                 raise_exception=False, thread_scoped=True):
+                 raise_exception=False, thread_scoped=None):
         self.uid = uid
         self.invalidation_namespace = invalidation_namespace
         self.raise_exception = raise_exception
         self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
         self.thread_id = 'global'

+        if thread_scoped is None:
+            # if we set "default" we can override this via .ini settings
+            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))
+
         # Append the thread id to the cache key if this invalidation context
         # should be scoped to the current thread.
-        if thread_scoped:
+        if thread_scoped is True:
             self.thread_id = threading.current_thread().ident

         self.cache_key = compute_key_from_params(uid)
         self.cache_key = 'proc:{}_thread:{}_{}'.format(
             self.proc_id, self.thread_id, self.cache_key)

     def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
         log.debug('Checking if %s cache key is present and active', self.cache_key)
         cache_obj = CacheKey.get_active_cache(self.cache_key)
         invalidation_namespace = invalidation_namespace or self.invalidation_namespace
         if not cache_obj:
             cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
         return cache_obj

     def __enter__(self):
         """
         Test if current object is valid, and return CacheRegion function
         that does invalidation and calculation
         """
         # register or get a new key based on uid
         self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)

         if self.cache_obj.cache_active:
             # means our cache obj is existing and marked as it's
             # cache is not outdated, we return ActiveRegionCache
             self.skip_cache_active_change = True
             return ActiveRegionCache(context=self)

         # the key is either not existing or set to False, we return
         # the real invalidator which re-computes value. We additionally set
         # the flag to actually update the Database objects
         self.skip_cache_active_change = False
         return FreshRegionCache(context=self)

     def __exit__(self, exc_type, exc_val, exc_tb):

         if self.skip_cache_active_change:
             return

         try:
             self.cache_obj.cache_active = True
             Session().add(self.cache_obj)
             Session().commit()
         except IntegrityError:
             # if we catch integrity error, it means we inserted this object
             # assumption is that's really an edge race-condition case and
             # it's safe is to skip it
             Session().rollback()
         except Exception:
             log.exception('Failed to commit on cache key update')
             Session().rollback()
             if self.raise_exception:
                 raise
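For context on the new default: because the thread ident is appended to the cache key, thread-scoped invalidation contexts give every worker thread of a multi-threaded server its own cache entry and invalidation state, which per the commit message is only helpful when developing against a single pserve process. A rough sketch of the difference, assuming a configured RhodeCode environment ('cache_demo.1' is a hypothetical demo uid):

import threading
from rhodecode.lib import rc_cache

def show_key(scoped):
    # Construct the context with an explicit thread_scoped flag and show the key.
    ctx = rc_cache.InvalidationContext(uid='cache_demo.1', thread_scoped=scoped)
    print(ctx.cache_key)

# thread_scoped=True: each thread prints a different key, so cached values and
# invalidation state are duplicated per thread.
workers = [threading.Thread(target=show_key, args=(True,)) for _ in range(2)]
for t in workers:
    t.start()
for t in workers:
    t.join()

# thread_scoped=False (the effective default unless the .ini enables it):
# both calls share the same 'global' key within the process.
show_key(False)
show_key(False)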