##// END OF EJS Templates
caches: improve logging.
marcink -
r2938:8ed0cc06 default
parent child Browse files
Show More
@@ -1,319 +1,319 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2018 RhodeCode GmbH
3 # Copyright (C) 2015-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import os
20 import os
21 import time
21 import time
22 import logging
22 import logging
23 import functools
23 import functools
24 import threading
24 import threading
25
25
26 from dogpile.cache import CacheRegion
26 from dogpile.cache import CacheRegion
27 from dogpile.cache.util import compat
27 from dogpile.cache.util import compat
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib.utils import safe_str, sha1
30 from rhodecode.lib.utils import safe_str, sha1
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
31 from rhodecode.lib.utils2 import safe_unicode, str2bool
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
32 from rhodecode.model.db import Session, CacheKey, IntegrityError
33
33
34 from . import region_meta
34 from . import region_meta
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38
38
class RhodeCodeCacheRegion(CacheRegion):
    """
    dogpile ``CacheRegion`` subclass that adds a cheap on/off switch
    (``condition``) to the standard ``cache_on_arguments`` decorator.
    """

    def conditional_cache_on_arguments(
            self, namespace=None,
            expiration_time=None,
            should_cache_fn=None,
            to_str=compat.string_type,
            function_key_generator=None,
            condition=True):
        """
        Custom conditional decorator, that will not touch any dogpile internals if
        condition isn't meet. This works a bit different than should_cache_fn
        And it's faster in cases we don't ever want to compute cached values

        :param namespace: optional namespace passed to the key generator
        :param expiration_time: TTL in seconds, or a callable returning it
        :param should_cache_fn: dogpile hook deciding whether a computed value
            gets stored
        :param to_str: stringifier used when building cache keys
        :param function_key_generator: override for the region's key generator
        :param condition: when falsy, the wrapped function is called directly
            and dogpile is bypassed entirely
        """
        # expiration_time may be a callable; it is evaluated on every call
        # inside `decorate` so the TTL can vary at runtime
        expiration_time_is_callable = compat.callable(expiration_time)

        if function_key_generator is None:
            function_key_generator = self.function_key_generator

        def decorator(fn):
            if to_str is compat.string_type:
                # backwards compatible
                key_generator = function_key_generator(namespace, fn)
            else:
                key_generator = function_key_generator(namespace, fn, to_str=to_str)

            @functools.wraps(fn)
            def decorate(*arg, **kw):
                key = key_generator(*arg, **kw)

                @functools.wraps(fn)
                def creator():
                    return fn(*arg, **kw)

                # caching disabled: compute directly, no dogpile involvement
                if not condition:
                    return creator()

                timeout = expiration_time() if expiration_time_is_callable \
                    else expiration_time

                return self.get_or_create(key, creator, timeout, should_cache_fn)

            def invalidate(*arg, **kw):
                # drop the cached value for these exact arguments
                key = key_generator(*arg, **kw)
                self.delete(key)

            def set_(value, *arg, **kw):
                # store `value` under the key derived from the arguments
                key = key_generator(*arg, **kw)
                self.set(key, value)

            def get(*arg, **kw):
                # fetch the cached value without computing it
                key = key_generator(*arg, **kw)
                return self.get(key)

            def refresh(*arg, **kw):
                # recompute unconditionally and overwrite the cached value
                key = key_generator(*arg, **kw)
                value = fn(*arg, **kw)
                self.set(key, value)
                return value

            # expose the standard dogpile helper API on the wrapper
            decorate.set = set_
            decorate.invalidate = invalidate
            decorate.refresh = refresh
            decorate.get = get
            decorate.original = fn
            decorate.key_generator = key_generator

            return decorate

        return decorator
109
109
110
110
def make_region(*arg, **kw):
    """Factory for :class:`RhodeCodeCacheRegion` (drop-in for dogpile's
    ``make_region``)."""
    region = RhodeCodeCacheRegion(*arg, **kw)
    return region
