##// END OF EJS Templates
cython: fix problems with cythonized decorators on py2.
marcink -
r4175:a79bcd0c default
parent child Browse files
Show More
@@ -1,355 +1,415 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import time
22 22 import logging
23 23 import functools
24 from decorator import decorate
25 24 import threading
26 25
27 26 from dogpile.cache import CacheRegion
28 27 from dogpile.cache.util import compat
29 28
30 29 import rhodecode
31 30 from rhodecode.lib.utils import safe_str, sha1
32 31 from rhodecode.lib.utils2 import safe_unicode, str2bool
33 32 from rhodecode.model.db import Session, CacheKey, IntegrityError
34 33
35 34 from rhodecode.lib.rc_cache import cache_key_meta
36 35 from rhodecode.lib.rc_cache import region_meta
37 36
38 37 log = logging.getLogger(__name__)
39 38
40 39
def isCython(func):
    """
    Private helper that checks if a function is a cython function.
    """
    # Cython-compiled callables expose a distinctive class name, while a
    # plain Python function's class is simply named 'function'.
    return 'cython_function_or_method' == func.__class__.__name__
46
class RhodeCodeCacheRegion(CacheRegion):
    # dogpile CacheRegion subclass that adds a `condition`-aware caching
    # decorator which can bypass dogpile entirely when caching is disabled.

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values
        """
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        # workaround for py2 and cython problems, this block should be removed
        # once we've migrated to py3
        # NOTE(review): the condition below is literally always True, so the
        # functools.wraps based implementation here is always taken and the
        # `decorator.decorate` based path at the bottom is dead code kept for
        # the planned py3 migration.
        if 'cython' == 'cython':
            def decorator(fn):
                if to_str is compat.string_type:
                    # backwards compatible
                    key_generator = function_key_generator(namespace, fn)
                else:
                    key_generator = function_key_generator(namespace, fn, to_str=to_str)

                @functools.wraps(fn)
                def decorate(*arg, **kw):
                    key = key_generator(*arg, **kw)

                    @functools.wraps(fn)
                    def creator():
                        return fn(*arg, **kw)

                    # `condition` disabled: call through without touching dogpile
                    if not condition:
                        return creator()

                    timeout = expiration_time() if expiration_time_is_callable \
                        else expiration_time

                    return self.get_or_create(key, creator, timeout, should_cache_fn)

                def invalidate(*arg, **kw):
                    # drop the cached value for these arguments
                    key = key_generator(*arg, **kw)
                    self.delete(key)

                def set_(value, *arg, **kw):
                    # store an explicit value under the key for these arguments
                    key = key_generator(*arg, **kw)
                    self.set(key, value)

                def get(*arg, **kw):
                    # fetch the cached value (or dogpile's NO_VALUE) without computing
                    key = key_generator(*arg, **kw)
                    return self.get(key)

                def refresh(*arg, **kw):
                    # recompute unconditionally, store, and return the fresh value
                    key = key_generator(*arg, **kw)
                    value = fn(*arg, **kw)
                    self.set(key, value)
                    return value

                # expose the same helper API dogpile's cache_on_arguments provides
                decorate.set = set_
                decorate.invalidate = invalidate
                decorate.refresh = refresh
                decorate.get = get
                decorate.original = fn
                decorate.key_generator = key_generator
                decorate.__wrapped__ = fn

                return decorate
            return decorator

        def get_or_create_for_user_func(key_generator, user_func, *arg, **kw):

            if not condition:
                # py2-only attribute access (func_name); fine for this codebase
                log.debug('Calling un-cached func:%s', user_func.func_name)
                return user_func(*arg, **kw)

            key = key_generator(*arg, **kw)

            timeout = expiration_time() if expiration_time_is_callable \
                else expiration_time

            log.debug('Calling cached fn:%s', user_func.func_name)
            return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))

        def cache_decorator(user_func):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, user_func)
            else:
                key_generator = function_key_generator(namespace, user_func, to_str=to_str)

            def refresh(*arg, **kw):
                """
                Like invalidate, but regenerates the value instead
                """
                key = key_generator(*arg, **kw)
                value = user_func(*arg, **kw)
                self.set(key, value)
                return value

            def invalidate(*arg, **kw):
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                key = key_generator(*arg, **kw)
                return self.get(key)

            user_func.set = set_
            user_func.invalidate = invalidate
            user_func.get = get
            user_func.refresh = refresh
            user_func.key_generator = key_generator
            user_func.original = user_func

            # Use `decorate` to preserve the signature of :param:`user_func`.
            # NOTE(review): `decorator` here resolves to the local function
            # defined in the always-True branch above (the `decorator` module
            # import was removed), so this line would raise AttributeError if
            # it were ever reached — it is unreachable in practice.
            return decorator.decorate(user_func, functools.partial(
                get_or_create_for_user_func, key_generator))

        return cache_decorator
115 175
116 176
def make_region(*arg, **kw):
    """Factory for :class:`RhodeCodeCacheRegion` instances."""
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
119 179
120 180
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache settings whose keys start with any of the given prefixes.

    :param settings: mapping of raw configuration keys to values.
    :param prefixes: list of key prefixes to match; matched keys are returned
        with the prefix stripped. Defaults to no prefixes (empty result).
    :return: dict of stripped-key -> value; string values are whitespace-trimmed.
    """
    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                # Slice off the prefix instead of `key.split(prefix)[1]`,
                # which returns the wrong fragment whenever the prefix
                # substring occurs again later in the key.
                name = key[len(prefix):].strip()
                val = settings[key]
                if isinstance(val, compat.string_types):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
