##// END OF EJS Templates
caches: Add an argument to make the cache context thread scoped.
Martin Bornhold -
r613:af6adc7d default
parent child Browse files
Show More
@@ -1,226 +1,234 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2015-2016 RhodeCode GmbH
3 # Copyright (C) 2015-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21
21
22 import beaker
22 import beaker
23 import logging
23 import logging
24 import threading
24
25
25 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
26 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
26
27
27 from rhodecode.lib.utils import safe_str, md5
28 from rhodecode.lib.utils import safe_str, md5
28 from rhodecode.model.db import Session, CacheKey, IntegrityError
29 from rhodecode.model.db import Session, CacheKey, IntegrityError
29
30
30 log = logging.getLogger(__name__)
31 log = logging.getLogger(__name__)
31
32
# Cache namespace prefixes; combined with an md5 of the repo name by
# ``get_repo_namespace_key`` to form per-repository cache namespaces.
FILE_TREE = 'cache_file_tree'
FILE_TREE_META = 'cache_file_tree_metadata'
FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
SUMMARY_STATS = 'cache_summary_stats'

# This list of caches gets purged when invalidation happens
USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)

# Fallback beaker cache manager settings, used by ``get_cache_manager``
# when the requested region is not configured in the .ini file.
DEFAULT_CACHE_MANAGER_CONFIG = {
    'type': 'memorylru_base',
    'max_items': 10240,
    'key_length': 256,
    'enabled': True
}
46
47
47
48
def configure_cache_region(
        region_name, region_kw, default_cache_kw, default_expire=60):
    """
    Register ``region_name`` in beaker's global region registry.

    Settings missing from ``region_kw`` are filled in from
    ``default_cache_kw`` (the application-wide cache settings); the
    completed dict is stored in ``beaker.cache.cache_regions``.

    NOTE: ``region_kw`` is completed in place.

    :param region_name: name under which the region is registered
    :param region_kw: region specific settings, possibly incomplete
    :param default_cache_kw: fallback settings (type, lock_dir, data_dir)
    :param default_expire: fallback TTL in seconds
    """
    region_kw.setdefault('lock_dir', default_cache_kw.get('lock_dir'))
    region_kw.setdefault('data_dir', default_cache_kw.get('data_dir'))
    region_kw.setdefault('type', default_cache_kw.get('type', 'memory'))
    # expire always passes through int() so string values from .ini work
    region_kw['expire'] = int(region_kw.get('expire', default_expire))

    beaker.cache.cache_regions[region_name] = region_kw
60
61
61
62
def get_cache_manager(region_name, cache_name, custom_ttl=None):
    """
    Creates a Beaker cache manager. Such instance can be used like that::

        _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
        cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
        _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)
        def heavy_compute():
            ...
        result = cache_manager.get(_cache_key, createfunc=heavy_compute)

    :param region_name: region from ini file
    :param cache_name: custom cache name, usually prefix+repo_name. eg
        file_switcher_repo1
    :param custom_ttl: override .ini file timeout on this cache
    :return: instance of cache manager
    """

    cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
    if custom_ttl:
        log.debug('Updating region %s with custom ttl: %s',
                  region_name, custom_ttl)
        # BUGFIX: operate on a copy. The previous in-place ``update`` mutated
        # the dict stored in the global ``cache_regions`` registry (or the
        # shared DEFAULT_CACHE_MANAGER_CONFIG), so a per-call custom TTL
        # silently leaked into every later cache created for this region.
        cache_config = dict(cache_config, expire=custom_ttl)

    return beaker.cache.Cache._get_cache(cache_name, cache_config)
87
88
88
89
def clear_cache_manager(cache_manager):
    """
    Drop every value stored in the given cache manager.

    Usage::

        namespace = 'foobar'
        cache_manager = get_cache_manager('repo_cache_long', namespace)
        clear_cache_manager(cache_manager)
    """
    log.debug('Clearing all values for cache manager %s', cache_manager)
    cache_manager.clear()
98
99
99
100
def clear_repo_caches(repo_name):
    """Purge every registered per-repository cache for ``repo_name``."""
    # invalidate cache manager for this repo
    for cache_prefix in USED_REPO_CACHES:
        cache_namespace = get_repo_namespace_key(cache_prefix, repo_name)
        manager = get_cache_manager('repo_cache_long', cache_namespace)
        clear_cache_manager(manager)
106
107
107
108
def compute_key_from_params(*args):
    """
    Helper to compute key from given params to be used in cache manager
    """
    raw_key = "_".join(safe_str(arg) for arg in args)
    return md5(raw_key)
113
114
114
115
def get_repo_namespace_key(prefix, repo_name):
    """Return ``<prefix>_<md5(repo_name)>``, a per-repo cache namespace."""
    return '%s_%s' % (prefix, compute_key_from_params(repo_name))