113
113
114
114
def get_default_cache_settings(settings, prefixes=None):
    """
    Extract cache-related options from a settings mapping.

    Returns a new dict containing every entry of `settings` whose key starts
    with one of `prefixes`, re-keyed by the remainder of the key (prefix
    removed, whitespace stripped). String values are also stripped.

    :param settings: mapping of configuration keys to values (.ini data)
    :param prefixes: list of key prefixes to match; empty matches nothing
    :return: dict of prefix-stripped keys to (stripped) values
    """
    prefixes = prefixes or []
    cache_settings = {}
    # py2/py3 compatible string type: `str is bytes` is only True on py2,
    # so on py3 the `basestring` branch is never evaluated
    string_type = basestring if str is bytes else str
    for key in settings.keys():
        for prefix in prefixes:
            if key.startswith(prefix):
                # slice off the prefix instead of str.split(prefix): split
                # corrupts keys that contain the prefix text more than once,
                # e.g. 'a.a.b'.split('a.')[1] == ''
                name = key[len(prefix):].strip()
                val = settings[key]
                if isinstance(val, string_type):
                    val = val.strip()
                cache_settings[name] = val
    return cache_settings
127
127
128
128
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    joined = "_".join(safe_str(arg) for arg in args)
    return sha1(joined)
134
134
135
135
def key_generator(namespace, fn):
    """
    Build a dogpile key generator for `fn`: keys are
    ``<namespace or 'default'>:<function-name>_<hash-of-args>``.
    """
    func_name = fn.__name__

    def generate_key(*args):
        prefix = namespace if namespace else 'default'
        args_hash = compute_key_from_params(*args)
        return "{}:{}_{}".format(prefix, func_name, args_hash)

    return generate_key
147
147
148
148
def get_or_create_region(region_name, region_namespace=None):
    """
    Return a configured dogpile cache region.

    For file-namespace backends a dedicated per-namespace region (one DBM
    file per namespace) is created on first use and memoized in
    ``region_meta.dogpile_cache_regions``; other backends return the region
    registered under `region_name` directly.

    :param region_name: name of a region configured at application start
    :param region_namespace: optional namespace used to shard file backends
    :raises EnvironmentError: when `region_name` was never configured
    """
    from rhodecode.lib.rc_cache.backends import FileNamespaceBackend
    region_obj = region_meta.dogpile_cache_regions.get(region_name)
    if not region_obj:
        raise EnvironmentError(
            'Region `{}` not in configured regions: {}.'.format(
                region_name, region_meta.dogpile_cache_regions.keys()))

    region_uid_name = '{}:{}'.format(region_name, region_namespace)
    if isinstance(region_obj.actual_backend, FileNamespaceBackend):
        # file backends get one region (and one DBM file) per namespace;
        # reuse a previously built one when available
        region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
        if region_exist:
            log.debug('Using already configured region: %s', region_namespace)
            return region_exist
        cache_dir = region_meta.dogpile_config_defaults['cache_dir']
        expiration_time = region_obj.expiration_time

        if not os.path.isdir(cache_dir):
            os.makedirs(cache_dir)
        new_region = make_region(
            name=region_uid_name, function_key_generator=key_generator
        )
        namespace_filename = os.path.join(
            cache_dir, "{}.cache.dbm".format(region_namespace))
        # special type that allows 1db per namespace
        new_region.configure(
            backend='dogpile.cache.rc.file_namespace',
            expiration_time=expiration_time,
            arguments={"filename": namespace_filename}
        )

        # create and save in region caches
        log.debug('configuring new region: %s', region_uid_name)
        region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region

    return region_obj
185
185
186
186
def clear_cache_namespace(cache_region, cache_namespace_uid):
    """
    Delete every cached entry stored under `cache_namespace_uid` inside
    `cache_region` and return the number of keys removed.
    """
    region = get_or_create_region(cache_region, cache_namespace_uid)
    namespace_keys = region.backend.list_keys(prefix=cache_namespace_uid)
    region.delete_multi(namespace_keys)
    return len(namespace_keys)
192
192
193
193
class ActiveRegionCache(object):
    """
    Returned by ``InvalidationContext.__enter__`` when the cache key is
    still active: cached values may be used as-is.
    """

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        # cache is active -> nothing to invalidate
        return False
