##// END OF EJS Templates
caches: add cython compat to our own decorator for caching
marcink -
r3496:faf385c1 default
parent child Browse files
Show More
@@ -1,322 +1,323 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2019 RhodeCode GmbH
3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import threading
24 import threading
25
25
26 from dogpile.cache import CacheRegion
26 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from . import region_meta
34 from . import region_meta
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38
38
class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile ``CacheRegion`` subclass adding a conditional variant of
    ``cache_on_arguments`` that can bypass the cache machinery entirely.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: optional key namespace passed to the key generator.
        :param expiration_time: TTL in seconds, or a callable returning it.
        :param should_cache_fn: dogpile hook deciding whether to store a result.
        :param to_str: key-coercion function (dogpile backwards-compat default).
        :param function_key_generator: overrides the region's key generator.
        :param condition: when falsy, call the function directly and skip
            the cache entirely (no key computation beyond the generator call).
        """
        # expiration_time may be a callable evaluated at call time
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    return fn(*arg, **kw)

                # fast path: condition disabled -> compute directly,
                # never touch the dogpile region
                if not condition:
                    return creator()

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                # drop the cached value for these arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store an explicit value for these arguments
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # fetch the cached value (may be NO_VALUE) without computing
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # force recomputation and overwrite the cached value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            # expose the same helper API as dogpile's cache_on_arguments
            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator
            # explicit __wrapped__ for compat with cython-compiled functions,
            # where functools.wraps may not set it
            decorate.__wrapped__ = fn

            return decorate

        return decorator
109
110
110
111
def make_region(*arg, **kw):
    """Create and return a :class:`RhodeCodeCacheRegion` with the given args."""
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
113
114
114
115
def get_default_cache_settings(settings, prefixes=None):
    """
    Collect entries from *settings* whose keys start with one of *prefixes*,
    returning them keyed by the stripped remainder after the prefix.
    String values are whitespace-stripped.
    """
    prefixes = prefixes or []
    cache_settings = {}
    for full_key in settings.keys():
        for prefix in prefixes:
            if not full_key.startswith(prefix):
                continue
            short_name = full_key.split(prefix)[1].strip()
            value = settings[full_key]
            if isinstance(value, compat.string_types):
                value = value.strip()
            cache_settings[short_name] = value
    return cache_settings
127
128
128
129
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined_args = "_".join(map(safe_str, args))
    return sha1(joined_args)
134
135
135
136
def key_generator(namespace, fn):
    """
    Build a dogpile-style key generator bound to *namespace* and *fn*.
    Keys look like ``<namespace or 'default'>:<fn name>_<sha1-of-args>``.
    """
    fname = fn.__name__

    def generate_key(*args):
        prefix = namespace or 'default'
        arg_key = compute_key_from_params(*args)
        return "{}:{}_{}".format(prefix, fname, arg_key)

    return generate_key
147
148
148
149
def get_or_create_region(region_name, region_namespace=None):
    """
    Look up a configured dogpile region by name; for file-backed regions,
    lazily create (and memoize) a per-namespace region so each namespace
    gets its own DBM file.

    :param region_name: name of a region already registered in
        ``region_meta.dogpile_cache_regions`` (from .ini config).
    :param region_namespace: optional namespace; only meaningful for
        ``FileNamespaceBackend`` regions, where it selects/creates a
        dedicated region + cache file.
    :raises EnvironmentError: if *region_name* was never configured.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # NOTE(review): per-namespace regions are memoized under the bare
        # region_namespace key (not region_uid_name) — presumably namespaces
        # are globally unique; confirm before changing.
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        # inherit TTL from the configured parent region
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name, function_key_generator=key_generator
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s',region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
185
186
186
187
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached key under *cache_namespace_uid* in *cache_region*.

    :returns: number of keys that were removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_remove = region.backend.list_keys(prefix=cache_namespace_uid)
    removed_count = len(keys_to_remove)
    if removed_count:
        region.delete_multi(keys_to_remove)
    return removed_count
194
195
195
196
class ActiveRegionCache(object):
    """Context result meaning the cached value is still valid; no recompute."""

    def __init__(self, context):
        # keep a reference to the owning InvalidationContext
        self.context = context

    def should_invalidate(self):
        # cache is active -> caller should use the cached value
        return False
202
203
203
204
class FreshRegionCache(object):
    """Context result meaning the cache is stale/missing; recompute required."""

    def __init__(self, context):
        # keep a reference to the owning InvalidationContext
        self.context = context

    def should_invalidate(self):
        # no valid cache -> caller must refresh the value
        return True
210
211
211
212
class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.3fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        """
        :param uid: unique id of the cached computation; part of the DB key.
        :param invalidation_namespace: shared namespace used for global
            invalidation signals across processes.
        :param raise_exception: re-raise unexpected DB errors from ``__exit__``
            instead of only logging them.
        :param thread_scoped: scope the cache key to the current thread;
            ``None`` means read the default from the ``cache_thread_scoped``
            .ini setting.
        """
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # per-process component of the key; falls back to 'DEFAULT'
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        # NOTE(review): the `uid` parameter is unused here — lookup is done
        # via self.cache_key computed in __init__; kept for API compatibility.
        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        if not cache_obj:
            # not persisted yet; a fresh (unsaved) row — __exit__ commits it
            cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
        # used by __exit__ to report compute_time
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        # nothing to persist when the cache was already active
        if self.skip_cache_active_change:
            return

        try:
            # mark the key active so subsequent contexts take the fast path
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now