caches: Add an argument to make the cache context thread scoped.
Martin Bornhold
r613:af6adc7d default
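
The call sites for the new flag are not part of this commit. The snippet below is a minimal sketch (repository name and compute function are illustrative) of what the flag changes: with thread_scoped=True the current thread's ident is appended to the derived cache key, so each worker thread reads and invalidates its own entry.

from rhodecode.lib import caches

def compute_func(cache_key):
    # placeholder for the real (expensive) computation
    return 'computed for %s' % cache_key

shared = caches.InvalidationContext(
    compute_func, 'repo1', caches.FILE_TREE)
scoped = caches.InvalidationContext(
    compute_func, 'repo1', caches.FILE_TREE, thread_scoped=True)

print(shared.cache_key)  # md5 of 'repo1_cache_file_tree'
print(scoped.cache_key)  # same hash with '_<thread ident>' appended

Note that, per the diff below, only the Beaker cache key becomes thread-specific; the database-backed CacheKey record used for invalidation bookkeeping is still keyed by repo name and cache type alone.
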
@@ -1,226 +1,234 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2015-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import beaker
23 23 import logging
24 import threading
24 25
25 26 from beaker.cache import _cache_decorate, cache_regions, region_invalidate
26 27
27 28 from rhodecode.lib.utils import safe_str, md5
28 29 from rhodecode.model.db import Session, CacheKey, IntegrityError
29 30
30 31 log = logging.getLogger(__name__)
31 32
32 33 FILE_TREE = 'cache_file_tree'
33 34 FILE_TREE_META = 'cache_file_tree_metadata'
34 35 FILE_SEARCH_TREE_META = 'cache_file_search_metadata'
35 36 SUMMARY_STATS = 'cache_summary_stats'
36 37
37 38 # This list of caches gets purged when invalidation happens
38 39 USED_REPO_CACHES = (FILE_TREE, FILE_SEARCH_TREE_META)
39 40
40 41 DEFAULT_CACHE_MANAGER_CONFIG = {
41 42 'type': 'memorylru_base',
42 43 'max_items': 10240,
43 44 'key_length': 256,
44 45 'enabled': True
45 46 }
46 47
47 48
48 49 def configure_cache_region(
49 50 region_name, region_kw, default_cache_kw, default_expire=60):
50 51 default_type = default_cache_kw.get('type', 'memory')
51 52 default_lock_dir = default_cache_kw.get('lock_dir')
52 53 default_data_dir = default_cache_kw.get('data_dir')
53 54
54 55 region_kw['lock_dir'] = region_kw.get('lock_dir', default_lock_dir)
55 56 region_kw['data_dir'] = region_kw.get('data_dir', default_data_dir)
56 57 region_kw['type'] = region_kw.get('type', default_type)
57 58 region_kw['expire'] = int(region_kw.get('expire', default_expire))
58 59
59 60 beaker.cache.cache_regions[region_name] = region_kw
60 61
61 62
62 63 def get_cache_manager(region_name, cache_name, custom_ttl=None):
63 64 """
64 65 Creates a Beaker cache manager. Such an instance can be used like this::
65 66
66 67 _namespace = caches.get_repo_namespace_key(caches.XXX, repo_name)
67 68 cache_manager = caches.get_cache_manager('repo_cache_long', _namespace)
68 69 _cache_key = caches.compute_key_from_params(repo_name, commit.raw_id)
69 70 def heavy_compute():
70 71 ...
71 72 result = cache_manager.get(_cache_key, createfunc=heavy_compute)
72 73
73 74 :param region_name: region from ini file
74 75 :param cache_name: custom cache name, usually prefix+repo_name, e.g.
75 76 file_switcher_repo1
76 77 :param custom_ttl: override .ini file timeout on this cache
77 78 :return: instance of cache manager
78 79 """
79 80
80 81 cache_config = cache_regions.get(region_name, DEFAULT_CACHE_MANAGER_CONFIG)
81 82 if custom_ttl:
82 83 log.debug('Updating region %s with custom ttl: %s',
83 84 region_name, custom_ttl)
84 85 cache_config.update({'expire': custom_ttl})
85 86
86 87 return beaker.cache.Cache._get_cache(cache_name, cache_config)
87 88
88 89
89 90 def clear_cache_manager(cache_manager):
90 91 """
91 92 namespace = 'foobar'
92 93 cache_manager = get_cache_manager('repo_cache_long', namespace)
93 94 clear_cache_manager(cache_manager)
94 95 """
95 96
96 97 log.debug('Clearing all values for cache manager %s', cache_manager)
97 98 cache_manager.clear()
98 99
99 100
100 101 def clear_repo_caches(repo_name):
101 102 # invalidate cache manager for this repo
102 103 for prefix in USED_REPO_CACHES:
103 104 namespace = get_repo_namespace_key(prefix, repo_name)
104 105 cache_manager = get_cache_manager('repo_cache_long', namespace)
105 106 clear_cache_manager(cache_manager)
106 107
107 108
108 109 def compute_key_from_params(*args):
109 110 """
110 111 Helper to compute a cache key from the given params for the cache manager
111 112 """
112 113 return md5("_".join(map(safe_str, args)))
113 114
114 115
115 116 def get_repo_namespace_key(prefix, repo_name):
116 117 return '{0}_{1}'.format(prefix, compute_key_from_params(repo_name))
117 118
118 119
119 120 def conditional_cache(region, prefix, condition, func):
120 121 """
121 122 Conditional caching function, used like::
122 123 def _c(arg):
123 124 # heavy computation function
124 125 return data
125 126
126 127 # depending on the condition the compute is wrapped in cache or not
127 128 compute = conditional_cache('short_term', 'cache_desc',
128 129 condition=True, func=_c)
129 130 return compute(arg)
130 131
131 132 :param region: name of cache region
132 133 :param prefix: cache region prefix
133 134 :param condition: when True the call is wrapped in the region cache
134 135 and the cached data is returned; when False func runs unwrapped
135 136 :param func: wrapped heavy function to compute
136 137
137 138 """
138 139 wrapped = func
139 140 if condition:
140 141 log.debug('conditional_cache: True, wrapping call of '
141 142 'func: %s into %s region cache', func, region)
142 143 cached_region = _cache_decorate((prefix,), None, None, region)
143 144 wrapped = cached_region(func)
144 145 return wrapped
145 146
146 147
147 148 class ActiveRegionCache(object):
148 149 def __init__(self, context):
149 150 self.context = context
150 151
151 152 def invalidate(self, *args, **kwargs):
152 153 return False
153 154
154 155 def compute(self):
155 156 log.debug('Context cache: getting obj %s from cache', self.context)
156 157 return self.context.compute_func(self.context.cache_key)
157 158
158 159
159 160 class FreshRegionCache(ActiveRegionCache):
160 161 def invalidate(self):
161 162 log.debug('Context cache: invalidating cache for %s', self.context)
162 163 region_invalidate(
163 164 self.context.compute_func, None, self.context.cache_key)
164 165 return True
165 166
166 167
167 168 class InvalidationContext(object):
168 169 def __repr__(self):
169 170 return '<InvalidationContext:{}[{}]>'.format(
170 171 safe_str(self.repo_name), safe_str(self.cache_type))
171 172
172 173 def __init__(self, compute_func, repo_name, cache_type,
173 raise_exception=False):
174 raise_exception=False, thread_scoped=False):
174 175 self.compute_func = compute_func
175 176 self.repo_name = repo_name
176 177 self.cache_type = cache_type
177 178 self.cache_key = compute_key_from_params(
178 179 repo_name, cache_type)
179 180 self.raise_exception = raise_exception
180 181
182 # Append the thread id to the cache key if this invalidation context
183 # should be scoped to the current thread.
184 if thread_scoped:
185 thread_id = threading.current_thread().ident
186 self.cache_key = '{cache_key}_{thread_id}'.format(
187 cache_key=self.cache_key, thread_id=thread_id)
188
181 189 def get_cache_obj(self):
182 190 cache_key = CacheKey.get_cache_key(
183 191 self.repo_name, self.cache_type)
184 192 cache_obj = CacheKey.get_active_cache(cache_key)
185 193 if not cache_obj:
186 194 cache_obj = CacheKey(cache_key, self.repo_name)
187 195 return cache_obj
188 196
189 197 def __enter__(self):
190 198 """
191 199 Test if the current cache object is valid, and return the region
192 200 cache wrapper that handles invalidation and computation
193 201 """
194 202
195 203 self.cache_obj = self.get_cache_obj()
196 204 if self.cache_obj.cache_active:
197 205 # means our cache obj exists and is marked as active, i.e. its
198 206 # cached value is not outdated, so we return ActiveRegionCache
199 207 self.skip_cache_active_change = True
200 208 return ActiveRegionCache(self)
201 209
202 210 # the key either does not exist or is set to False, so we return
203 211 # the real invalidator which re-computes the value. We additionally
204 212 # set the flag to actually update the database objects on exit
205 213 self.skip_cache_active_change = False
206 214 return FreshRegionCache(self)
207 215
208 216 def __exit__(self, exc_type, exc_val, exc_tb):
209 217
210 218 if self.skip_cache_active_change:
211 219 return
212 220
213 221 try:
214 222 self.cache_obj.cache_active = True
215 223 Session().add(self.cache_obj)
216 224 Session().commit()
217 225 except IntegrityError:
218 226 # if we catch an integrity error, it means this object was already
219 227 # inserted by a parallel call; the assumption is that this is an
220 228 # edge race-condition case and it's safe to skip it
221 229 Session().rollback()
222 230 except Exception:
223 231 log.exception('Failed to commit on cache key update')
224 232 Session().rollback()
225 233 if self.raise_exception:
226 234 raise
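
The commit does not show how the context manager is driven. Based on the __enter__/__exit__ implementation above, a consumer would presumably look roughly like the sketch below; the region configuration and compute function are illustrative, the compute function must be a Beaker region-cached callable (FreshRegionCache.invalidate() calls region_invalidate on it), and actually running this requires a configured RhodeCode database session for the CacheKey bookkeeping.

from beaker.cache import cache_region

from rhodecode.lib import caches

# A small in-memory region so the Beaker side of the sketch is
# self-contained; in RhodeCode the regions come from the .ini file.
caches.configure_cache_region(
    'long_term', {}, {'type': 'memory'}, default_expire=60)

@cache_region('long_term')
def compute_file_tree(cache_key):
    return {'tree': [], 'key': cache_key}  # placeholder computation

context = caches.InvalidationContext(
    compute_file_tree, 'repo1', caches.FILE_TREE, thread_scoped=True)

with context as region:
    # FreshRegionCache drops the stale Beaker entry first; the
    # ActiveRegionCache variant makes invalidate() a no-op.
    region.invalidate()
    result = region.compute()
# On exit the CacheKey row is marked active again, unless the cached
# value was still valid when the context was entered.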