##// END OF EJS Templates
caches: add cython compat to our own decorator for caching
marcink -
r3496:faf385c1 default
parent child Browse files
Show More
@@ -1,322 +1,323 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 24 import threading
25 25
26 26 from dogpile.cache import CacheRegion
27 27 from dogpile.cache.util import compat
28 28
29 29 import rhodecode
30 30 from rhodecode.lib.utils import safe_str, sha1
31 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33 33
34 34 from . import region_meta
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38
class RhodeCodeCacheRegion(CacheRegion):

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Conditional variant of dogpile's ``cache_on_arguments`` decorator.

        When ``condition`` is falsy the wrapped function is invoked directly
        and dogpile internals are never touched. This differs from
        ``should_cache_fn`` and is faster for the cases where we never want
        to compute cached values at all.
        """
        expiration_is_callable = compat.callable(expiration_time)

        key_gen_factory = function_key_generator
        if key_gen_factory is None:
            key_gen_factory = self.function_key_generator

        def decorator(user_func):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = key_gen_factory(namespace, user_func)
            else:
                key_generator = key_gen_factory(namespace, user_func, to_str=to_str)

            @functools.wraps(user_func)
            def decorate(*arg, **kw):
                key = key_generator(*arg, **kw)

                @functools.wraps(user_func)
                def creator():
                    return user_func(*arg, **kw)

                # caching disabled: short-circuit and compute directly
                if not condition:
                    return creator()

                if expiration_is_callable:
                    timeout = expiration_time()
                else:
                    timeout = expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                self.delete(key_generator(*arg, **kw))

            def set_(value, *arg, **kw):
                self.set(key_generator(*arg, **kw), value)

            def get(*arg, **kw):
                return self.get(key_generator(*arg, **kw))

            def refresh(*arg, **kw):
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = user_func
            decorate.key_generator = key_generator
            # expose the wrapped function explicitly; functools.wraps does
            # not always set this for compiled (cython) callables
            decorate.__wrapped__ = user_func

            return decorate

        return decorator
110 111
def make_region(*arg, **kw):
    """Build a :class:`RhodeCodeCacheRegion` with the given dogpile args."""
    return RhodeCodeCacheRegion(*arg, **kw)
113 114
114 115
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related options from a raw settings mapping.

    Picks every key that starts with one of ``prefixes``, strips the matched
    prefix from the key and whitespace from string values.

    :param settings: mapping of configuration values (e.g. parsed .ini)
    :param prefixes: list of key prefixes to collect; ``None``/empty -> ``{}``
    :return: dict of prefix-stripped name -> value
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                # strip only the leading prefix; the previous
                # ``key.split(prefix)[1]`` truncated keys in which the
                # prefix substring occurred more than once
                name = key[len(prefix):].strip()
                val = settings[key]
                if isinstance(val, compat.string_types):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
127 128
128 129
def compute_key_from_params(*args):
    """
    Helper producing a sha1 cache key from the given params.
    """
    return sha1("_".join(safe_str(arg) for arg in args))
134 135
135 136
def key_generator(namespace, fn):
    """
    Default dogpile key generator producing
    ``<namespace>:<func_name>_<sha1(args)>`` keys.
    """
    func_name = fn.__name__

    def generate_key(*args):
        # fall back to a shared 'default' namespace when none is given
        prefix = namespace or 'default'
        return "{}:{}_{}".format(
            prefix, func_name, compute_key_from_params(*args))

    return generate_key
147 148
148 149
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the configured dogpile region for ``region_name``.

    For file-namespace backends a dedicated region (one dbm file per
    ``region_namespace``) is lazily created, cached in
    ``region_meta.dogpile_cache_regions`` and reused on later calls.

    :raises EnvironmentError: if ``region_name`` was never configured.
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # per-namespace regions are cached under the namespace key
        # NOTE(review): lookup/store use ``region_namespace`` while the
        # region itself is *named* ``region_uid_name`` -- looks intentional,
        # confirm before changing
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name, function_key_generator=key_generator
        )
        # each namespace gets its own dbm file inside cache_dir
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s',region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
185 186
186 187
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Remove every cached key stored under ``cache_namespace_uid``.

    :return: number of keys that were deleted
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_delete = region.backend.list_keys(prefix=cache_namespace_uid)
    removed = len(keys_to_delete)
    if removed:
        region.delete_multi(keys_to_delete)
    return removed
194 195
195 196
class ActiveRegionCache(object):
    """
    Marker returned when the cache key is still active; callers checking
    ``should_invalidate()`` must not re-compute the cached value.
    """

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        # cache is valid -- never force a refresh
        return False
202 203
203 204
class FreshRegionCache(object):
    """
    Marker returned when the cache key was invalidated (or never existed);
    callers checking ``should_invalidate()`` must re-compute the value.
    """

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        # cache is stale -- callers should refresh
        return True
210 211
211 212
class InvalidationContext(object):
    """
    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.3fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        """
        :param uid: unique id of the cache namespace this context guards
        :param invalidation_namespace: shared key used to signal invalidation
            across processes (see class docstring)
        :param raise_exception: re-raise DB errors from ``__exit__`` instead
            of only logging them
        :param thread_scoped: scope the cache key to the current thread;
            ``None`` defers to the ``cache_thread_scoped`` .ini setting
        """
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # instance_id isolates keys between RhodeCode processes/instances
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        # filled in by __exit__ with the elapsed wall-clock time
        self.compute_time = 0

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        """
        Fetch the active CacheKey DB row for our cache key, or build a new
        (not yet persisted) one when none is active.
        """
        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        if not cache_obj:
            cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Record compute time and, when a fresh value was computed, persist
        the cache key as active again.
        """
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now