##// END OF EJS Templates
caches: turn off thread scoped caches, and allow .ini override. Thread scoped caches are only useful for development when using pserve
marcink -
r2935:47998ee0 default
parent child Browse files
Show More
@@ -1,314 +1,318 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import logging
22 22 import functools
23 23 import threading
24 24
25 25 from dogpile.cache import CacheRegion
26 26 from dogpile.cache.util import compat
27 27
28 28 import rhodecode
29 29 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils2 import safe_unicode
30 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 31 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 32
33 33 from . import region_meta
34 34
35 35 log = logging.getLogger(__name__)
36 36
37 37
class RhodeCodeCacheRegion(CacheRegion):
    # dogpile CacheRegion subclass that adds a cheaper, conditional variant of
    # the stock `cache_on_arguments` decorator.

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't met. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values
        """
        # expiration_time may be a callable evaluated per call (see `decorate`)
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            # fall back to the region's configured key generator
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                # derive the cache key from the call arguments
                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    # computes the real value on a cache miss
                    return fn(*arg, **kw)

                # condition=False: bypass dogpile entirely and compute directly
                if not condition:
                    return creator()

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                # drop the cached value for this exact argument combination
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store `value` under the key for these arguments, bypassing `fn`
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # fetch the cached value (or the region's NO_VALUE sentinel)
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # recompute unconditionally and overwrite the cached value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            # expose the same helper API dogpile's cache_on_arguments provides
            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator

            return decorate

        return decorator
108 108
109 109
def make_region(*args, **kwargs):
    """Factory for cache regions; mirrors dogpile's ``make_region`` but
    returns our :class:`RhodeCodeCacheRegion` subclass."""
    return RhodeCodeCacheRegion(*args, **kwargs)
112 112
113 113
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related options from a flat settings dict.

    :param settings: mapping of config keys to values (e.g. parsed .ini)
    :param prefixes: list of key prefixes to match; keys are returned with
        the matched prefix removed. ``None``/empty means nothing matches.
    :return: dict of stripped-key -> value for every matching key
    """
    # `basestring` exists only on Python 2; fall back to `str` on Python 3
    try:
        string_types = basestring  # noqa
    except NameError:
        string_types = str

    prefixes = prefixes or []
    cache_settings = {}
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                # slice by prefix length: `key.split(prefix)[1]` silently
                # returns a truncated name when the prefix text occurs
                # again later inside the key
                name = key[len(prefix):].strip()
                val = settings[key]
                if isinstance(val, string_types):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
126 126
127 127
def compute_key_from_params(*args):
    """
    Build a stable cache key for the given params: every argument is
    stringified via ``safe_str``, joined with ``_``, and hashed with sha1.
    """
    joined = "_".join(safe_str(arg) for arg in args)
    return sha1(joined)
133 133
134 134
def key_generator(namespace, fn):
    """
    Return a dogpile-compatible key generator bound to *namespace* and the
    decorated function *fn*; generated keys look like
    ``<namespace>:<fname>_<sha1-of-args>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = namespace or 'default'
        return "{}:{}_{}".format(
            prefix, func_name, compute_key_from_params(*args))

    return generate_key
146 146
147 147
def get_or_create_region(region_name, region_namespace=None):
    """
    Return the configured dogpile region for ``region_name``.

    For file-namespace backends, a dedicated per-namespace region (with its
    own .dbm file) is created on first use and cached under
    ``region_namespace`` in the global region registry.

    :param region_name: name of a region configured at startup
    :param region_namespace: optional namespace; only used for the
        FileNamespaceBackend special-casing below
    :raises EnvironmentError: if ``region_name`` was never configured
    """
    # local import to avoid a circular import at module load time
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # NOTE(review): the per-namespace region is registered under the
        # bare `region_namespace` key, alongside region names — assumes
        # namespaces never collide with region names; verify with callers
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        # inherit the expiration from the base region's configuration
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name, function_key_generator=key_generator
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s',region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
184 184
185 185
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached key belonging to *cache_namespace_uid* in the given
    region and return how many keys were removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    namespace_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    region.delete_multi(namespace_keys)
    return len(namespace_keys)
191 191
192 192
class ActiveRegionCache(object):
    """Returned by InvalidationContext when the cache key is still active:
    cached values may be served as-is."""

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        """An active cache never needs invalidation."""
        return False
199 199
200 200
class FreshRegionCache(object):
    """Returned by InvalidationContext when the cache key is missing or was
    signalled invalid: callers should recompute cached values."""

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        """A fresh cache always requires recomputation."""
        return True
207 207
208 208
class InvalidationContext(object):
    """
    Context manager that pairs a DB-backed CacheKey record with a dogpile
    cache namespace, so a global invalidation signal can force recomputation.

    usage::

        import time
        from rhodecode.lib import rc_cache
        my_id = 1
        cache_namespace_uid = 'cache_demo.{}'.format(my_id)
        invalidation_namespace = 'repo_cache:1'
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid,
                                               expiration_time=30,
                                               condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))
            import time
            time.sleep(30)
            return True

        start = time.time()
        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            # check for stored invalidation signal, and maybe purge the cache
            # before computing it again
            if invalidation_context.should_invalidate():
                heavy_compute.invalidate('some_name', 'param1', 'param2')

            result = heavy_compute('some_name', 'param1', 'param2')
            compute_time = time.time() - start
            print(compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        """
        :param uid: unique id this invalidation context is tracked under
        :param invalidation_namespace: namespace stored on newly created
            CacheKey rows, used to target invalidation signals
        :param raise_exception: re-raise unexpected DB errors from __exit__
        :param thread_scoped: True to scope the cache key to the current
            thread; None (default) defers to the `cache_thread_scoped`
            .ini setting
        """
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        # 'global' unless thread scoping is enabled below
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        # final key embeds instance id + thread id so different processes
        # (and optionally threads) track invalidation independently
        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, self.cache_key)

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        # fetch the active CacheKey row, or build a fresh (unsaved) one;
        # persisting is deferred to __exit__
        log.debug('Checking if %s cache key is present and active', self.cache_key)
        cache_obj = CacheKey.get_active_cache(self.cache_key)
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        if not cache_obj:
            cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)

        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True
            return ActiveRegionCache(context=self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # mark the key active again after a fresh computation; no-op when
        # __enter__ found an already-active key

        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now