133 193
134 194
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined = "_".join(safe_str(arg) for arg in args)
    return sha1(joined)
140 200
141 201
def backend_key_generator(backend):
    """
    Special wrapper that also sends over the backend to the key generator
    """
    def backend_aware_generator(namespace, fn):
        # close over `backend` so the final key can carry its prefix
        return key_generator(backend, namespace, fn)
    return backend_aware_generator
149 209
150 210
def key_generator(backend, namespace, fn):
    """
    Build a key function for *fn*, namespaced by backend prefix and namespace.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = getattr(backend, 'key_prefix', None) or 'backend_prefix'
        ns = namespace or 'default_namespace'
        arg_key = compute_key_from_params(*args)
        return "{}:{}:{}_{}".format(prefix, ns, func_name, arg_key)

    return generate_key
163 223
164 224
def get_or_create_region(region_name, region_namespace=None):
    # Resolve a configured dogpile region by name. For file-namespace
    # backends a dedicated region (one dbm file per namespace) is created
    # lazily and memoized in the shared region registry.
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # re-use an already materialized per-namespace region if we have one
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name,
            function_key_generator=backend_key_generator(region_obj.actual_backend)
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
202 262
203 263
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached key under *cache_namespace_uid* in *cache_region*.

    Returns the number of keys that were removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    keys_to_drop = region.backend.list_keys(prefix=cache_namespace_uid)
    if keys_to_drop:
        region.delete_multi(keys_to_drop)
    return len(keys_to_drop)
211 271
212 272
class ActiveRegionCache(object):
    """Result object for a cache entry that is still valid."""

    def __init__(self, context, cache_data):
        self.cache_data = cache_data
        self.context = context

    def should_invalidate(self):
        """An active cache never asks the caller to re-compute."""
        return False
220 280
221 281
class FreshRegionCache(object):
    """Result object for a cache entry that must be (re)computed."""

    def __init__(self, context, cache_data):
        self.cache_data = cache_data
        self.context = context

    def should_invalidate(self):
        """A fresh cache always asks the caller to re-compute."""
        return True
229 289
230 290
class InvalidationContext(object):
    """
    Context manager that decides, per process/thread, whether a cached value
    should be re-computed based on a shared :class:`CacheKey` DB record.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            args = ('one', 'two')
            # re-compute and store cache if we get invalidate signal
            if invalidation_context.should_invalidate():
                result = heavy_compute.refresh(*args)
            else:
                result = heavy_compute(*args)

            compute_time = inv_context_manager.compute_time
            log.debug('result computed in %.4fs', compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        # :param uid: unique id of the cached computation
        # :param invalidation_namespace: shared namespace used to signal
        #     invalidation across processes
        # :param raise_exception: re-raise DB commit errors from __exit__
        # :param thread_scoped: scope the cache key to the current thread;
        #     None means "read the default from the .ini config"
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}|thread:{}|params:{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        self.compute_time = 0

    def get_or_create_cache_obj(self, cache_type, invalidation_namespace=''):
        # Return the CacheKey DB record for this context, creating an
        # (uncommitted) one if missing; __exit__ is responsible for the commit.
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        # fetch all cache keys for this namespace and convert them to a map to find if we
        # have specific cache_key object registered. We do this because we want to have
        # all consistent cache_state_uid for newly registered objects
        cache_obj_map = CacheKey.get_namespace_map(invalidation_namespace)
        cache_obj = cache_obj_map.get(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        if not cache_obj:
            new_cache_args = invalidation_namespace
            # py2-only dict API (itervalues); any existing record donates its
            # cache_state_uid so all keys in the namespace stay consistent
            first_cache_obj = next(cache_obj_map.itervalues()) if cache_obj_map else None
            cache_state_uid = None
            if first_cache_obj:
                cache_state_uid = first_cache_obj.cache_state_uid
            cache_obj = CacheKey(self.cache_key, cache_args=new_cache_args,
                                 cache_state_uid=cache_state_uid)
            cache_key_meta.cache_keys_by_pid.append(self.cache_key)

        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        log.debug('Entering cache invalidation check context: %s', self.invalidation_namespace)
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(cache_type=self.uid)
        cache_data = self.cache_obj.get_dict()
        # timing starts here; __exit__ reads _start_time to fill compute_time
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self, cache_data=cache_data)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self, cache_data=cache_data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        # nothing to persist if __enter__ found an already-active cache
        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now