117
118
118
119
def conditional_cache(region, prefix, condition, func):
    """
    Conditional caching function use like::
        def _c(arg):
            # heavy computation function
            return data

        # depending on the condition the compute is wrapped in cache or not
        compute = conditional_cache('short_term', 'cache_desc',
                                    condition=True, func=func)
        return compute(arg)

    :param region: name of cache region
    :param prefix: cache region prefix
    :param condition: condition for cache to be triggered, and
        return data cached
    :param func: wrapped heavy function to compute

    """
    wrapped = func
    if condition:
        # BUGFIX: the log arguments were swapped; the message reads
        # "func: %s into %s region cache" so ``func`` must come first.
        log.debug('conditional_cache: True, wrapping call of '
                  'func: %s into %s region cache', func, region)
        cached_region = _cache_decorate((prefix,), None, None, region)
        wrapped = cached_region(func)
    return wrapped
145
146
146
147
class ActiveRegionCache(object):
    """
    Returned by ``InvalidationContext.__enter__`` when the stored cache key
    is still marked active: nothing needs invalidating, and ``compute``
    simply delegates to the context's cached compute function.
    """

    def __init__(self, context):
        self.context = context

    def invalidate(self, *args, **kwargs):
        # The cached value is still considered fresh; report no invalidation.
        return False

    def compute(self):
        ctx = self.context
        log.debug('Context cache: getting obj %s from cache', ctx)
        return ctx.compute_func(ctx.cache_key)
157
158
158
159
class FreshRegionCache(ActiveRegionCache):
    """
    Returned by ``InvalidationContext.__enter__`` when the cache key is
    missing or marked inactive: ``invalidate`` drops the stale beaker
    region entry so the value gets recomputed.
    """

    def invalidate(self):
        ctx = self.context
        log.debug('Context cache: invalidating cache for %s', ctx)
        region_invalidate(ctx.compute_func, None, ctx.cache_key)
        return True
165
166
166
167
class InvalidationContext(object):
    """
    Context manager guarding a heavy computation with a database-backed
    invalidation flag (:class:`CacheKey`).

    ``__enter__`` returns an :class:`ActiveRegionCache` when the stored key
    is still marked active (the cached value may be used as-is), otherwise a
    :class:`FreshRegionCache` whose ``invalidate`` drops the stale beaker
    region entry so ``compute_func`` is re-executed. ``__exit__`` persists
    the key as active again, unless it already was on entry.
    """

    def __repr__(self):
        return '<InvalidationContext:{}[{}]>'.format(
            safe_str(self.repo_name), safe_str(self.cache_type))

    def __init__(self, compute_func, repo_name, cache_type,
                 raise_exception=False, thread_scoped=False):
        """
        :param compute_func: function that performs the expensive
            computation; invoked via the region cache objects with
            ``cache_key`` as argument
        :param repo_name: repository name, part of the cache key
        :param cache_type: logical cache name, part of the cache key
        :param raise_exception: re-raise unexpected commit failures in
            ``__exit__`` instead of only logging them
        :param thread_scoped: when True, suffix the cache key with the
            current thread id so the cache entry is scoped to this thread
        """
        self.compute_func = compute_func
        self.repo_name = repo_name
        self.cache_type = cache_type
        # md5 key derived from repo_name + cache_type
        self.cache_key = compute_key_from_params(
            repo_name, cache_type)
        self.raise_exception = raise_exception

        # Append the thread id to the cache key if this invalidation context
        # should be scoped to the current thread.
        if thread_scoped:
            thread_id = threading.current_thread().ident
            self.cache_key = '{cache_key}_{thread_id}'.format(
                cache_key=self.cache_key, thread_id=thread_id)

    def get_cache_obj(self):
        # Fetch the active CacheKey row for this repo/cache type; if none is
        # active, build a fresh (not yet persisted) CacheKey object.
        cache_key = CacheKey.get_cache_key(
            self.repo_name, self.cache_type)
        cache_obj = CacheKey.get_active_cache(cache_key)
        if not cache_obj:
            cache_obj = CacheKey(cache_key, self.repo_name)
        return cache_obj

    def __enter__(self):
        """
        Test if current object is valid, and return CacheRegion function
        that does invalidation and calculation
        """

        self.cache_obj = self.get_cache_obj()
        if self.cache_obj.cache_active:
            # means our cache obj is existing and marked as it's
            # cache is not outdated, we return BaseInvalidator
            self.skip_cache_active_change = True
            return ActiveRegionCache(self)

        # the key is either not existing or set to False, we return
        # the real invalidator which re-computes value. We additionally set
        # the flag to actually update the Database objects
        self.skip_cache_active_change = False
        return FreshRegionCache(self)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to persist if the key was already active on __enter__.
        if self.skip_cache_active_change:
            return

        try:
            self.cache_obj.cache_active = True
            Session().add(self.cache_obj)
            Session().commit()
        except IntegrityError:
            # if we catch integrity error, it means we inserted this object
            # assumption is that's really an edge race-condition case and
            # it's safe to skip it
            Session().rollback()
        except Exception:
            log.exception('Failed to commit on cache key update')
            Session().rollback()
            if self.raise_exception:
                raise
General Comments 0
You need to be logged in to leave comments. Login now