200
200
201
201
class FreshRegionCache(object):
    """
    Returned by ``InvalidationContext.__enter__`` when the cache key is
    missing or marked inactive: callers should recompute cached values.
    """

    def __init__(self, context):
        self.context = context

    def should_invalidate(self):
        # stale or missing key -> caller must recompute
        return True
208
208
209
209
class InvalidationContext(object):
    """
    Context manager guarding a cached computation with a DB-backed
    ``CacheKey`` row, so invalidation signals are shared across processes.

    usage::

        from rhodecode.lib import rc_cache

        cache_namespace_uid = CacheKey.SOME_NAMESPACE.format(1)
        region = rc_cache.get_or_create_region('cache_perms', cache_namespace_uid)

        @region.conditional_cache_on_arguments(namespace=cache_namespace_uid, condition=True)
        def heavy_compute(cache_name, param1, param2):
            print('COMPUTE {}, {}, {}'.format(cache_name, param1, param2))

        # invalidation namespace is shared namespace key for all process caches
        # we use it to send a global signal
        invalidation_namespace = 'repo_cache:1'

        inv_context_manager = rc_cache.InvalidationContext(
            uid=cache_namespace_uid, invalidation_namespace=invalidation_namespace)
        with inv_context_manager as invalidation_context:
            # check for stored invalidation signal, and maybe purge the cache
            # before computing it again
            if invalidation_context.should_invalidate():
                heavy_compute.invalidate('some_name', 'param1', 'param2')

            result = heavy_compute('some_name', 'param1', 'param2')
            compute_time = inv_context_manager.compute_time
            print(compute_time)

        # To send global invalidation signal, simply run
        CacheKey.set_invalidate(invalidation_namespace)

    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.cache_key), safe_str(self.uid))

    def __init__(self, uid, invalidation_namespace='',
                 raise_exception=False, thread_scoped=None):
        # :param uid: unique id of the cached computation this context guards
        # :param invalidation_namespace: shared key used to broadcast an
        #     invalidation signal to all process caches
        # :param raise_exception: when True, re-raise unexpected DB commit
        #     errors from __exit__ instead of only logging them
        # :param thread_scoped: scope the cache key to the current thread;
        #     None falls back to the `cache_thread_scoped` .ini setting
        self.uid = uid
        self.invalidation_namespace = invalidation_namespace
        self.raise_exception = raise_exception
        # per-instance prefix so different RhodeCode instances don't share keys
        self.proc_id = safe_unicode(rhodecode.CONFIG.get('instance_id') or 'DEFAULT')
        self.thread_id = 'global'

        if thread_scoped is None:
            # if we set "default" we can override this via .ini settings
            thread_scoped = str2bool(rhodecode.CONFIG.get('cache_thread_scoped'))

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped is True:
            self.thread_id = threading.current_thread().ident

        self.cache_key = compute_key_from_params(uid)
        self.cache_key = 'proc:{}_thread:{}_{}'.format(
            self.proc_id, self.thread_id, self.cache_key)
        # filled in by __exit__ with the wall-clock time spent in the block
        self.compute_time = 0

    def get_or_create_cache_obj(self, uid, invalidation_namespace=''):
        # Return the active CacheKey DB row for our computed cache key, or a
        # new transient row (persisted later by __exit__) when none exists.
        cache_obj = CacheKey.get_active_cache(self.cache_key)
        log.debug('Fetched cache obj %s using %s cache key.', cache_obj, self.cache_key)
        invalidation_namespace = invalidation_namespace or self.invalidation_namespace
        if not cache_obj:
            cache_obj = CacheKey(self.cache_key, cache_args=invalidation_namespace)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """
        # register or get a new key based on uid
        self.cache_obj = self.get_or_create_cache_obj(uid=self.uid)
        self._start_time = time.time()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return ActiveRegionCache
            self.skip_cache_active_change = True

            return ActiveRegionCache(context=self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(context=self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # save compute time
        self.compute_time = time.time() - self._start_time

        if self.skip_cache_active_change:
            # cache was already active on enter; nothing to persist
            return

        try:
            # mark the key active so subsequent contexts see a fresh cache
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe is to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now